source: trunk/jobarchived/jobarchived.py @ 274

Last change on this file since 274 was 273, checked in by bastiaans, 18 years ago

jobarchived/jobarchived.py:

  • fixed fd closure bug
  • Property svn:keywords set to Id
File size: 36.3 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 273 2006-07-20 10:53:24Z bastiaans $
22#
23
24import getopt, syslog, ConfigParser, sys
25
26def processArgs( args ):
27
28        SHORT_L = 'c:'
29        LONG_L = 'config='
30
31        config_filename = None
32
33        try:
34
35                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
36
37        except getopt.error, detail:
38
39                print detail
40                sys.exit(1)
41
42        for opt, value in opts:
43
44                if opt in [ '--config', '-c' ]:
45
46                        config_filename = value
47
48        if not config_filename:
49
50                config_filename = '/etc/jobarchived.conf'
51
52        try:
53                return loadConfig( config_filename )
54
55        except ConfigParser.NoOptionError, detail:
56
57                print detail
58                sys.exit( 1 )
59
60def loadConfig( filename ):
61
62        def getlist( cfg_string ):
63
64                my_list = [ ]
65
66                for item_txt in cfg_string.split( ',' ):
67
68                        sep_char = None
69
70                        item_txt = item_txt.strip()
71
72                        for s_char in [ "'", '"' ]:
73
74                                if item_txt.find( s_char ) != -1:
75
76                                        if item_txt.count( s_char ) != 2:
77
78                                                print 'Missing quote: %s' %item_txt
79                                                sys.exit( 1 )
80
81                                        else:
82
83                                                sep_char = s_char
84                                                break
85
86                        if sep_char:
87
88                                item_txt = item_txt.split( sep_char )[1]
89
90                        my_list.append( item_txt )
91
92                return my_list
93
94        cfg = ConfigParser.ConfigParser()
95
96        cfg.read( filename )
97
98        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL
99
100        ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
101
102        ARCHIVE_HOURS_PER_RRD = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
103
104        DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
105
106        USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
107
108        SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
109
110        try:
111
112                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
113
114        except AttributeError, detail:
115
116                print 'Unknown syslog facility'
117                sys.exit( 1 )
118
119        GMETAD_CONF = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
120
121        ARCHIVE_XMLSOURCE = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
122
123        ARCHIVE_DATASOURCES = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
124
125        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
126
127        JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
128
129        DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
130
131        RRDTOOL = cfg.get( 'DEFAULT', 'RRDTOOL' )
132
133        return True
134
135# What XML data types not to store
136#
137UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
138
139# Maximum time (in seconds) a parsethread may run
140#
141PARSE_TIMEOUT = 60
142
143# Maximum time (in seconds) a storethread may run
144#
145STORE_TIMEOUT = 360
146
147"""
148The Job Archiving Daemon
149"""
150
151from types import *
152
153import DBClass
154import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
155
156class DataSQLStore:
157
158        db_vars = None
159        dbc = None
160
161        def __init__( self, hostname, database ):
162
163                self.db_vars = DBClass.InitVars(DataBaseName=database,
164                                User='root',
165                                Host=hostname,
166                                Password='',
167                                Dictionary='true')
168
169                try:
170                        self.dbc     = DBClass.DB(self.db_vars)
171                except DBClass.DBError, details:
172                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
173                        sys.exit(1)
174
175        def setDatabase(self, statement):
176                ret = self.doDatabase('set', statement)
177                return ret
178               
179        def getDatabase(self, statement):
180                ret = self.doDatabase('get', statement)
181                return ret
182
183        def doDatabase(self, type, statement):
184
185                debug_msg( 6, 'doDatabase(): %s: %s' %(type, statement) )
186                try:
187                        if type == 'set':
188                                result = self.dbc.Set( statement )
189                                self.dbc.Commit()
190                        elif type == 'get':
191                                result = self.dbc.Get( statement )
192                               
193                except DBClass.DBError, detail:
194                        operation = statement.split(' ')[0]
195                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
196                        sys.exit(1)
197
198                debug_msg( 6, 'doDatabase(): result: %s' %(result) )
199                return result
200
201        def getJobNodeId( self, job_id, node_id ):
202
203                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
204                if len( id ) > 0:
205
206                        if len( id[0] ) > 0 and id[0] != '':
207                       
208                                return 1
209
210                return 0
211
212        def getNodeId( self, hostname ):
213
214                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
215
216                if len( id ) > 0:
217
218                        id = id[0][0]
219
220                        return id
221                else:
222                        return None
223
224        def getNodeIds( self, hostnames ):
225
226                ids = [ ]
227
228                for node in hostnames:
229
230                        id = self.getNodeId( node )
231
232                        if id:
233                                ids.append( id )
234
235                return ids
236
237        def getJobId( self, jobid ):
238
239                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
240
241                if id:
242                        id = id[0][0]
243
244                        return id
245                else:
246                        return None
247
248        def addJob( self, job_id, jobattrs ):
249
250                if not self.getJobId( job_id ):
251
252                        self.mutateJob( 'insert', job_id, jobattrs ) 
253                else:
254                        self.mutateJob( 'update', job_id, jobattrs )
255
256        def mutateJob( self, action, job_id, jobattrs ):
257
258                job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
259
260                insert_col_str = 'job_id'
261                insert_val_str = "'%s'" %job_id
262                update_str = None
263
264                debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id))
265
266                ids = [ ]
267
268                for valname, value in jobattrs.items():
269
270                        if valname in job_values and value != '':
271
272                                column_name = 'job_' + valname
273
274                                if action == 'insert':
275
276                                        if not insert_col_str:
277                                                insert_col_str = column_name
278                                        else:
279                                                insert_col_str = insert_col_str + ',' + column_name
280
281                                        if not insert_val_str:
282                                                insert_val_str = value
283                                        else:
284                                                insert_val_str = insert_val_str + ",'%s'" %value
285
286                                elif action == 'update':
287                                       
288                                        if not update_str:
289                                                update_str = "%s='%s'" %(column_name, value)
290                                        else:
291                                                update_str = update_str + ",%s='%s'" %(column_name, value)
292
293                        elif valname == 'nodes' and value:
294
295                                node_valid = 1
296
297                                if len(value) == 1:
298                               
299                                        if jobattrs['status'] == 'Q':
300
301                                                node_valid = 0
302
303                                        else:
304
305                                                node_valid = 0
306
307                                                for node_char in str(value[0]):
308
309                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
310
311                                                                node_valid = 1
312
313                                if node_valid:
314
315                                        ids = self.addNodes( value, jobattrs['domain'] )
316
317                if action == 'insert':
318
319                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
320
321                elif action == 'update':
322
323                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
324
325                if len( ids ) > 0:
326                        self.addJobNodes( job_id, ids )
327
328        def addNodes( self, hostnames, domain ):
329
330                ids = [ ]
331
332                for node in hostnames:
333
334                        node = '%s.%s' %( node, domain )
335                        id = self.getNodeId( node )
336       
337                        if not id:
338                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
339                                id = self.getNodeId( node )
340
341                        ids.append( id )
342
343                return ids
344
345        def addJobNodes( self, jobid, nodes ):
346
347                for node in nodes:
348
349                        if not self.getJobNodeId( jobid, node ):
350
351                                self.addJobNode( jobid, node )
352
353        def addJobNode( self, jobid, nodeid ):
354
355                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
356
357        def storeJobInfo( self, jobid, jobattrs ):
358
359                self.addJob( jobid, jobattrs )
360
361class RRDMutator:
362        """A class for performing RRD mutations"""
363
364        binary = RRDTOOL
365
366        def __init__( self, binary=None ):
367                """Set alternate binary if supplied"""
368
369                if binary:
370                        self.binary = binary
371
372        def create( self, filename, args ):
373                """Create a new rrd with args"""
374
375                return self.perform( 'create', '"' + filename + '"', args )
376
377        def update( self, filename, args ):
378                """Update a rrd with args"""
379
380                return self.perform( 'update', '"' + filename + '"', args )
381
382        def grabLastUpdate( self, filename ):
383                """Determine the last update time of filename rrd"""
384
385                last_update = 0
386
387                debug_msg( 8, self.binary + ' info "' + filename + '"' )
388
389                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
390
391                        if line.find( 'last_update') != -1:
392
393                                last_update = line.split( ' = ' )[1]
394
395                if last_update:
396                        return last_update
397                else:
398                        return 0
399
400        def perform( self, action, filename, args ):
401                """Perform action on rrd filename with args"""
402
403                arg_string = None
404
405                if type( args ) is not ListType:
406                        debug_msg( 8, 'Arguments needs to be of type List' )
407                        return 1
408
409                for arg in args:
410
411                        if not arg_string:
412
413                                arg_string = arg
414                        else:
415                                arg_string = arg_string + ' ' + arg
416
417                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
418
419                cmd = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
420                lines = cmd.readlines()
421                cmd.close()
422
423                for line in lines:
424
425                        if line.find( 'ERROR' ) != -1:
426
427                                error_msg = string.join( line.split( ' ' )[1:] )
428                                debug_msg( 8, error_msg )
429                                return 1
430
431                return 0
432
433class XMLProcessor:
434        """Skeleton class for XML processor's"""
435
436        def run( self ):
437                """Do main processing of XML here"""
438
439                pass
440
441class TorqueXMLProcessor( XMLProcessor ):
442        """Main class for processing XML and acting with it"""
443
444        def __init__( self ):
445                """Setup initial XML connection and handlers"""
446
447                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 
448                self.myXMLSource = self.myXMLGatherer.getFileObject()
449                self.myXMLHandler = TorqueXMLHandler()
450                self.myXMLError = XMLErrorHandler()
451                self.config = GangliaConfigParser( GMETAD_CONF )
452
453        def run( self ):
454                """Main XML processing"""
455
456                debug_msg( 1, 'torque_xml_thread(): started.' )
457
458                while( 1 ):
459
460                        self.myXMLSource = self.myXMLGatherer.getFileObject()
461                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
462
463                        try:
464                                xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
465                        except socket.error, msg:
466                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
467                               
468                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
469                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
470                        time.sleep( self.config.getLowestInterval() )
471
472class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
473        """Parse Torque's jobinfo XML from our plugin"""
474
475        jobAttrs = { }
476
477        def __init__( self ):
478
479                self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
480                self.jobs_processed = [ ]
481                self.jobs_to_store = [ ]
482
483        def startDocument( self ):
484
485                self.heartbeat = 0
486
487        def startElement( self, name, attrs ):
488                """
489                This XML will be all gmetric XML
490                so there will be no specific start/end element
491                just one XML statement with all info
492                """
493               
494                jobinfo = { }
495
496                if name == 'CLUSTER':
497
498                        self.clustername = attrs.get( 'NAME', "" )
499
500                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
501
502                        metricname = attrs.get( 'NAME', "" )
503
504                        if metricname == 'TOGA-HEARTBEAT':
505                                self.heartbeat = attrs.get( 'VAL', "" )
506
507                        elif metricname.find( 'TOGA-JOB' ) != -1:
508
509                                job_id = metricname.split( 'TOGA-JOB-' )[1]
510                                val = attrs.get( 'VAL', "" )
511
512                                if not job_id in self.jobs_processed:
513                                        self.jobs_processed.append( job_id )
514
515                                check_change = 0
516
517                                if self.jobAttrs.has_key( job_id ):
518                                        check_change = 1
519
520                                valinfo = val.split( ' ' )
521
522                                for myval in valinfo:
523
524                                        if len( myval.split( '=' ) ) > 1:
525
526                                                valname = myval.split( '=' )[0]
527                                                value = myval.split( '=' )[1]
528
529                                                if valname == 'nodes':
530                                                        value = value.split( ';' )
531
532                                                jobinfo[ valname ] = value
533
534                                if check_change:
535                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
536                                                self.jobAttrs[ job_id ]['stop_timestamp'] = ''
537                                                self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
538                                                if not job_id in self.jobs_to_store:
539                                                        self.jobs_to_store.append( job_id )
540
541                                                debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
542                                else:
543                                        self.jobAttrs[ job_id ] = jobinfo
544
545                                        if not job_id in self.jobs_to_store:
546                                                self.jobs_to_store.append( job_id )
547
548                                        debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
549                                       
550        def endDocument( self ):
551                """When all metrics have gone, check if any jobs have finished"""
552
553                if self.heartbeat:
554                        for jobid, jobinfo in self.jobAttrs.items():
555
556                                # This is an old job, not in current jobinfo list anymore
557                                # it must have finished, since we _did_ get a new heartbeat
558                                #
559                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
560
561                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
562
563                                        if not jobid in self.jobs_processed:
564                                                self.jobs_processed.append( jobid )
565
566                                        self.jobAttrs[ jobid ]['status'] = 'F'
567                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( mytime )
568
569                                        if not jobid in self.jobs_to_store:
570                                                self.jobs_to_store.append( jobid )
571
572                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
573
574                        for jobid in self.jobs_to_store:
575                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'Q', 'F' ]:
576
577                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
578
579                                        if self.jobAttrs[ jobid ]['status'] == 'F':
580                                                del self.jobAttrs[ jobid ]
581
582                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
583
584                        self.jobs_processed = [ ]
585                        self.jobs_to_store = [ ]
586
587        def setJobAttrs( self, old, new ):
588                """
589                Set new job attributes in old, but not lose existing fields
590                if old attributes doesn't have those
591                """
592
593                for valname, value in new.items():
594                        old[ valname ] = value
595
596                return old
597               
598
599        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
600                """
601                Check if jobinfo has changed from jobattrs[jobid]
602                if it's report time is bigger than previous one
603                and it is report time is recent (equal to heartbeat)
604                """
605
606                ignore_changes = [ 'reported' ]
607
608                if jobattrs.has_key( jobid ):
609
610                        for valname, value in jobinfo.items():
611
612                                if valname not in ignore_changes:
613
614                                        if jobattrs[ jobid ].has_key( valname ):
615
616                                                if value != jobattrs[ jobid ][ valname ]:
617
618                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
619                                                                return 1
620
621                                        else:
622                                                return 1
623
624                return 0
625
626class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
627        """Parse Ganglia's XML"""
628
629        def __init__( self, config ):
630                """Setup initial variables and gather info on existing rrd archive"""
631
632                self.config = config
633                self.clusters = { }
634                debug_msg( 1, 'Checking existing toga rrd archive..' )
635                self.gatherClusters()
636                debug_msg( 1, 'Check done.' )
637
638        def gatherClusters( self ):
639                """Find all existing clusters in archive dir"""
640
641                archive_dir = check_dir(ARCHIVE_PATH)
642
643                hosts = [ ]
644
645                if os.path.exists( archive_dir ):
646
647                        dirlist = os.listdir( archive_dir )
648
649                        for item in dirlist:
650
651                                clustername = item
652
653                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
654
655                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
656
657        def startElement( self, name, attrs ):
658                """Memorize appropriate data from xml start tags"""
659
660                if name == 'GANGLIA_XML':
661
662                        self.XMLSource = attrs.get( 'SOURCE', "" )
663                        self.gangliaVersion = attrs.get( 'VERSION', "" )
664
665                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
666
667                elif name == 'GRID':
668
669                        self.gridName = attrs.get( 'NAME', "" )
670                        self.time = attrs.get( 'LOCALTIME', "" )
671
672                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
673
674                elif name == 'CLUSTER':
675
676                        self.clusterName = attrs.get( 'NAME', "" )
677                        self.time = attrs.get( 'LOCALTIME', "" )
678
679                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
680
681                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
682
683                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
684
685                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
686
687                        self.hostName = attrs.get( 'NAME', "" )
688                        self.hostIp = attrs.get( 'IP', "" )
689                        self.hostReported = attrs.get( 'REPORTED', "" )
690
691                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
692
693                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
694
695                        type = attrs.get( 'TYPE', "" )
696                       
697                        exclude_metric = False
698                       
699                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
700
701                                orig_name = attrs.get( 'NAME', "" )     
702
703                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
704                               
705                                        exclude_metric = True
706
707                                elif re.match( ex_metricstr, orig_name ):
708
709                                        exclude_metric = True
710
711                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
712
713                                myMetric = { }
714                                myMetric['name'] = attrs.get( 'NAME', "" )
715                                myMetric['val'] = attrs.get( 'VAL', "" )
716                                myMetric['time'] = self.hostReported
717
718                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
719
720                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
721
722        def storeMetrics( self ):
723                """Store metrics of each cluster rrd handler"""
724
725                for clustername, rrdh in self.clusters.items():
726
727                        ret = rrdh.storeMetrics()
728
729                        if ret:
730                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
731                                return 1
732
733                return 0
734
735class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
736
737        def error( self, exception ):
738                """Recoverable error"""
739
740                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
741
742        def fatalError( self, exception ):
743                """Non-recoverable error"""
744
745                exception_str = str( exception )
746
747                # Ignore 'no element found' errors
748                if exception_str.find( 'no element found' ) != -1:
749                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
750                        return 0
751
752                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
753                sys.exit( 1 )
754
755        def warning( self, exception ):
756                """Warning"""
757
758                debug_msg( 0, 'Warning ' + str( exception ) )
759
760class XMLGatherer:
761        """Setup a connection and file object to Ganglia's XML"""
762
763        s = None
764        fd = None
765
766        def __init__( self, host, port ):
767                """Store host and port for connection"""
768
769                self.host = host
770                self.port = port
771                self.connect()
772                self.makeFileDescriptor()
773
774        def connect( self ):
775                """Setup connection to XML source"""
776
777                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
778
779                        af, socktype, proto, canonname, sa = res
780
781                        try:
782
783                                self.s = socket.socket( af, socktype, proto )
784
785                        except socket.error, msg:
786
787                                self.s = None
788                                continue
789
790                        try:
791
792                                self.s.connect( sa )
793
794                        except socket.error, msg:
795
796                                self.disconnect()
797                                continue
798
799                        break
800
801                if self.s is None:
802
803                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
804                        sys.exit( 1 )
805
806        def disconnect( self ):
807                """Close socket"""
808
809                if self.s:
810                        self.s.shutdown( 2 )
811                        self.s.close()
812                        self.s = None
813
814        def __del__( self ):
815                """Kill the socket before we leave"""
816
817                self.disconnect()
818
819        def reconnect( self ):
820                """Reconnect"""
821
822                if self.s:
823                        self.disconnect()
824
825                self.connect()
826
827        def makeFileDescriptor( self ):
828                """Make file descriptor that points to our socket connection"""
829
830                self.reconnect()
831
832                if self.s:
833                        self.fd = self.s.makefile( 'r' )
834
835        def getFileObject( self ):
836                """Connect, and return a file object"""
837
838                self.makeFileDescriptor()
839
840                if self.fd:
841                        return self.fd
842
843class GangliaXMLProcessor( XMLProcessor ):
844        """Main class for processing XML and acting with it"""
845
846        def __init__( self ):
847                """Setup initial XML connection and handlers"""
848
849                self.config = GangliaConfigParser( GMETAD_CONF )
850
851                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 
852                self.myXMLSource = self.myXMLGatherer.getFileObject()
853                self.myXMLHandler = GangliaXMLHandler( self.config )
854                self.myXMLError = XMLErrorHandler()
855
856        def run( self ):
857                """Main XML processing; start a xml and storethread"""
858
859                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
860                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
861
862                while( 1 ):
863
864                        if not xml_thread.isAlive():
865                                # Gather XML at the same interval as gmetad
866
867                                # threaded call to: self.processXML()
868                                #
869                                try:
870                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
871                                        xml_thread.start()
872                                except thread.error, msg:
873                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
874                                        #return 1
875
876                        if not store_thread.isAlive():
877                                # Store metrics every .. sec
878
879                                # threaded call to: self.storeMetrics()
880                                #
881                                try:
882                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
883                                        store_thread.start()
884                                except thread.error, msg:
885                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
886                                        #return 1
887               
888                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
889                        time.sleep( 1 ) 
890
891        def storeMetrics( self ):
892                """Store metrics retained in memory to disk"""
893
894                # Store metrics somewhere between every 360 and 640 seconds
895                #
896                STORE_INTERVAL = random.randint( 360, 640 )
897
898                try:
899                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
900                        store_metric_thread.start()
901                except thread.error, msg:
902                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
903                        return 1
904
905                debug_msg( 1, 'ganglia_store_thread(): started.' )
906
907                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
908                time.sleep( STORE_INTERVAL )
909                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
910
911                if store_metric_thread.isAlive():
912
913                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
914                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
915                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
916
917                debug_msg( 1, 'ganglia_store_thread(): finished.' )
918
919                return 0
920
921        def storeThread( self ):
922                """Actual metric storing thread"""
923
924                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
925                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
926                ret = self.myXMLHandler.storeMetrics()
927                if ret > 0:
928                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
929                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
930                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
931               
932                return 0
933
934        def processXML( self ):
935                """Process XML"""
936
937                try:
938                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
939                        parsethread.start()
940                except thread.error, msg:
941                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
942                        return 1
943
944                debug_msg( 1, 'ganglia_xml_thread(): started.' )
945
946                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
947                time.sleep( float( self.config.getLowestInterval() ) ) 
948                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
949
950                if parsethread.isAlive():
951
952                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
953                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
954                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
955
956                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
957
958                return 0
959
960        def parseThread( self ):
961                """Actual parsing thread"""
962
963                debug_msg( 1, 'ganglia_parse_thread(): started.' )
964                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
965                self.myXMLSource = self.myXMLGatherer.getFileObject()
966
967                try:
968                        xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
969                except socket.error, msg:
970                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
971
972                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
973                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
974
975                return 0
976
977class GangliaConfigParser:
978
979        sources = [ ]
980
981        def __init__( self, config ):
982                """Parse some stuff from our gmetad's config, such as polling interval"""
983
984                self.config = config
985                self.parseValues()
986
987        def parseValues( self ):
988                """Parse certain values from gmetad.conf"""
989
990                readcfg = open( self.config, 'r' )
991
992                for line in readcfg.readlines():
993
994                        if line.count( '"' ) > 1:
995
996                                if line.find( 'data_source' ) != -1 and line[0] != '#':
997
998                                        source = { }
999                                        source['name'] = line.split( '"' )[1]
1000                                        source_words = line.split( '"' )[2].split( ' ' )
1001
1002                                        for word in source_words:
1003
1004                                                valid_interval = 1
1005
1006                                                for letter in word:
1007
1008                                                        if letter not in string.digits:
1009
1010                                                                valid_interval = 0
1011
1012                                                if valid_interval and len(word) > 0:
1013
1014                                                        source['interval'] = word
1015                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
1016       
1017                                        # No interval found, use Ganglia's default     
1018                                        if not source.has_key( 'interval' ):
1019                                                source['interval'] = 15
1020                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1021
1022                                        self.sources.append( source )
1023
1024        def getInterval( self, source_name ):
1025                """Return interval for source_name"""
1026
1027                for source in self.sources:
1028
1029                        if source['name'] == source_name:
1030
1031                                return source['interval']
1032
1033                return None
1034
1035        def getLowestInterval( self ):
1036                """Return the lowest interval of all clusters"""
1037
1038                lowest_interval = 0
1039
1040                for source in self.sources:
1041
1042                        if not lowest_interval or source['interval'] <= lowest_interval:
1043
1044                                lowest_interval = source['interval']
1045
1046                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1047                if lowest_interval:
1048                        return lowest_interval
1049                else:
1050                        return 15
1051
1052class RRDHandler:
1053        """Class for handling RRD activity"""
1054
1055        myMetrics = { }
1056        lastStored = { }
1057        timeserials = { }
1058        slot = None
1059
1060        def __init__( self, config, cluster ):
1061                """Setup initial variables"""
1062
1063                self.block = 0
1064                self.cluster = cluster
1065                self.config = config
1066                self.slot = threading.Lock()
1067                self.rrdm = RRDMutator()
1068                self.gatherLastUpdates()
1069
1070        def gatherLastUpdates( self ):
1071                """Populate the lastStored list, containing timestamps of all last updates"""
1072
1073                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1074
1075                hosts = [ ]
1076
1077                if os.path.exists( cluster_dir ):
1078
1079                        dirlist = os.listdir( cluster_dir )
1080
1081                        for dir in dirlist:
1082
1083                                hosts.append( dir )
1084
1085                for host in hosts:
1086
1087                        host_dir = cluster_dir + '/' + host
1088                        dirlist = os.listdir( host_dir )
1089
1090                        for dir in dirlist:
1091
1092                                if not self.timeserials.has_key( host ):
1093
1094                                        self.timeserials[ host ] = [ ]
1095
1096                                self.timeserials[ host ].append( dir )
1097
1098                        last_serial = self.getLastRrdTimeSerial( host )
1099                        if last_serial:
1100
1101                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1102                                if os.path.exists( metric_dir ):
1103
1104                                        dirlist = os.listdir( metric_dir )
1105
1106                                        for file in dirlist:
1107
1108                                                metricname = file.split( '.rrd' )[0]
1109
1110                                                if not self.lastStored.has_key( host ):
1111
1112                                                        self.lastStored[ host ] = { }
1113
1114                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1115
1116        def getClusterName( self ):
1117                """Return clustername"""
1118
1119                return self.cluster
1120
1121        def memMetric( self, host, metric ):
1122                """Store metric from host in memory"""
1123
1124                # <ATOMIC>
1125                #
1126                self.slot.acquire()
1127               
1128                if self.myMetrics.has_key( host ):
1129
1130                        if self.myMetrics[ host ].has_key( metric['name'] ):
1131
1132                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1133
1134                                        if mymetric['time'] == metric['time']:
1135
1136                                                # Allready have this metric, abort
1137                                                self.slot.release()
1138                                                return 1
1139                        else:
1140                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1141                else:
1142                        self.myMetrics[ host ] = { }
1143                        self.myMetrics[ host ][ metric['name'] ] = [ ]
1144
1145                # Push new metric onto stack
1146                # atomic code; only 1 thread at a time may access the stack
1147
1148                self.myMetrics[ host ][ metric['name'] ].append( metric )
1149
1150                self.slot.release()
1151                #
1152                # </ATOMIC>
1153
1154        def makeUpdateList( self, host, metriclist ):
1155                """
1156                Make a list of update values for rrdupdate
1157                but only those that we didn't store before
1158                """
1159
1160                update_list = [ ]
1161                metric = None
1162
1163                while len( metriclist ) > 0:
1164
1165                        metric = metriclist.pop( 0 )
1166
1167                        if self.checkStoreMetric( host, metric ):
1168                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
1169
1170                return update_list
1171
1172        def checkStoreMetric( self, host, metric ):
1173                """Check if supplied metric if newer than last one stored"""
1174
1175                if self.lastStored.has_key( host ):
1176
1177                        if self.lastStored[ host ].has_key( metric['name'] ):
1178
1179                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1180
1181                                        # This is old
1182                                        return 0
1183
1184                return 1
1185
1186        def memLastUpdate( self, host, metricname, metriclist ):
1187                """
1188                Memorize the time of the latest metric from metriclist
1189                but only if it wasn't allready memorized
1190                """
1191
1192                if not self.lastStored.has_key( host ):
1193                        self.lastStored[ host ] = { }
1194
1195                last_update_time = 0
1196
1197                for metric in metriclist:
1198
1199                        if metric['name'] == metricname:
1200
1201                                if metric['time'] > last_update_time:
1202
1203                                        last_update_time = metric['time']
1204
1205                if self.lastStored[ host ].has_key( metricname ):
1206                       
1207                        if last_update_time <= self.lastStored[ host ][ metricname ]:
1208                                return 1
1209
1210                self.lastStored[ host ][ metricname ] = last_update_time
1211
1212        def storeMetrics( self ):
1213                """
1214                Store all metrics from memory to disk
1215                and do it to the RRD's in appropriate timeperiod directory
1216                """
1217
1218                for hostname, mymetrics in self.myMetrics.items():     
1219
1220                        for metricname, mymetric in mymetrics.items():
1221
1222                                metrics_to_store = [ ]
1223
1224                                # Pop metrics from stack for storing until none is left
1225                                # atomic code: only 1 thread at a time may access myMetrics
1226
1227                                # <ATOMIC>
1228                                #
1229                                self.slot.acquire() 
1230
1231                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1232
1233                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1234
1235                                                try:
1236                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1237                                                except IndexError, msg:
1238
1239                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1240                                                        # is still len 0 when the statement is executed.
1241                                                        # Just ignore indexerror's..
1242                                                        pass
1243
1244                                self.slot.release()
1245                                #
1246                                # </ATOMIC>
1247
1248                                # Create a mapping table, each metric to the period where it should be stored
1249                                #
1250                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
1251
1252                                update_rets = [ ]
1253
1254                                for period, pmetric in metric_serial_table.items():
1255
1256                                        create_ret = self.createCheck( hostname, metricname, period )   
1257
1258                                        update_ret = self.update( hostname, metricname, period, pmetric )
1259
1260                                        if update_ret == 0:
1261
1262                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1263                                        else:
1264                                                debug_msg( 9, 'metric update failed' )
1265
1266                                        update_rets.append( create_ret )
1267                                        update_rets.append( update_ret )
1268
1269                                # Lets ignore errors here for now, we need to make sure last update time
1270                                # is correct!
1271                                #
1272                                #if not (1) in update_rets:
1273
1274                                self.memLastUpdate( hostname, metricname, metrics_to_store )
1275
1276        def makeTimeSerial( self ):
1277                """Generate a time serial. Seconds since epoch"""
1278
1279                # Seconds since epoch
1280                mytime = int( time.time() )
1281
1282                return mytime
1283
1284        def makeRrdPath( self, host, metricname, timeserial ):
1285                """Make a RRD location/path and filename"""
1286
1287                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1288                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
1289
1290                return rrd_dir, rrd_file
1291
1292        def getLastRrdTimeSerial( self, host ):
1293                """Find the last timeserial (directory) for this host"""
1294
1295                newest_timeserial = 0
1296
1297                for dir in self.timeserials[ host ]:
1298
1299                        valid_dir = 1
1300
1301                        for letter in dir:
1302                                if letter not in string.digits:
1303                                        valid_dir = 0
1304
1305                        if valid_dir:
1306                                timeserial = dir
1307                                if timeserial > newest_timeserial:
1308                                        newest_timeserial = timeserial
1309
1310                if newest_timeserial:
1311                        return newest_timeserial
1312                else:
1313                        return 0
1314
1315        def determinePeriod( self, host, check_serial ):
1316                """Determine to which period (directory) this time(serial) belongs"""
1317
1318                period_serial = 0
1319
1320                if self.timeserials.has_key( host ):
1321
1322                        for serial in self.timeserials[ host ]:
1323
1324                                if check_serial >= serial and period_serial < serial:
1325
1326                                        period_serial = serial
1327
1328                return period_serial
1329
1330        def determineSerials( self, host, metricname, metriclist ):
1331                """
1332                Determine the correct serial and corresponding rrd to store
1333                for a list of metrics
1334                """
1335
1336                metric_serial_table = { }
1337
1338                for metric in metriclist:
1339
1340                        if metric['name'] == metricname:
1341
1342                                period = self.determinePeriod( host, metric['time'] )   
1343
1344                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1345
1346                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
1347
1348                                        # This one should get it's own new period
1349                                        period = metric['time']
1350
1351                                        if not self.timeserials.has_key( host ):
1352                                                self.timeserials[ host ] = [ ]
1353
1354                                        self.timeserials[ host ].append( period )
1355
1356                                if not metric_serial_table.has_key( period ):
1357
1358                                        metric_serial_table[ period ] = [ ]
1359
1360                                metric_serial_table[ period ].append( metric )
1361
1362                return metric_serial_table
1363
1364        def createCheck( self, host, metricname, timeserial ):
1365                """Check if an rrd allready exists for this metric, create if not"""
1366
1367                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1368               
1369                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
1370
1371                if not os.path.exists( rrd_dir ):
1372
1373                        try:
1374                                os.makedirs( rrd_dir )
1375
1376                        except os.OSError, msg:
1377
1378                                if msg.find( 'File exists' ) != -1:
1379
1380                                        # Ignore exists errors
1381                                        pass
1382
1383                                else:
1384
1385                                        print msg
1386                                        return
1387
1388                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
1389
1390                if not os.path.exists( rrd_file ):
1391
1392                        interval = self.config.getInterval( self.cluster )
1393                        heartbeat = 8 * int( interval )
1394
1395                        params = [ ]
1396
1397                        params.append( '--step' )
1398                        params.append( str( interval ) )
1399
1400                        params.append( '--start' )
1401                        params.append( str( int( timeserial ) - 1 ) )
1402
1403                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1404                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
1405
1406                        self.rrdm.create( str(rrd_file), params )
1407
1408                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1409
1410        def update( self, host, metricname, timeserial, metriclist ):
1411                """
1412                Update rrd file for host with metricname
1413                in directory timeserial with metriclist
1414                """
1415
1416                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1417
1418                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
1419
1420                update_list = self.makeUpdateList( host, metriclist )
1421
1422                if len( update_list ) > 0:
1423                        ret = self.rrdm.update( str(rrd_file), update_list )
1424
1425                        if ret:
1426                                return 1
1427               
1428                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
1429
1430                return 0
1431
1432def daemon():
1433        """daemonized threading"""
1434
1435        # Fork the first child
1436        #
1437        pid = os.fork()
1438
1439        if pid > 0:
1440
1441                sys.exit(0)  # end parent
1442
1443        # creates a session and sets the process group ID
1444        #
1445        os.setsid()
1446
1447        # Fork the second child
1448        #
1449        pid = os.fork()
1450
1451        if pid > 0:
1452
1453                sys.exit(0)  # end parent
1454
1455        # Go to the root directory and set the umask
1456        #
1457        os.chdir('/')
1458        os.umask(0)
1459
1460        sys.stdin.close()
1461        sys.stdout.close()
1462        sys.stderr.close()
1463
1464        os.open('/dev/null', os.O_RDWR)
1465        os.dup2(0, 1)
1466        os.dup2(0, 2)
1467
1468        run()
1469
1470def run():
1471        """Threading start"""
1472
1473        myTorqueProcessor = TorqueXMLProcessor()
1474        myGangliaProcessor = GangliaXMLProcessor()
1475
1476        try:
1477                torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1478                ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
1479
1480                torque_xml_thread.start()
1481                ganglia_xml_thread.start()
1482               
1483        except thread.error, msg:
1484                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1485                syslog.closelog()
1486                sys.exit(1)
1487               
1488        debug_msg( 0, 'main threading started.' )
1489
1490def main():
1491        """Program startup"""
1492
1493        if not processArgs( sys.argv[1:] ):
1494                sys.exit( 1 )
1495
1496        if( DAEMONIZE and USE_SYSLOG ):
1497                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1498
1499        if DAEMONIZE:
1500                daemon()
1501        else:
1502                run()
1503
1504#
1505# Global functions
1506#
1507
1508def check_dir( directory ):
1509        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
1510
1511        if directory[-1] == '/':
1512                directory = directory[:-1]
1513
1514        return directory
1515
1516def debug_msg( level, msg ):
1517        """Only print msg if correct levels"""
1518
1519        if (not DAEMONIZE and DEBUG_LEVEL >= level):
1520                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
1521       
1522        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1523                syslog.syslog( msg )
1524
1525def printTime( ):
1526        """Print current time in human readable format"""
1527
1528        return time.strftime("%a %d %b %Y %H:%M:%S")
1529
1530# Ooohh, someone started me! Let's go..
1531if __name__ == '__main__':
1532        main()
Note: See TracBrowser for help on using the repository browser.