source: trunk/jobarchived/jobarchived.py @ 214

Last change on this file since 214 was 214, checked in by bastiaans, 18 years ago

jobarchived/jobarchived.conf:

  • import of config

jobarchived/jobarchived.py:

  • added config file support
File size: 36.7 KB
RevLine 
[3]1#!/usr/bin/env python
2
[214]3import getopt, syslog, ConfigParser, sys
[3]4
[214]5def processArgs( args ):
[6]6
[214]7        SHORT_L = 'c:'
8        LONG_L = 'config='
[169]9
[214]10        config_filename = None
[169]11
[214]12        try:
[169]13
[214]14                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]15
[214]16        except getopt.error, detail:
[60]17
[214]18                print detail
19                sys.exit(1)
[9]20
[214]21        for opt, value in opts:
[60]22
[214]23                if opt in [ '--config', '-c' ]:
[13]24
[214]25                        config_filename = value
[198]26
[214]27        if not config_filename:
[60]28
[214]29                config_filename = '/etc/jobarchived.conf'
[22]30
[214]31        try:
32                return loadConfig( config_filename )
[13]33
[214]34        except ConfigParser.NoOptionError, detail:
35
36                print detail
37                sys.exit( 1 )
38
39def loadConfig( filename ):
40
41        def getlist( cfg_string ):
42
43                my_list = [ ]
44
45                for item_txt in cfg_string.split( ',' ):
46
47                        sep_char = None
48
49                        item_txt = item_txt.strip()
50
51                        for s_char in [ "'", '"' ]:
52
53                                if item_txt.find( s_char ) != -1:
54
55                                        if item_txt.count( s_char ) != 2:
56
57                                                print 'Missing quote: %s' %item_txt
58                                                sys.exit( 1 )
59
60                                        else:
61
62                                                sep_char = s_char
63                                                break
64
65                        if sep_char:
66
67                                item_txt = item_txt.split( sep_char )[1]
68
69                        my_list.append( item_txt )
70
71                return my_list
72
73        cfg = ConfigParser.ConfigParser()
74
75        cfg.read( filename )
76
77        # Which metrics to exclude from archiving
78        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE
79
80        # Where to store the archived rrd's
81        #
82        ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
83
84        # Amount of hours to store in one single archived .rrd
85        #
86        ARCHIVE_HOURS_PER_RRD = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
87
88        # Specify debugging level here (only when _not_ DAEMONIZE)
89        #
90        # 11 = XML: metrics
91        # 10 = XML: host, cluster, grid, ganglia
92        # 9  = RRD activity, gmetad config parsing
93        # 8  = RRD file activity
94        # 6  = SQL
95        # 1  = daemon threading
96        # 0  = errors
97        #
98        # default: 0
99        DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
100
101        # Enable logging to syslog?
102        #
103        USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
104
105        # What level msg'es should be logged to syslog?
106        #
107        # default: lvl 0 (errors)
108        #
109        SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
110
111        # Which facility to use in syslog
112        #
113        # Syntax I.e.:
114        #       LOG_KERN, LOG_USER, LOG_MAIL, LOG_DAEMON, LOG_AUTH, LOG_LPR,
115        #       LOG_NEWS, LOG_UUCP, LOG_CRON and LOG_LOCAL0 to LOG_LOCAL7
116        #
117        try:
118
119                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
120
121        except AttributeError, detail:
122
123                print 'Unknown syslog facility'
124                sys.exit( 1 )
125
126        # Where is the gmetad.conf located
127        #
128        GMETAD_CONF = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
129
130        # Where to grab XML data from
131        # Normally: local gmetad (port 8651)
132        #
133        # Syntax: <hostname>:<port>
134        #
135        ARCHIVE_XMLSOURCE = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
136
137        # List of data_source names to archive for
138        #
139        # Syntax: [ "<clustername>", "<clustername>" ]
140        #
141        ARCHIVE_DATASOURCES = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
142
143        # NOTE: This can be a regexp or a string
144        #
145        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
146
147        # Toga's SQL dbase to use
148        #
149        # Syntax: <hostname>/<database>
150        #
151        JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
152
153        # Wether or not to run as a daemon in background
154        #
155        DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
156
157        return True
158
[60]159###
160# You'll only want to change anything below here unless you
[63]161# know what you are doing (i.e. your name is Ramon Bastiaans :D )
[60]162###
163
[17]164# What XML data types not to store
[13]165#
[17]166UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]167
[47]168# Maximum time (in seconds) a parsethread may run
169#
170PARSE_TIMEOUT = 60
171
172# Maximum time (in seconds) a storethread may run
173#
174STORE_TIMEOUT = 360
175
[8]176"""
177This is TOrque-GAnglia's data Daemon
178"""
179
[214]180from types import *
181
182import DBClass
183import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
184
[84]185class DataSQLStore:
186
187        db_vars = None
188        dbc = None
189
190        def __init__( self, hostname, database ):
191
192                self.db_vars = DBClass.InitVars(DataBaseName=database,
193                                User='root',
194                                Host=hostname,
195                                Password='',
196                                Dictionary='true')
197
198                try:
199                        self.dbc     = DBClass.DB(self.db_vars)
200                except DBClass.DBError, details:
[169]201                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
[84]202                        sys.exit(1)
203
204        def setDatabase(self, statement):
205                ret = self.doDatabase('set', statement)
206                return ret
207               
208        def getDatabase(self, statement):
209                ret = self.doDatabase('get', statement)
210                return ret
211
212        def doDatabase(self, type, statement):
213
214                debug_msg( 6, 'doDatabase(): %s: %s' %(type, statement) )
215                try:
216                        if type == 'set':
217                                result = self.dbc.Set( statement )
218                                self.dbc.Commit()
219                        elif type == 'get':
220                                result = self.dbc.Get( statement )
221                               
222                except DBClass.DBError, detail:
223                        operation = statement.split(' ')[0]
[169]224                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
[84]225                        sys.exit(1)
226
227                debug_msg( 6, 'doDatabase(): result: %s' %(result) )
228                return result
229
[191]230        def getJobNodeId( self, job_id, node_id ):
231
232                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
233                if len( id ) > 0:
234
235                        if len( id[0] ) > 0 and id[0] != '':
236                       
237                                return 1
238
239                return 0
240
[84]241        def getNodeId( self, hostname ):
242
[89]243                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]244
[89]245                if len( id ) > 0:
[84]246
[89]247                        id = id[0][0]
248
[84]249                        return id
250                else:
251                        return None
252
253        def getNodeIds( self, hostnames ):
254
255                ids = [ ]
256
257                for node in hostnames:
258
259                        id = self.getNodeId( node )
260
261                        if id:
262                                ids.append( id )
263
264                return ids
265
266        def getJobId( self, jobid ):
267
268                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
269
270                if id:
[89]271                        id = id[0][0]
[84]272
273                        return id
274                else:
275                        return None
276
277        def addJob( self, job_id, jobattrs ):
278
279                if not self.getJobId( job_id ):
280
281                        self.mutateJob( 'insert', job_id, jobattrs ) 
282                else:
283                        self.mutateJob( 'update', job_id, jobattrs )
284
285        def mutateJob( self, action, job_id, jobattrs ):
286
287                job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
288
289                insert_col_str = 'job_id'
290                insert_val_str = "'%s'" %job_id
291                update_str = None
292
293                debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id))
294
[99]295                ids = [ ]
296
[84]297                for valname, value in jobattrs.items():
298
[96]299                        if valname in job_values and value != '':
[84]300
301                                column_name = 'job_' + valname
302
303                                if action == 'insert':
304
305                                        if not insert_col_str:
306                                                insert_col_str = column_name
307                                        else:
308                                                insert_col_str = insert_col_str + ',' + column_name
309
310                                        if not insert_val_str:
311                                                insert_val_str = value
312                                        else:
313                                                insert_val_str = insert_val_str + ",'%s'" %value
314
315                                elif action == 'update':
316                                       
317                                        if not update_str:
318                                                update_str = "%s='%s'" %(column_name, value)
319                                        else:
320                                                update_str = update_str + ",%s='%s'" %(column_name, value)
321
[90]322                        elif valname == 'nodes' and value:
[84]323
[191]324                                node_valid = 1
[190]325
326                                if len(value) == 1:
327                               
[191]328                                        if jobattrs['status'] == 'Q':
[190]329
[191]330                                                node_valid = 0
[190]331
[191]332                                        else:
[190]333
[191]334                                                node_valid = 0
[190]335
[191]336                                                for node_char in str(value[0]):
[190]337
[191]338                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]339
[191]340                                                                node_valid = 1
[84]341
[191]342                                if node_valid:
343
344                                        ids = self.addNodes( value, jobattrs['domain'] )
345
[84]346                if action == 'insert':
347
348                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]349
[84]350                elif action == 'update':
351
[89]352                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
[84]353
[191]354                if len( ids ) > 0:
355                        self.addJobNodes( job_id, ids )
[190]356
[154]357        def addNodes( self, hostnames, domain ):
[84]358
[98]359                ids = [ ]
360
[84]361                for node in hostnames:
362
[154]363                        node = '%s.%s' %( node, domain )
[84]364                        id = self.getNodeId( node )
365       
366                        if not id:
367                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
[98]368                                id = self.getNodeId( node )
[84]369
[98]370                        ids.append( id )
371
372                return ids
373
[86]374        def addJobNodes( self, jobid, nodes ):
375
376                for node in nodes:
377
[191]378                        if not self.getJobNodeId( jobid, node ):
379
380                                self.addJobNode( jobid, node )
381
[84]382        def addJobNode( self, jobid, nodeid ):
383
384                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
385
386        def storeJobInfo( self, jobid, jobattrs ):
387
388                self.addJob( jobid, jobattrs )
389
[37]390class RRDMutator:
[63]391        """A class for performing RRD mutations"""
[37]392
393        binary = '/usr/bin/rrdtool'
394
395        def __init__( self, binary=None ):
[63]396                """Set alternate binary if supplied"""
[37]397
398                if binary:
399                        self.binary = binary
400
401        def create( self, filename, args ):
[63]402                """Create a new rrd with args"""
403
[40]404                return self.perform( 'create', '"' + filename + '"', args )
[37]405
406        def update( self, filename, args ):
[63]407                """Update a rrd with args"""
408
[40]409                return self.perform( 'update', '"' + filename + '"', args )
[37]410
[42]411        def grabLastUpdate( self, filename ):
[63]412                """Determine the last update time of filename rrd"""
[42]413
414                last_update = 0
415
[53]416                debug_msg( 8, self.binary + ' info "' + filename + '"' )
417
[42]418                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
419
420                        if line.find( 'last_update') != -1:
421
422                                last_update = line.split( ' = ' )[1]
423
424                if last_update:
425                        return last_update
426                else:
427                        return 0
428
[40]429        def perform( self, action, filename, args ):
[63]430                """Perform action on rrd filename with args"""
[37]431
432                arg_string = None
433
[40]434                if type( args ) is not ListType:
435                        debug_msg( 8, 'Arguments needs to be of type List' )
436                        return 1
437
[37]438                for arg in args:
439
440                        if not arg_string:
441
442                                arg_string = arg
443                        else:
444                                arg_string = arg_string + ' ' + arg
445
[54]446                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]447
[146]448                cmd = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
449                lines = cmd.readlines()
450                cmd.close()
[37]451
[146]452                for line in lines:
453
[37]454                        if line.find( 'ERROR' ) != -1:
455
456                                error_msg = string.join( line.split( ' ' )[1:] )
[40]457                                debug_msg( 8, error_msg )
[37]458                                return 1
459
460                return 0
461
[78]462class XMLProcessor:
463        """Skeleton class for XML processor's"""
464
465        def run( self ):
466                """Do main processing of XML here"""
467
468                pass
469
470class TorqueXMLProcessor( XMLProcessor ):
471        """Main class for processing XML and acting with it"""
472
473        def __init__( self ):
474                """Setup initial XML connection and handlers"""
475
476                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 
477                self.myXMLSource = self.myXMLGatherer.getFileObject()
478                self.myXMLHandler = TorqueXMLHandler()
479                self.myXMLError = XMLErrorHandler()
[87]480                self.config = GangliaConfigParser( GMETAD_CONF )
[78]481
482        def run( self ):
483                """Main XML processing"""
484
[169]485                debug_msg( 1, 'torque_xml_thread(): started.' )
[87]486
[78]487                while( 1 ):
488
489                        self.myXMLSource = self.myXMLGatherer.getFileObject()
[169]490                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
[176]491
492                        try:
493                                xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
494                        except socket.error, msg:
495                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
496                               
[169]497                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
498                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[87]499                        time.sleep( self.config.getLowestInterval() )
[78]500
[71]501class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
[63]502        """Parse Torque's jobinfo XML from our plugin"""
503
[72]504        jobAttrs = { }
505
[84]506        def __init__( self ):
507
[214]508                self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[98]509                self.jobs_processed = [ ]
510                self.jobs_to_store = [ ]
[84]511
[183]512        def startDocument( self ):
513
514                self.heartbeat = 0
515
[63]516        def startElement( self, name, attrs ):
517                """
518                This XML will be all gmetric XML
519                so there will be no specific start/end element
520                just one XML statement with all info
521                """
522               
[73]523                jobinfo = { }
[63]524
[199]525                if name == 'CLUSTER':
[63]526
[199]527                        self.clustername = attrs.get( 'NAME', "" )
528
529                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
530
[70]531                        metricname = attrs.get( 'NAME', "" )
[63]532
533                        if metricname == 'TOGA-HEARTBEAT':
[70]534                                self.heartbeat = attrs.get( 'VAL', "" )
[63]535
[70]536                        elif metricname.find( 'TOGA-JOB' ) != -1:
[63]537
[72]538                                job_id = metricname.split( 'TOGA-JOB-' )[1]
[63]539                                val = attrs.get( 'VAL', "" )
540
[96]541                                if not job_id in self.jobs_processed:
542                                        self.jobs_processed.append( job_id )
543
[73]544                                check_change = 0
545
546                                if self.jobAttrs.has_key( job_id ):
547                                        check_change = 1
548
[63]549                                valinfo = val.split( ' ' )
550
551                                for myval in valinfo:
552
[84]553                                        if len( myval.split( '=' ) ) > 1:
[63]554
[84]555                                                valname = myval.split( '=' )[0]
556                                                value = myval.split( '=' )[1]
[70]557
[84]558                                                if valname == 'nodes':
559                                                        value = value.split( ';' )
[72]560
[84]561                                                jobinfo[ valname ] = value
562
[73]563                                if check_change:
[182]564                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
[100]565                                                self.jobAttrs[ job_id ]['stop_timestamp'] = ''
[82]566                                                self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
[84]567                                                if not job_id in self.jobs_to_store:
568                                                        self.jobs_to_store.append( job_id )
569
[102]570                                                debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
[73]571                                else:
572                                        self.jobAttrs[ job_id ] = jobinfo
[84]573
574                                        if not job_id in self.jobs_to_store:
575                                                self.jobs_to_store.append( job_id )
576
[102]577                                        debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
[73]578                                       
[77]579        def endDocument( self ):
[74]580                """When all metrics have gone, check if any jobs have finished"""
[72]581
[182]582                if self.heartbeat:
583                        for jobid, jobinfo in self.jobAttrs.items():
[74]584
[182]585                                # This is an old job, not in current jobinfo list anymore
586                                # it must have finished, since we _did_ get a new heartbeat
587                                #
588                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
[102]589
[184]590                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
[74]591
[182]592                                        if not jobid in self.jobs_processed:
593                                                self.jobs_processed.append( jobid )
[96]594
[182]595                                        self.jobAttrs[ jobid ]['status'] = 'F'
596                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( mytime )
[96]597
[182]598                                        if not jobid in self.jobs_to_store:
599                                                self.jobs_to_store.append( jobid )
[74]600
[182]601                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
[87]602
[182]603                        for jobid in self.jobs_to_store:
604                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'Q', 'F' ]:
[84]605
[184]606                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
607
608                                        if self.jobAttrs[ jobid ]['status'] == 'F':
609                                                del self.jobAttrs[ jobid ]
610
[182]611                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
[87]612
[182]613                        self.jobs_processed = [ ]
614                        self.jobs_to_store = [ ]
[84]615
[82]616        def setJobAttrs( self, old, new ):
617                """
618                Set new job attributes in old, but not lose existing fields
619                if old attributes doesn't have those
620                """
621
622                for valname, value in new.items():
623                        old[ valname ] = value
624
625                return old
626               
627
[73]628        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
[74]629                """
630                Check if jobinfo has changed from jobattrs[jobid]
631                if it's report time is bigger than previous one
632                and it is report time is recent (equal to heartbeat)
633                """
[72]634
[87]635                ignore_changes = [ 'reported' ]
636
[73]637                if jobattrs.has_key( jobid ):
638
639                        for valname, value in jobinfo.items():
640
[87]641                                if valname not in ignore_changes:
[73]642
[87]643                                        if jobattrs[ jobid ].has_key( valname ):
[73]644
[87]645                                                if value != jobattrs[ jobid ][ valname ]:
[73]646
[87]647                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
648                                                                return 1
[73]649
[87]650                                        else:
651                                                return 1
652
[73]653                return 0
654
[71]655class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[63]656        """Parse Ganglia's XML"""
[3]657
[33]658        def __init__( self, config ):
[63]659                """Setup initial variables and gather info on existing rrd archive"""
660
[33]661                self.config = config
[35]662                self.clusters = { }
[169]663                debug_msg( 1, 'Checking existing toga rrd archive..' )
[44]664                self.gatherClusters()
[169]665                debug_msg( 1, 'Check done.' )
[33]666
[44]667        def gatherClusters( self ):
[63]668                """Find all existing clusters in archive dir"""
[44]669
[45]670                archive_dir = check_dir(ARCHIVE_PATH)
[44]671
672                hosts = [ ]
673
674                if os.path.exists( archive_dir ):
675
676                        dirlist = os.listdir( archive_dir )
677
678                        for item in dirlist:
679
680                                clustername = item
681
[60]682                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]683
684                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
685
[6]686        def startElement( self, name, attrs ):
[63]687                """Memorize appropriate data from xml start tags"""
[3]688
[7]689                if name == 'GANGLIA_XML':
[32]690
691                        self.XMLSource = attrs.get( 'SOURCE', "" )
692                        self.gangliaVersion = attrs.get( 'VERSION', "" )
693
[12]694                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]695
[7]696                elif name == 'GRID':
[32]697
698                        self.gridName = attrs.get( 'NAME', "" )
699                        self.time = attrs.get( 'LOCALTIME', "" )
700
[12]701                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]702
[7]703                elif name == 'CLUSTER':
[32]704
705                        self.clusterName = attrs.get( 'NAME', "" )
706                        self.time = attrs.get( 'LOCALTIME', "" )
707
[60]708                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]709
[34]710                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]711
[35]712                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]713
[60]714                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]715
716                        self.hostName = attrs.get( 'NAME', "" )
717                        self.hostIp = attrs.get( 'IP', "" )
718                        self.hostReported = attrs.get( 'REPORTED', "" )
719
[12]720                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]721
[60]722                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]723
[32]724                        type = attrs.get( 'TYPE', "" )
[198]725                       
726                        exclude_metric = False
727                       
728                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
[6]729
[198]730                                orig_name = attrs.get( 'NAME', "" )     
[3]731
[198]732                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
733                               
734                                        exclude_metric = True
735
736                                elif re.match( ex_metricstr, orig_name ):
737
738                                        exclude_metric = True
739
740                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
741
[32]742                                myMetric = { }
743                                myMetric['name'] = attrs.get( 'NAME', "" )
744                                myMetric['val'] = attrs.get( 'VAL', "" )
745                                myMetric['time'] = self.hostReported
[3]746
[34]747                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]748
[34]749                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]750
[34]751        def storeMetrics( self ):
[63]752                """Store metrics of each cluster rrd handler"""
[9]753
[34]754                for clustername, rrdh in self.clusters.items():
[16]755
[38]756                        ret = rrdh.storeMetrics()
[9]757
[38]758                        if ret:
759                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
760                                return 1
761
762                return 0
763
[71]764class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
765
766        def error( self, exception ):
767                """Recoverable error"""
768
[169]769                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]770
771        def fatalError( self, exception ):
772                """Non-recoverable error"""
773
774                exception_str = str( exception )
775
776                # Ignore 'no element found' errors
777                if exception_str.find( 'no element found' ) != -1:
[169]778                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
[71]779                        return 0
780
[170]781                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
[71]782                sys.exit( 1 )
783
784        def warning( self, exception ):
785                """Warning"""
786
787                debug_msg( 0, 'Warning ' + str( exception ) )
788
[78]789class XMLGatherer:
[63]790        """Setup a connection and file object to Ganglia's XML"""
[3]791
[8]792        s = None
[70]793        fd = None
[8]794
795        def __init__( self, host, port ):
[63]796                """Store host and port for connection"""
[8]797
[5]798                self.host = host
799                self.port = port
[33]800                self.connect()
[70]801                self.makeFileDescriptor()
[3]802
[33]803        def connect( self ):
[63]804                """Setup connection to XML source"""
[8]805
806                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]807
[5]808                        af, socktype, proto, canonname, sa = res
[32]809
[5]810                        try:
[32]811
[8]812                                self.s = socket.socket( af, socktype, proto )
[32]813
[5]814                        except socket.error, msg:
[32]815
[8]816                                self.s = None
[5]817                                continue
[32]818
[5]819                        try:
[32]820
[8]821                                self.s.connect( sa )
[32]822
[5]823                        except socket.error, msg:
[32]824
[70]825                                self.disconnect()
[5]826                                continue
[32]827
[5]828                        break
[3]829
[8]830                if self.s is None:
[32]831
[170]832                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
[33]833                        sys.exit( 1 )
[5]834
[33]835        def disconnect( self ):
[63]836                """Close socket"""
[33]837
838                if self.s:
[71]839                        self.s.shutdown( 2 )
[33]840                        self.s.close()
841                        self.s = None
842
843        def __del__( self ):
[63]844                """Kill the socket before we leave"""
[33]845
846                self.disconnect()
847
[70]848        def reconnect( self ):
849                """Reconnect"""
[33]850
[38]851                if self.s:
852                        self.disconnect()
[33]853
[70]854                self.connect()
[5]855
[70]856        def makeFileDescriptor( self ):
857                """Make file descriptor that points to our socket connection"""
858
859                self.reconnect()
860
861                if self.s:
862                        self.fd = self.s.makefile( 'r' )
863
864        def getFileObject( self ):
865                """Connect, and return a file object"""
866
[78]867                self.makeFileDescriptor()
868
[70]869                if self.fd:
870                        return self.fd
871
[78]872class GangliaXMLProcessor( XMLProcessor ):
[63]873        """Main class for processing XML and acting with it"""
[5]874
[33]875        def __init__( self ):
[63]876                """Setup initial XML connection and handlers"""
[33]877
878                self.config = GangliaConfigParser( GMETAD_CONF )
879
[78]880                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 
[70]881                self.myXMLSource = self.myXMLGatherer.getFileObject()
[78]882                self.myXMLHandler = GangliaXMLHandler( self.config )
883                self.myXMLError = XMLErrorHandler()
[73]884
[9]885        def run( self ):
[63]886                """Main XML processing; start a xml and storethread"""
[8]887
[102]888                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
889                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]890
[36]891                while( 1 ):
892
[102]893                        if not xml_thread.isAlive():
[36]894                                # Gather XML at the same interval as gmetad
895
896                                # threaded call to: self.processXML()
897                                #
[169]898                                try:
899                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
900                                        xml_thread.start()
[176]901                                except thread.error, msg:
[169]902                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
903                                        #return 1
[36]904
[102]905                        if not store_thread.isAlive():
[55]906                                # Store metrics every .. sec
[36]907
[55]908                                # threaded call to: self.storeMetrics()
909                                #
[169]910                                try:
911                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
912                                        store_thread.start()
[176]913                                except thread.error, msg:
[169]914                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
915                                        #return 1
[36]916               
917                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
918                        time.sleep( 1 ) 
919
[33]920        def storeMetrics( self ):
[63]921                """Store metrics retained in memory to disk"""
[22]922
[63]923                # Store metrics somewhere between every 360 and 640 seconds
[38]924                #
[55]925                STORE_INTERVAL = random.randint( 360, 640 )
[22]926
[169]927                try:
928                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
929                        store_metric_thread.start()
[176]930                except thread.error, msg:
[169]931                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
932                        return 1
[36]933
[169]934                debug_msg( 1, 'ganglia_store_thread(): started.' )
935
936                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]937                time.sleep( STORE_INTERVAL )
[169]938                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
[36]939
[102]940                if store_metric_thread.isAlive():
[36]941
[169]942                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
[136]943                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[169]944                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
[36]945
[169]946                debug_msg( 1, 'ganglia_store_thread(): finished.' )
[36]947
948                return 0
949
[39]950        def storeThread( self ):
[63]951                """Actual metric storing thread"""
[39]952
[169]953                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
954                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
[78]955                ret = self.myXMLHandler.storeMetrics()
[176]956                if ret > 0:
957                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
[169]958                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
959                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
[39]960               
[176]961                return 0
[39]962
[8]963        def processXML( self ):
[63]964                """Process XML"""
[8]965
[169]966                try:
967                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
968                        parsethread.start()
[176]969                except thread.error, msg:
[169]970                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
971                        return 1
[8]972
[169]973                debug_msg( 1, 'ganglia_xml_thread(): started.' )
[36]974
[169]975                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]976                time.sleep( float( self.config.getLowestInterval() ) ) 
[169]977                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
[36]978
979                if parsethread.isAlive():
980
[169]981                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
[47]982                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[169]983                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
[36]984
[169]985                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
[36]986
987                return 0
988
[39]989        def parseThread( self ):
[63]990                """Actual parsing thread"""
[39]991
[169]992                debug_msg( 1, 'ganglia_parse_thread(): started.' )
993                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
[78]994                self.myXMLSource = self.myXMLGatherer.getFileObject()
[176]995
996                try:
997                        xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
998                except socket.error, msg:
999                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
1000
[169]1001                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1002                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
[39]1003
[176]1004                return 0
[39]1005
[9]1006class GangliaConfigParser:
1007
[34]1008        sources = [ ]
[9]1009
1010        def __init__( self, config ):
[63]1011                """Parse some stuff from our gmetad's config, such as polling interval"""
[32]1012
[9]1013                self.config = config
1014                self.parseValues()
1015
[32]1016        def parseValues( self ):
[63]1017                """Parse certain values from gmetad.conf"""
[9]1018
1019                readcfg = open( self.config, 'r' )
1020
1021                for line in readcfg.readlines():
1022
1023                        if line.count( '"' ) > 1:
1024
[10]1025                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]1026
[11]1027                                        source = { }
1028                                        source['name'] = line.split( '"' )[1]
[9]1029                                        source_words = line.split( '"' )[2].split( ' ' )
1030
1031                                        for word in source_words:
1032
1033                                                valid_interval = 1
1034
1035                                                for letter in word:
[32]1036
[9]1037                                                        if letter not in string.digits:
[32]1038
[9]1039                                                                valid_interval = 0
1040
[10]1041                                                if valid_interval and len(word) > 0:
[32]1042
[9]1043                                                        source['interval'] = word
[12]1044                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]1045       
1046                                        # No interval found, use Ganglia's default     
1047                                        if not source.has_key( 'interval' ):
1048                                                source['interval'] = 15
1049                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1050
[33]1051                                        self.sources.append( source )
[9]1052
1053        def getInterval( self, source_name ):
[63]1054                """Return interval for source_name"""
[32]1055
[9]1056                for source in self.sources:
[32]1057
[12]1058                        if source['name'] == source_name:
[32]1059
[9]1060                                return source['interval']
[32]1061
[9]1062                return None
1063
[34]1064        def getLowestInterval( self ):
[63]1065                """Return the lowest interval of all clusters"""
[34]1066
1067                lowest_interval = 0
1068
1069                for source in self.sources:
1070
1071                        if not lowest_interval or source['interval'] <= lowest_interval:
1072
1073                                lowest_interval = source['interval']
1074
1075                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1076                if lowest_interval:
1077                        return lowest_interval
1078                else:
1079                        return 15
1080
[9]1081class RRDHandler:
[63]1082        """Class for handling RRD activity"""
[9]1083
[32]1084        myMetrics = { }
[40]1085        lastStored = { }
[47]1086        timeserials = { }
[36]1087        slot = None
[32]1088
[33]1089        def __init__( self, config, cluster ):
[63]1090                """Setup initial variables"""
[78]1091
[44]1092                self.block = 0
[9]1093                self.cluster = cluster
[33]1094                self.config = config
[40]1095                self.slot = threading.Lock()
[37]1096                self.rrdm = RRDMutator()
[42]1097                self.gatherLastUpdates()
[9]1098
[42]1099        def gatherLastUpdates( self ):
[63]1100                """Populate the lastStored list, containing timestamps of all last updates"""
[42]1101
1102                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1103
1104                hosts = [ ]
1105
1106                if os.path.exists( cluster_dir ):
1107
[44]1108                        dirlist = os.listdir( cluster_dir )
[42]1109
[44]1110                        for dir in dirlist:
[42]1111
[44]1112                                hosts.append( dir )
[42]1113
1114                for host in hosts:
1115
[47]1116                        host_dir = cluster_dir + '/' + host
1117                        dirlist = os.listdir( host_dir )
1118
1119                        for dir in dirlist:
1120
1121                                if not self.timeserials.has_key( host ):
1122
1123                                        self.timeserials[ host ] = [ ]
1124
1125                                self.timeserials[ host ].append( dir )
1126
[42]1127                        last_serial = self.getLastRrdTimeSerial( host )
1128                        if last_serial:
1129
1130                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1131                                if os.path.exists( metric_dir ):
1132
[44]1133                                        dirlist = os.listdir( metric_dir )
[42]1134
[44]1135                                        for file in dirlist:
[42]1136
[44]1137                                                metricname = file.split( '.rrd' )[0]
[42]1138
[44]1139                                                if not self.lastStored.has_key( host ):
[42]1140
[44]1141                                                        self.lastStored[ host ] = { }
[42]1142
[44]1143                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1144
[32]1145        def getClusterName( self ):
[63]1146                """Return clustername"""
1147
[32]1148                return self.cluster
1149
1150        def memMetric( self, host, metric ):
[63]1151                """Store metric from host in memory"""
[32]1152
[179]1153                # <ATOMIC>
1154                #
1155                self.slot.acquire()
1156               
[34]1157                if self.myMetrics.has_key( host ):
[32]1158
[34]1159                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1160
[34]1161                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1162
[34]1163                                        if mymetric['time'] == metric['time']:
[32]1164
[34]1165                                                # Allready have this metric, abort
[179]1166                                                self.slot.release()
[34]1167                                                return 1
1168                        else:
1169                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1170                else:
[32]1171                        self.myMetrics[ host ] = { }
[34]1172                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]1173
[63]1174                # Push new metric onto stack
1175                # atomic code; only 1 thread at a time may access the stack
1176
[32]1177                self.myMetrics[ host ][ metric['name'] ].append( metric )
1178
[40]1179                self.slot.release()
[53]1180                #
1181                # </ATOMIC>
[40]1182
[47]1183        def makeUpdateList( self, host, metriclist ):
[63]1184                """
1185                Make a list of update values for rrdupdate
1186                but only those that we didn't store before
1187                """
[37]1188
1189                update_list = [ ]
[41]1190                metric = None
[37]1191
[47]1192                while len( metriclist ) > 0:
[37]1193
[53]1194                        metric = metriclist.pop( 0 )
[37]1195
[53]1196                        if self.checkStoreMetric( host, metric ):
1197                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[40]1198
[37]1199                return update_list
1200
[49]1201        def checkStoreMetric( self, host, metric ):
[63]1202                """Check if supplied metric if newer than last one stored"""
[40]1203
1204                if self.lastStored.has_key( host ):
1205
[47]1206                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]1207
[47]1208                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1209
[50]1210                                        # This is old
[40]1211                                        return 0
1212
[50]1213                return 1
1214
[54]1215        def memLastUpdate( self, host, metricname, metriclist ):
[63]1216                """
1217                Memorize the time of the latest metric from metriclist
1218                but only if it wasn't allready memorized
1219                """
[50]1220
[54]1221                if not self.lastStored.has_key( host ):
1222                        self.lastStored[ host ] = { }
1223
[50]1224                last_update_time = 0
1225
1226                for metric in metriclist:
1227
[54]1228                        if metric['name'] == metricname:
[50]1229
[54]1230                                if metric['time'] > last_update_time:
[50]1231
[54]1232                                        last_update_time = metric['time']
[40]1233
[54]1234                if self.lastStored[ host ].has_key( metricname ):
[52]1235                       
[54]1236                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]1237                                return 1
[40]1238
[54]1239                self.lastStored[ host ][ metricname ] = last_update_time
[52]1240
[33]1241        def storeMetrics( self ):
[63]1242                """
1243                Store all metrics from memory to disk
1244                and do it to the RRD's in appropriate timeperiod directory
1245                """
[33]1246
1247                for hostname, mymetrics in self.myMetrics.items():     
1248
1249                        for metricname, mymetric in mymetrics.items():
1250
[53]1251                                metrics_to_store = [ ]
1252
[63]1253                                # Pop metrics from stack for storing until none is left
1254                                # atomic code: only 1 thread at a time may access myMetrics
1255
[53]1256                                # <ATOMIC>
[50]1257                                #
[47]1258                                self.slot.acquire() 
[33]1259
[54]1260                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1261
[54]1262                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1263
[176]1264                                                try:
1265                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1266                                                except IndexError, msg:
1267
[179]1268                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1269                                                        # is still len 0 when the statement is executed.
1270                                                        # Just ignore indexerror's..
[176]1271                                                        pass
1272
[53]1273                                self.slot.release()
1274                                #
1275                                # </ATOMIC>
1276
[47]1277                                # Create a mapping table, each metric to the period where it should be stored
1278                                #
[53]1279                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]1280
[50]1281                                update_rets = [ ]
1282
[47]1283                                for period, pmetric in metric_serial_table.items():
1284
[146]1285                                        create_ret = self.createCheck( hostname, metricname, period )   
[47]1286
1287                                        update_ret = self.update( hostname, metricname, period, pmetric )
1288
1289                                        if update_ret == 0:
1290
1291                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1292                                        else:
1293                                                debug_msg( 9, 'metric update failed' )
1294
[146]1295                                        update_rets.append( create_ret )
[50]1296                                        update_rets.append( update_ret )
[47]1297
[179]1298                                # Lets ignore errors here for now, we need to make sure last update time
1299                                # is correct!
1300                                #
1301                                #if not (1) in update_rets:
[50]1302
[179]1303                                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]1304
[17]1305        def makeTimeSerial( self ):
[63]1306                """Generate a time serial. Seconds since epoch"""
[17]1307
1308                # Seconds since epoch
1309                mytime = int( time.time() )
1310
1311                return mytime
1312
[50]1313        def makeRrdPath( self, host, metricname, timeserial ):
[63]1314                """Make a RRD location/path and filename"""
[17]1315
[50]1316                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1317                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[17]1318
1319                return rrd_dir, rrd_file
1320
[20]1321        def getLastRrdTimeSerial( self, host ):
[63]1322                """Find the last timeserial (directory) for this host"""
[17]1323
[19]1324                newest_timeserial = 0
1325
[47]1326                for dir in self.timeserials[ host ]:
[32]1327
[47]1328                        valid_dir = 1
[17]1329
[47]1330                        for letter in dir:
1331                                if letter not in string.digits:
1332                                        valid_dir = 0
[17]1333
[47]1334                        if valid_dir:
1335                                timeserial = dir
1336                                if timeserial > newest_timeserial:
1337                                        newest_timeserial = timeserial
[17]1338
1339                if newest_timeserial:
[18]1340                        return newest_timeserial
[17]1341                else:
1342                        return 0
1343
[47]1344        def determinePeriod( self, host, check_serial ):
[63]1345                """Determine to which period (directory) this time(serial) belongs"""
[47]1346
1347                period_serial = 0
1348
[56]1349                if self.timeserials.has_key( host ):
[47]1350
[56]1351                        for serial in self.timeserials[ host ]:
[47]1352
[56]1353                                if check_serial >= serial and period_serial < serial:
[47]1354
[56]1355                                        period_serial = serial
1356
[47]1357                return period_serial
1358
1359        def determineSerials( self, host, metricname, metriclist ):
1360                """
1361                Determine the correct serial and corresponding rrd to store
1362                for a list of metrics
1363                """
1364
1365                metric_serial_table = { }
1366
1367                for metric in metriclist:
1368
1369                        if metric['name'] == metricname:
1370
1371                                period = self.determinePeriod( host, metric['time'] )   
1372
1373                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1374
[49]1375                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]1376
1377                                        # This one should get it's own new period
1378                                        period = metric['time']
[57]1379
1380                                        if not self.timeserials.has_key( host ):
1381                                                self.timeserials[ host ] = [ ]
1382
[50]1383                                        self.timeserials[ host ].append( period )
[47]1384
1385                                if not metric_serial_table.has_key( period ):
1386
[49]1387                                        metric_serial_table[ period ] = [ ]
[47]1388
1389                                metric_serial_table[ period ].append( metric )
1390
1391                return metric_serial_table
1392
[33]1393        def createCheck( self, host, metricname, timeserial ):
[63]1394                """Check if an rrd allready exists for this metric, create if not"""
[9]1395
[35]1396                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]1397               
[33]1398                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]1399
[9]1400                if not os.path.exists( rrd_dir ):
[58]1401
1402                        try:
1403                                os.makedirs( rrd_dir )
1404
[169]1405                        except os.OSError, msg:
[58]1406
1407                                if msg.find( 'File exists' ) != -1:
1408
1409                                        # Ignore exists errors
1410                                        pass
1411
1412                                else:
1413
1414                                        print msg
1415                                        return
1416
[14]1417                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]1418
[14]1419                if not os.path.exists( rrd_file ):
[9]1420
[33]1421                        interval = self.config.getInterval( self.cluster )
[47]1422                        heartbeat = 8 * int( interval )
[9]1423
[37]1424                        params = [ ]
[12]1425
[37]1426                        params.append( '--step' )
1427                        params.append( str( interval ) )
[12]1428
[37]1429                        params.append( '--start' )
[47]1430                        params.append( str( int( timeserial ) - 1 ) )
[12]1431
[37]1432                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1433                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]1434
[37]1435                        self.rrdm.create( str(rrd_file), params )
1436
[14]1437                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1438
[47]1439        def update( self, host, metricname, timeserial, metriclist ):
[63]1440                """
1441                Update rrd file for host with metricname
1442                in directory timeserial with metriclist
1443                """
[9]1444
[35]1445                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]1446
[33]1447                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]1448
[47]1449                update_list = self.makeUpdateList( host, metriclist )
[15]1450
[41]1451                if len( update_list ) > 0:
1452                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]1453
[41]1454                        if ret:
1455                                return 1
[27]1456               
[41]1457                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]1458
[36]1459                return 0
1460
[169]1461def daemon():
1462        """daemonized threading"""
[8]1463
[169]1464        # Fork the first child
1465        #
1466        pid = os.fork()
1467
1468        if pid > 0:
1469
1470                sys.exit(0)  # end parent
1471
1472        # creates a session and sets the process group ID
1473        #
1474        os.setsid()
1475
1476        # Fork the second child
1477        #
1478        pid = os.fork()
1479
1480        if pid > 0:
1481
1482                sys.exit(0)  # end parent
1483
1484        # Go to the root directory and set the umask
1485        #
1486        os.chdir('/')
1487        os.umask(0)
1488
1489        sys.stdin.close()
1490        sys.stdout.close()
1491        sys.stderr.close()
1492
1493        os.open('/dev/null', 0)
1494        os.dup(0)
1495        os.dup(0)
1496
1497        run()
1498
1499def run():
1500        """Threading start"""
1501
[102]1502        myTorqueProcessor = TorqueXMLProcessor()
1503        myGangliaProcessor = GangliaXMLProcessor()
[8]1504
[169]1505        try:
[102]1506                torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1507                ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
[22]1508
[169]1509                torque_xml_thread.start()
1510                ganglia_xml_thread.start()
1511               
[176]1512        except thread.error, msg:
[169]1513                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1514                syslog.closelog()
1515                sys.exit(1)
1516               
1517        debug_msg( 0, 'main threading started.' )
[78]1518
[169]1519def main():
1520        """Program startup"""
1521
[214]1522        if not processArgs( sys.argv[1:] ):
1523                sys.exit( 1 )
1524
[169]1525        if( DAEMONIZE and USE_SYSLOG ):
1526                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1527
1528        if DAEMONIZE:
1529                daemon()
1530        else:
1531                run()
1532
1533#
[81]1534# Global functions
[169]1535#
[81]1536
[9]1537def check_dir( directory ):
[63]1538        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]1539
1540        if directory[-1] == '/':
1541                directory = directory[:-1]
1542
1543        return directory
1544
[12]1545def debug_msg( level, msg ):
[169]1546        """Only print msg if correct levels"""
[12]1547
[169]1548        if (not DAEMONIZE and DEBUG_LEVEL >= level):
1549                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
1550       
1551        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1552                syslog.syslog( msg )
[12]1553
[46]1554def printTime( ):
[63]1555        """Print current time in human readable format"""
[46]1556
1557        return time.strftime("%a %d %b %Y %H:%M:%S")
1558
[63]1559# Ooohh, someone started me! Let's go..
[9]1560if __name__ == '__main__':
1561        main()
Note: See TracBrowser for help on using the repository browser.