source: trunk/jobarchived/jobarchived.py @ 375

Last change on this file since 375 was 375, checked in by bastiaans, 17 years ago

jobarchived/jobarchived.py:

  • backwards compatible for rrdtool pipes, use rrdtool module automaticly if available
  • Property svn:keywords set to Id
File size: 42.8 KB
RevLine 
[3]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[230]21# SVN $Id: jobarchived.py 375 2007-06-13 13:54:25Z bastiaans $
22#
[3]23
[284]24DEFAULT_SEARCH_PATH     = '/usr/share/jobarchived'
[3]25
[284]26import sys
27
28if DEFAULT_SEARCH_PATH not in sys.path:
29
30        sys.path.append( DEFAULT_SEARCH_PATH )
31
32import getopt, syslog, ConfigParser
33
[214]34def processArgs( args ):
[6]35
[292]36        SHORT_L = 'c:'
37        LONG_L  = 'config='
[169]38
[214]39        config_filename = None
[169]40
[214]41        try:
[169]42
[214]43                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]44
[214]45        except getopt.error, detail:
[60]46
[214]47                print detail
48                sys.exit(1)
[9]49
[214]50        for opt, value in opts:
[60]51
[214]52                if opt in [ '--config', '-c' ]:
[13]53
[214]54                        config_filename = value
[198]55
[214]56        if not config_filename:
[60]57
[214]58                config_filename = '/etc/jobarchived.conf'
[22]59
[214]60        try:
61                return loadConfig( config_filename )
[13]62
[214]63        except ConfigParser.NoOptionError, detail:
64
65                print detail
66                sys.exit( 1 )
67
68def loadConfig( filename ):
69
70        def getlist( cfg_string ):
71
72                my_list = [ ]
73
74                for item_txt in cfg_string.split( ',' ):
75
76                        sep_char = None
77
78                        item_txt = item_txt.strip()
79
80                        for s_char in [ "'", '"' ]:
81
82                                if item_txt.find( s_char ) != -1:
83
84                                        if item_txt.count( s_char ) != 2:
85
86                                                print 'Missing quote: %s' %item_txt
87                                                sys.exit( 1 )
88
89                                        else:
90
91                                                sep_char = s_char
92                                                break
93
94                        if sep_char:
95
96                                item_txt = item_txt.split( sep_char )[1]
97
98                        my_list.append( item_txt )
99
100                return my_list
101
102        cfg = ConfigParser.ConfigParser()
103
104        cfg.read( filename )
105
[375]106        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
[214]107
[292]108        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
[214]109
[292]110        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
[214]111
[292]112        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[214]113
[292]114        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[214]115
[292]116        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[214]117
[375]118        MODRRDTOOL              = False
119
[214]120        try:
[375]121                import rrdtool
[214]122
[375]123                MODRRDTOOL              = True
124
125        except ImportError:
126
127                MODRRDTOOL              = False
128
129                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobmond significantly!" )
130
131        try:
132
[292]133                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[214]134
135        except AttributeError, detail:
136
137                print 'Unknown syslog facility'
138                sys.exit( 1 )
139
[292]140        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
[214]141
[292]142        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
[214]143
[292]144        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
[214]145
[292]146        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
[214]147
[292]148        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
[214]149
[295]150        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
151
[292]152        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[214]153
[292]154        RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
[224]155
[214]156        return True
157
[17]158# What XML data types not to store
[13]159#
[17]160UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]161
[47]162# Maximum time (in seconds) a parsethread may run
163#
164PARSE_TIMEOUT = 60
165
166# Maximum time (in seconds) a storethread may run
167#
168STORE_TIMEOUT = 360
169
[8]170"""
[224]171The Job Archiving Daemon
[8]172"""
173
[214]174from types import *
175
176import DBClass
177import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
[365]178import rrdtool
[214]179
[84]180class DataSQLStore:
181
182        db_vars = None
183        dbc = None
184
185        def __init__( self, hostname, database ):
186
187                self.db_vars = DBClass.InitVars(DataBaseName=database,
188                                User='root',
189                                Host=hostname,
190                                Password='',
191                                Dictionary='true')
192
193                try:
194                        self.dbc     = DBClass.DB(self.db_vars)
195                except DBClass.DBError, details:
[169]196                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
[84]197                        sys.exit(1)
198
199        def setDatabase(self, statement):
200                ret = self.doDatabase('set', statement)
201                return ret
202               
203        def getDatabase(self, statement):
204                ret = self.doDatabase('get', statement)
205                return ret
206
207        def doDatabase(self, type, statement):
208
[365]209                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
[84]210                try:
211                        if type == 'set':
212                                result = self.dbc.Set( statement )
213                                self.dbc.Commit()
214                        elif type == 'get':
215                                result = self.dbc.Get( statement )
216                               
217                except DBClass.DBError, detail:
218                        operation = statement.split(' ')[0]
[169]219                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
[84]220                        sys.exit(1)
221
[365]222                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
[84]223                return result
224
[191]225        def getJobNodeId( self, job_id, node_id ):
226
227                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
228                if len( id ) > 0:
229
230                        if len( id[0] ) > 0 and id[0] != '':
231                       
232                                return 1
233
234                return 0
235
[84]236        def getNodeId( self, hostname ):
237
[89]238                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]239
[89]240                if len( id ) > 0:
[84]241
[89]242                        id = id[0][0]
243
[84]244                        return id
245                else:
246                        return None
247
248        def getNodeIds( self, hostnames ):
249
250                ids = [ ]
251
252                for node in hostnames:
253
254                        id = self.getNodeId( node )
255
256                        if id:
257                                ids.append( id )
258
259                return ids
260
261        def getJobId( self, jobid ):
262
263                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
264
265                if id:
[89]266                        id = id[0][0]
[84]267
268                        return id
269                else:
270                        return None
271
272        def addJob( self, job_id, jobattrs ):
273
274                if not self.getJobId( job_id ):
275
276                        self.mutateJob( 'insert', job_id, jobattrs ) 
277                else:
278                        self.mutateJob( 'update', job_id, jobattrs )
279
280        def mutateJob( self, action, job_id, jobattrs ):
281
[292]282                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
[84]283
[292]284                insert_col_str  = 'job_id'
285                insert_val_str  = "'%s'" %job_id
286                update_str      = None
[84]287
[365]288                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
[84]289
[99]290                ids = [ ]
291
[84]292                for valname, value in jobattrs.items():
293
[96]294                        if valname in job_values and value != '':
[84]295
296                                column_name = 'job_' + valname
297
298                                if action == 'insert':
299
300                                        if not insert_col_str:
301                                                insert_col_str = column_name
302                                        else:
303                                                insert_col_str = insert_col_str + ',' + column_name
304
305                                        if not insert_val_str:
306                                                insert_val_str = value
307                                        else:
308                                                insert_val_str = insert_val_str + ",'%s'" %value
309
310                                elif action == 'update':
311                                       
312                                        if not update_str:
313                                                update_str = "%s='%s'" %(column_name, value)
314                                        else:
315                                                update_str = update_str + ",%s='%s'" %(column_name, value)
316
[90]317                        elif valname == 'nodes' and value:
[84]318
[191]319                                node_valid = 1
[190]320
321                                if len(value) == 1:
322                               
[191]323                                        if jobattrs['status'] == 'Q':
[190]324
[191]325                                                node_valid = 0
[190]326
[191]327                                        else:
[190]328
[191]329                                                node_valid = 0
[190]330
[191]331                                                for node_char in str(value[0]):
[190]332
[191]333                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]334
[191]335                                                                node_valid = 1
[84]336
[191]337                                if node_valid:
338
339                                        ids = self.addNodes( value, jobattrs['domain'] )
340
[84]341                if action == 'insert':
342
343                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]344
[84]345                elif action == 'update':
346
[89]347                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
[84]348
[191]349                if len( ids ) > 0:
350                        self.addJobNodes( job_id, ids )
[190]351
[154]352        def addNodes( self, hostnames, domain ):
[84]353
[98]354                ids = [ ]
355
[84]356                for node in hostnames:
357
[292]358                        node    = '%s.%s' %( node, domain )
359                        id      = self.getNodeId( node )
[84]360       
361                        if not id:
362                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
[98]363                                id = self.getNodeId( node )
[84]364
[98]365                        ids.append( id )
366
367                return ids
368
[86]369        def addJobNodes( self, jobid, nodes ):
370
371                for node in nodes:
372
[191]373                        if not self.getJobNodeId( jobid, node ):
374
375                                self.addJobNode( jobid, node )
376
[84]377        def addJobNode( self, jobid, nodeid ):
378
379                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
380
381        def storeJobInfo( self, jobid, jobattrs ):
382
383                self.addJob( jobid, jobattrs )
384
[295]385        def checkStaleJobs( self ):
386
387                q = "SELECT * from jobs WHERE job_status != 'F'"
388
389                r = self.getDatabase( q )
390
391                if len( r ) == 0:
392
393                        return None
394
395                cleanjobs       = [ ]
396                timeoutjobs     = [ ]
397
398                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
399                cur_time        = time.time()
400
401                for row in r:
402
403                        job_id                  = row[0]
404                        job_requested_time      = row[4]
405                        job_status              = row[7]
406                        job_start_timestamp     = row[8]
407
408                        if job_status == 'Q' or not job_start_timestamp:
409
410                                cleanjobs.append( job_id )
411
412                        else:
413
414                                start_timestamp = int( job_start_timestamp )
415
416                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
417
418                                        if job_requested_time:
419
420                                                rtime_epoch     = reqtime2epoch( job_requested_time )
421                                        else:
422                                                rtime_epoch     = None
423                                       
424                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
425
426                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
427
428                for j in cleanjobs:
429
430                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
431                        self.setDatabase( q )
432
433                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
434
435                for j in timeoutjobs:
436
437                        ( i, s, r )             = j
438
439                        if r:
440                                new_end_timestamp       = int( s ) + r
441
442                        q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
443                        self.setDatabase( q )
444
[37]445class RRDMutator:
[63]446        """A class for performing RRD mutations"""
[37]447
[277]448        binary = None
[37]449
[365]450
[37]451        def __init__( self, binary=None ):
[63]452                """Set alternate binary if supplied"""
[37]453
454                if binary:
455                        self.binary = binary
456
457        def create( self, filename, args ):
[63]458                """Create a new rrd with args"""
459
[375]460                global MODRRDTOOL
[37]461
[375]462                if MODRRDTOOL:
463                        return self.perform( 'create', filename, args )
464                else:
465                        return self.perform( 'create', '"' + filename + '"', args )
466
[37]467        def update( self, filename, args ):
[63]468                """Update a rrd with args"""
469
[375]470                global MODRRDTOOL
[37]471
[375]472                if MODRRDTOOL:
473                        return self.perform( 'update', filename, args )
474                else:
475                        return self.perform( 'update', '"' + filename + '"', args )
476
[42]477        def grabLastUpdate( self, filename ):
[63]478                """Determine the last update time of filename rrd"""
[42]479
[375]480                global MODRRDTOOL
481
[42]482                last_update = 0
483
[375]484                if MODRRDTOOL:
[53]485
[375]486                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
[42]487
[375]488                        rrd_header      = { }
[292]489
[375]490                        try:
491                                rrd_header      = rrdtool.info( filename )
492                        except rrdtool.error, msg:
493                                debug_msg( 8, str( msg ) )
494
495                        if rrd_header.has_key( 'last_update' ):
496                                return last_update
497                        else:
498                                return 0
499
[42]500                else:
[375]501                        debug_msg( 8, self.binary + ' info ' + filename )
[42]502
[375]503                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
504
505                        for line in my_pipe.readlines():
506
507                                if line.find( 'last_update') != -1:
508
509                                        last_update = line.split( ' = ' )[1]
510
511                        if my_pipe:
512
513                                my_pipe.close()
514
515                        if last_update:
516                                return last_update
517                        else:
518                                return 0
519
520
[40]521        def perform( self, action, filename, args ):
[63]522                """Perform action on rrd filename with args"""
[37]523
[375]524                global MODRRDTOOL
525
[37]526                arg_string = None
527
[40]528                if type( args ) is not ListType:
529                        debug_msg( 8, 'Arguments needs to be of type List' )
530                        return 1
531
[37]532                for arg in args:
533
534                        if not arg_string:
535
536                                arg_string = arg
537                        else:
538                                arg_string = arg_string + ' ' + arg
539
[375]540                if MODRRDTOOL:
[37]541
[375]542                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
[292]543
[375]544                        try:
545                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
[37]546
[375]547                                if action == 'create':
[146]548
[375]549                                        rrdtool.create( str( filename ), *args )
[37]550
[375]551                                elif action == 'update':
[37]552
[375]553                                        rrdtool.update( str( filename ), *args )
[365]554
[375]555                        except rrdtool.error, msg:
[365]556
[375]557                                error_msg = str( msg )
558                                debug_msg( 8, error_msg )
559                                return 1
560
561                else:
562
563                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
564
565                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
566                        lines   = cmd.readlines()
567
568                        cmd.close()
569
570                        for line in lines:
571
572                                if line.find( 'ERROR' ) != -1:
573
574                                        error_msg = string.join( line.split( ' ' )[1:] )
575                                        debug_msg( 8, error_msg )
576                                        return 1
577
[37]578                return 0
579
[78]580class XMLProcessor:
581        """Skeleton class for XML processor's"""
582
583        def run( self ):
584                """Do main processing of XML here"""
585
586                pass
587
588class TorqueXMLProcessor( XMLProcessor ):
589        """Main class for processing XML and acting with it"""
590
[295]591        def __init__( self, XMLSource, DataStore ):
[78]592                """Setup initial XML connection and handlers"""
593
[293]594                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[287]595                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
596                self.myXMLSource        = XMLSource
[295]597                self.myXMLHandler       = TorqueXMLHandler( DataStore )
[287]598                self.myXMLError         = XMLErrorHandler()
[78]599
[287]600                self.config             = GangliaConfigParser( GMETAD_CONF )
601
[78]602        def run( self ):
603                """Main XML processing"""
604
[169]605                debug_msg( 1, 'torque_xml_thread(): started.' )
[87]606
[78]607                while( 1 ):
608
[287]609                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
[169]610                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
[176]611
[287]612                        my_data = self.myXMLSource.getData()
613
[176]614                        try:
[287]615                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[176]616                        except socket.error, msg:
617                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
618                               
[169]619                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
620                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[87]621                        time.sleep( self.config.getLowestInterval() )
[78]622
[71]623class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
[63]624        """Parse Torque's jobinfo XML from our plugin"""
625
[72]626        jobAttrs = { }
627
[295]628        def __init__( self, datastore ):
[84]629
[295]630                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
631                self.ds                 = datastore
[292]632                self.jobs_processed     = [ ]
633                self.jobs_to_store      = [ ]
[84]634
[183]635        def startDocument( self ):
636
[292]637                self.heartbeat  = 0
[365]638                self.elementct  = 0
[183]639
[63]640        def startElement( self, name, attrs ):
641                """
642                This XML will be all gmetric XML
643                so there will be no specific start/end element
644                just one XML statement with all info
645                """
646               
[73]647                jobinfo = { }
[63]648
[365]649                self.elementct  += 1
650
[199]651                if name == 'CLUSTER':
[63]652
[372]653                        self.clustername = str( attrs.get( 'NAME', "" ) )
[199]654
655                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
656
[372]657                        metricname = str( attrs.get( 'NAME', "" ) )
[63]658
[285]659                        if metricname == 'MONARCH-HEARTBEAT':
[372]660                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
[63]661
[285]662                        elif metricname.find( 'MONARCH-JOB' ) != -1:
[63]663
[292]664                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
[372]665                                val     = str( attrs.get( 'VAL', "" ) )
[63]666
[96]667                                if not job_id in self.jobs_processed:
[292]668
[96]669                                        self.jobs_processed.append( job_id )
670
[73]671                                check_change = 0
672
673                                if self.jobAttrs.has_key( job_id ):
[292]674
[73]675                                        check_change = 1
676
[63]677                                valinfo = val.split( ' ' )
678
679                                for myval in valinfo:
680
[84]681                                        if len( myval.split( '=' ) ) > 1:
[63]682
[292]683                                                valname = myval.split( '=' )[0]
684                                                value   = myval.split( '=' )[1]
[70]685
[84]686                                                if valname == 'nodes':
687                                                        value = value.split( ';' )
[72]688
[84]689                                                jobinfo[ valname ] = value
690
[73]691                                if check_change:
[182]692                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
[292]693                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
694                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
[84]695                                                if not job_id in self.jobs_to_store:
696                                                        self.jobs_to_store.append( job_id )
697
[365]698                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]699                                else:
700                                        self.jobAttrs[ job_id ] = jobinfo
[84]701
702                                        if not job_id in self.jobs_to_store:
703                                                self.jobs_to_store.append( job_id )
704
[365]705                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]706                                       
[77]707        def endDocument( self ):
[74]708                """When all metrics have gone, check if any jobs have finished"""
[72]709
[371]710                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
[365]711
[182]712                if self.heartbeat:
713                        for jobid, jobinfo in self.jobAttrs.items():
[74]714
[182]715                                # This is an old job, not in current jobinfo list anymore
716                                # it must have finished, since we _did_ get a new heartbeat
717                                #
718                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
[102]719
[184]720                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
[74]721
[182]722                                        if not jobid in self.jobs_processed:
723                                                self.jobs_processed.append( jobid )
[96]724
[182]725                                        self.jobAttrs[ jobid ]['status'] = 'F'
[360]726                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
[96]727
[182]728                                        if not jobid in self.jobs_to_store:
729                                                self.jobs_to_store.append( jobid )
[74]730
[182]731                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
[87]732
[182]733                        for jobid in self.jobs_to_store:
[295]734                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
[84]735
[184]736                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
737
738                                        if self.jobAttrs[ jobid ]['status'] == 'F':
739                                                del self.jobAttrs[ jobid ]
740
[182]741                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
[87]742
[292]743                        self.jobs_processed     = [ ]
744                        self.jobs_to_store      = [ ]
[84]745
[82]746        def setJobAttrs( self, old, new ):
747                """
748                Set new job attributes in old, but not lose existing fields
749                if old attributes doesn't have those
750                """
751
752                for valname, value in new.items():
753                        old[ valname ] = value
754
755                return old
756               
757
[73]758        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
[74]759                """
760                Check if jobinfo has changed from jobattrs[jobid]
761                if it's report time is bigger than previous one
762                and it is report time is recent (equal to heartbeat)
763                """
[72]764
[87]765                ignore_changes = [ 'reported' ]
766
[73]767                if jobattrs.has_key( jobid ):
768
769                        for valname, value in jobinfo.items():
770
[87]771                                if valname not in ignore_changes:
[73]772
[87]773                                        if jobattrs[ jobid ].has_key( valname ):
[73]774
[87]775                                                if value != jobattrs[ jobid ][ valname ]:
[73]776
[87]777                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
[360]778                                                                return True
[73]779
[87]780                                        else:
[360]781                                                return True
[87]782
[360]783                return False
[73]784
[71]785class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[63]786        """Parse Ganglia's XML"""
[3]787
[295]788        def __init__( self, config, datastore ):
[63]789                """Setup initial variables and gather info on existing rrd archive"""
790
[292]791                self.config     = config
792                self.clusters   = { }
[295]793                self.ds         = datastore
[324]794
[295]795                debug_msg( 1, 'Checking database..' )
[365]796
797                global DEBUG_LEVEL
798
799                if DEBUG_LEVEL <= 2:
800                        self.ds.checkStaleJobs()
801
[295]802                debug_msg( 1, 'Check done.' )
[296]803                debug_msg( 1, 'Checking rrd archive..' )
[293]804                self.gatherClusters()
[169]805                debug_msg( 1, 'Check done.' )
[33]806
[44]807        def gatherClusters( self ):
[63]808                """Find all existing clusters in archive dir"""
[44]809
[292]810                archive_dir     = check_dir(ARCHIVE_PATH)
[44]811
[292]812                hosts           = [ ]
[44]813
814                if os.path.exists( archive_dir ):
815
[292]816                        dirlist = os.listdir( archive_dir )
[44]817
[369]818                        for cfgcluster in ARCHIVE_DATASOURCES:
819
820                                if cfgcluster not in dirlist:
821
822                                        # Autocreate a directory for this cluster
823                                        # assume it is new
824                                        #
825                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
826
827                                        os.mkdir( cluster_dir )
828
[370]829                                        dirlist.append( cfgcluster )
830
[44]831                        for item in dirlist:
832
833                                clustername = item
834
[60]835                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]836
837                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
838
[365]839                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
840
[6]841        def startElement( self, name, attrs ):
[63]842                """Memorize appropriate data from xml start tags"""
[3]843
[7]844                if name == 'GANGLIA_XML':
[32]845
[372]846                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
847                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
[32]848
[12]849                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]850
[7]851                elif name == 'GRID':
[32]852
[372]853                        self.gridName   = str( attrs.get( 'NAME', "" ) )
854                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
[32]855
[12]856                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]857
[7]858                elif name == 'CLUSTER':
[32]859
[372]860                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
861                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
[32]862
[60]863                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]864
[34]865                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]866
[35]867                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]868
[60]869                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]870
[372]871                        self.hostName           = str( attrs.get( 'NAME', "" ) )
872                        self.hostIp             = str( attrs.get( 'IP', "" ) )
873                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
[32]874
[12]875                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]876
[60]877                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]878
[372]879                        type = str( attrs.get( 'TYPE', "" ) )
[198]880                       
881                        exclude_metric = False
882                       
883                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
[6]884
[372]885                                orig_name = str( attrs.get( 'NAME', "" ) )
[3]886
[198]887                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
888                               
889                                        exclude_metric = True
890
891                                elif re.match( ex_metricstr, orig_name ):
892
893                                        exclude_metric = True
894
895                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
896
[292]897                                myMetric                = { }
[372]898                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
899                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
[292]900                                myMetric['time']        = self.hostReported
[3]901
[34]902                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]903
[34]904                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]905
[34]906        def storeMetrics( self ):
[63]907                """Store metrics of each cluster rrd handler"""
[9]908
[34]909                for clustername, rrdh in self.clusters.items():
[16]910
[38]911                        ret = rrdh.storeMetrics()
[9]912
[38]913                        if ret:
914                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
915                                return 1
916
917                return 0
918
[71]919class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
920
921        def error( self, exception ):
922                """Recoverable error"""
923
[169]924                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]925
926        def fatalError( self, exception ):
927                """Non-recoverable error"""
928
929                exception_str = str( exception )
930
931                # Ignore 'no element found' errors
932                if exception_str.find( 'no element found' ) != -1:
[169]933                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
[71]934                        return 0
935
[170]936                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
[71]937                sys.exit( 1 )
938
939        def warning( self, exception ):
940                """Warning"""
941
942                debug_msg( 0, 'Warning ' + str( exception ) )
943
[78]944class XMLGatherer:
[63]945        """Setup a connection and file object to Ganglia's XML"""
[3]946
[287]947        s               = None
948        fd              = None
949        data            = None
[293]950        slot            = None
[8]951
[287]952        # Time since the last update
953        #
954        LAST_UPDATE     = 0
955
956        # Minimum interval between updates
957        #
958        MIN_UPDATE_INT  = 10
959
960        # Is a update occuring now
961        #
962        update_now      = False
963
[8]964        def __init__( self, host, port ):
[63]965                """Store host and port for connection"""
[8]966
[293]967                self.host       = host
968                self.port       = port
969                self.slot       = threading.Lock()
[3]970
[287]971                self.retrieveData()
972
973        def retrieveData( self ):
[63]974                """Setup connection to XML source"""
[8]975
[287]976                self.update_now = True
977
[293]978                self.slot.acquire()
979
[8]980                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]981
[5]982                        af, socktype, proto, canonname, sa = res
[32]983
[5]984                        try:
[32]985
[8]986                                self.s = socket.socket( af, socktype, proto )
[32]987
[5]988                        except socket.error, msg:
[32]989
[8]990                                self.s = None
[5]991                                continue
[32]992
[5]993                        try:
[32]994
[8]995                                self.s.connect( sa )
[32]996
[5]997                        except socket.error, msg:
[32]998
[70]999                                self.disconnect()
[5]1000                                continue
[32]1001
[5]1002                        break
[3]1003
[8]1004                if self.s is None:
[32]1005
[170]1006                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
[287]1007                        self.update_now = False
[33]1008                        sys.exit( 1 )
[5]1009
[287]1010                else:
[324]1011                        #self.s.send( '\n' )
[287]1012
1013                        my_fp                   = self.s.makefile( 'r' )
1014                        my_data                 = my_fp.readlines()
1015                        my_data                 = string.join( my_data, '' )
1016
1017                        self.data               = my_data
1018
1019                        self.LAST_UPDATE        = time.time()
1020
[293]1021                self.slot.release()
1022
[287]1023                self.update_now = False
1024
[33]1025        def disconnect( self ):
[63]1026                """Close socket"""
[33]1027
1028                if self.s:
[287]1029                        #self.s.shutdown( 2 )
[33]1030                        self.s.close()
1031                        self.s = None
1032
1033        def __del__( self ):
[63]1034                """Kill the socket before we leave"""
[33]1035
1036                self.disconnect()
1037
[287]1038        def reGetData( self ):
[70]1039                """Reconnect"""
[33]1040
[287]1041                while self.update_now:
1042
1043                        # Must be another update in progress:
1044                        # Wait until the update is complete
1045                        #
1046                        time.sleep( 1 )
1047
[38]1048                if self.s:
1049                        self.disconnect()
[33]1050
[287]1051                self.retrieveData()
[5]1052
[287]1053        def getData( self ):
1054
1055                """Return the XML data"""
1056
1057                # If more than MIN_UPDATE_INT seconds passed since last data update
1058                # update the XML first before returning it
1059                #
1060
1061                cur_time        = time.time()
1062
1063                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1064
1065                        self.reGetData()
1066
1067                while self.update_now:
1068
1069                        # Must be another update in progress:
1070                        # Wait until the update is complete
1071                        #
1072                        time.sleep( 1 )
1073                       
1074                return self.data
1075
[70]1076        def makeFileDescriptor( self ):
1077                """Make file descriptor that points to our socket connection"""
1078
1079                self.reconnect()
1080
1081                if self.s:
1082                        self.fd = self.s.makefile( 'r' )
1083
1084        def getFileObject( self ):
1085                """Connect, and return a file object"""
1086
[78]1087                self.makeFileDescriptor()
1088
[70]1089                if self.fd:
1090                        return self.fd
1091
[78]1092class GangliaXMLProcessor( XMLProcessor ):
[63]1093        """Main class for processing XML and acting with it"""
[5]1094
[295]1095        def __init__( self, XMLSource, DataStore ):
[63]1096                """Setup initial XML connection and handlers"""
[33]1097
[287]1098                self.config             = GangliaConfigParser( GMETAD_CONF )
[33]1099
[293]1100                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[287]1101                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1102                self.myXMLSource        = XMLSource
[295]1103                self.ds                 = DataStore
1104                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
[287]1105                self.myXMLError         = XMLErrorHandler()
[73]1106
[9]1107        def run( self ):
[63]1108                """Main XML processing; start a xml and storethread"""
[8]1109
[102]1110                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1111                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]1112
[36]1113                while( 1 ):
1114
[102]1115                        if not xml_thread.isAlive():
[36]1116                                # Gather XML at the same interval as gmetad
1117
1118                                # threaded call to: self.processXML()
1119                                #
[169]1120                                try:
1121                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1122                                        xml_thread.start()
[176]1123                                except thread.error, msg:
[169]1124                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1125                                        #return 1
[36]1126
[102]1127                        if not store_thread.isAlive():
[55]1128                                # Store metrics every .. sec
[36]1129
[55]1130                                # threaded call to: self.storeMetrics()
1131                                #
[169]1132                                try:
1133                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1134                                        store_thread.start()
[176]1135                                except thread.error, msg:
[169]1136                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1137                                        #return 1
[36]1138               
1139                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1140                        time.sleep( 1 ) 
1141
[33]1142        def storeMetrics( self ):
[63]1143                """Store metrics retained in memory to disk"""
[22]1144
[365]1145                global DEBUG_LEVEL
1146
[63]1147                # Store metrics somewhere between every 360 and 640 seconds
[38]1148                #
[365]1149                if DEBUG_LEVEL > 2:
1150                        #STORE_INTERVAL = 60
1151                        STORE_INTERVAL = random.randint( 360, 640 )
1152                else:
1153                        STORE_INTERVAL = random.randint( 360, 640 )
[22]1154
[169]1155                try:
1156                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1157                        store_metric_thread.start()
[176]1158                except thread.error, msg:
[169]1159                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1160                        return 1
[36]1161
[169]1162                debug_msg( 1, 'ganglia_store_thread(): started.' )
1163
1164                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]1165                time.sleep( STORE_INTERVAL )
[169]1166                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
[36]1167
[102]1168                if store_metric_thread.isAlive():
[36]1169
[169]1170                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
[136]1171                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[169]1172                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
[36]1173
[169]1174                debug_msg( 1, 'ganglia_store_thread(): finished.' )
[36]1175
1176                return 0
1177
[39]1178        def storeThread( self ):
[63]1179                """Actual metric storing thread"""
[39]1180
[169]1181                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1182                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
[78]1183                ret = self.myXMLHandler.storeMetrics()
[176]1184                if ret > 0:
1185                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
[169]1186                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1187                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
[39]1188               
[176]1189                return 0
[39]1190
[8]1191        def processXML( self ):
[63]1192                """Process XML"""
[8]1193
[169]1194                try:
1195                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1196                        parsethread.start()
[176]1197                except thread.error, msg:
[169]1198                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1199                        return 1
[8]1200
[169]1201                debug_msg( 1, 'ganglia_xml_thread(): started.' )
[36]1202
[169]1203                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]1204                time.sleep( float( self.config.getLowestInterval() ) ) 
[169]1205                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
[36]1206
1207                if parsethread.isAlive():
1208
[169]1209                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
[47]1210                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[169]1211                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
[36]1212
[169]1213                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
[36]1214
1215                return 0
1216
[39]1217        def parseThread( self ):
[63]1218                """Actual parsing thread"""
[39]1219
[169]1220                debug_msg( 1, 'ganglia_parse_thread(): started.' )
1221                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
[287]1222                #self.myXMLSource = self.myXMLGatherer.getFileObject()
1223               
1224                my_data = self.myXMLSource.getData()
[176]1225
[293]1226                #print my_data
1227
[176]1228                try:
[287]1229                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[176]1230                except socket.error, msg:
1231                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
1232
[169]1233                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1234                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
[39]1235
[176]1236                return 0
[39]1237
[9]1238class GangliaConfigParser:
1239
[34]1240        sources = [ ]
[9]1241
1242        def __init__( self, config ):
[63]1243                """Parse some stuff from our gmetad's config, such as polling interval"""
[32]1244
[9]1245                self.config = config
1246                self.parseValues()
1247
[32]1248        def parseValues( self ):
[63]1249                """Parse certain values from gmetad.conf"""
[9]1250
1251                readcfg = open( self.config, 'r' )
1252
1253                for line in readcfg.readlines():
1254
1255                        if line.count( '"' ) > 1:
1256
[10]1257                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]1258
[292]1259                                        source          = { }
1260                                        source['name']  = line.split( '"' )[1]
1261                                        source_words    = line.split( '"' )[2].split( ' ' )
[9]1262
1263                                        for word in source_words:
1264
1265                                                valid_interval = 1
1266
1267                                                for letter in word:
[32]1268
[9]1269                                                        if letter not in string.digits:
[32]1270
[9]1271                                                                valid_interval = 0
1272
[10]1273                                                if valid_interval and len(word) > 0:
[32]1274
[9]1275                                                        source['interval'] = word
[12]1276                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]1277       
1278                                        # No interval found, use Ganglia's default     
1279                                        if not source.has_key( 'interval' ):
1280                                                source['interval'] = 15
1281                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1282
[33]1283                                        self.sources.append( source )
[9]1284
1285        def getInterval( self, source_name ):
[63]1286                """Return interval for source_name"""
[32]1287
[9]1288                for source in self.sources:
[32]1289
[12]1290                        if source['name'] == source_name:
[32]1291
[9]1292                                return source['interval']
[32]1293
[9]1294                return None
1295
[34]1296        def getLowestInterval( self ):
[63]1297                """Return the lowest interval of all clusters"""
[34]1298
1299                lowest_interval = 0
1300
1301                for source in self.sources:
1302
1303                        if not lowest_interval or source['interval'] <= lowest_interval:
1304
1305                                lowest_interval = source['interval']
1306
1307                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1308                if lowest_interval:
1309                        return lowest_interval
1310                else:
1311                        return 15
1312
[9]1313class RRDHandler:
[63]1314        """Class for handling RRD activity"""
[9]1315
[32]1316        myMetrics = { }
[40]1317        lastStored = { }
[47]1318        timeserials = { }
[36]1319        slot = None
[32]1320
[33]1321        def __init__( self, config, cluster ):
[63]1322                """Setup initial variables"""
[78]1323
[292]1324                self.block      = 0
1325                self.cluster    = cluster
1326                self.config     = config
1327                self.slot       = threading.Lock()
1328                self.rrdm       = RRDMutator( RRDTOOL )
1329
[365]1330                global DEBUG_LEVEL
[9]1331
[365]1332                if DEBUG_LEVEL <= 2:
1333                        self.gatherLastUpdates()
1334
[42]1335        def gatherLastUpdates( self ):
[63]1336                """Populate the lastStored list, containing timestamps of all last updates"""
[42]1337
1338                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1339
1340                hosts = [ ]
1341
1342                if os.path.exists( cluster_dir ):
1343
[44]1344                        dirlist = os.listdir( cluster_dir )
[42]1345
[44]1346                        for dir in dirlist:
[42]1347
[44]1348                                hosts.append( dir )
[42]1349
1350                for host in hosts:
1351
[292]1352                        host_dir        = cluster_dir + '/' + host
1353                        dirlist         = os.listdir( host_dir )
[47]1354
1355                        for dir in dirlist:
1356
1357                                if not self.timeserials.has_key( host ):
1358
1359                                        self.timeserials[ host ] = [ ]
1360
1361                                self.timeserials[ host ].append( dir )
1362
[42]1363                        last_serial = self.getLastRrdTimeSerial( host )
[292]1364
[42]1365                        if last_serial:
1366
1367                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
[292]1368
[42]1369                                if os.path.exists( metric_dir ):
1370
[44]1371                                        dirlist = os.listdir( metric_dir )
[42]1372
[44]1373                                        for file in dirlist:
[42]1374
[44]1375                                                metricname = file.split( '.rrd' )[0]
[42]1376
[44]1377                                                if not self.lastStored.has_key( host ):
[42]1378
[44]1379                                                        self.lastStored[ host ] = { }
[42]1380
[44]1381                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1382
[32]1383        def getClusterName( self ):
[63]1384                """Return clustername"""
1385
[32]1386                return self.cluster
1387
1388        def memMetric( self, host, metric ):
[63]1389                """Store metric from host in memory"""
[32]1390
[179]1391                # <ATOMIC>
1392                #
1393                self.slot.acquire()
1394               
[34]1395                if self.myMetrics.has_key( host ):
[32]1396
[34]1397                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1398
[34]1399                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1400
[34]1401                                        if mymetric['time'] == metric['time']:
[32]1402
[34]1403                                                # Allready have this metric, abort
[179]1404                                                self.slot.release()
[34]1405                                                return 1
1406                        else:
1407                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1408                else:
[292]1409                        self.myMetrics[ host ]                          = { }
1410                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
[32]1411
[63]1412                # Push new metric onto stack
1413                # atomic code; only 1 thread at a time may access the stack
1414
[32]1415                self.myMetrics[ host ][ metric['name'] ].append( metric )
1416
[40]1417                self.slot.release()
[53]1418                #
1419                # </ATOMIC>
[40]1420
[47]1421        def makeUpdateList( self, host, metriclist ):
[63]1422                """
1423                Make a list of update values for rrdupdate
1424                but only those that we didn't store before
1425                """
[37]1426
[292]1427                update_list     = [ ]
1428                metric          = None
[37]1429
[47]1430                while len( metriclist ) > 0:
[37]1431
[53]1432                        metric = metriclist.pop( 0 )
[37]1433
[53]1434                        if self.checkStoreMetric( host, metric ):
[292]1435
[365]1436                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1437                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1438                                update_list.append( u_val )
[40]1439
[37]1440                return update_list
1441
[49]1442        def checkStoreMetric( self, host, metric ):
[63]1443                """Check if supplied metric if newer than last one stored"""
[40]1444
1445                if self.lastStored.has_key( host ):
1446
[47]1447                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]1448
[47]1449                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1450
[50]1451                                        # This is old
[40]1452                                        return 0
1453
[50]1454                return 1
1455
[54]1456        def memLastUpdate( self, host, metricname, metriclist ):
[63]1457                """
1458                Memorize the time of the latest metric from metriclist
1459                but only if it wasn't allready memorized
1460                """
[50]1461
[54]1462                if not self.lastStored.has_key( host ):
1463                        self.lastStored[ host ] = { }
1464
[50]1465                last_update_time = 0
1466
1467                for metric in metriclist:
1468
[54]1469                        if metric['name'] == metricname:
[50]1470
[54]1471                                if metric['time'] > last_update_time:
[50]1472
[54]1473                                        last_update_time = metric['time']
[40]1474
[54]1475                if self.lastStored[ host ].has_key( metricname ):
[52]1476                       
[54]1477                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]1478                                return 1
[40]1479
[54]1480                self.lastStored[ host ][ metricname ] = last_update_time
[52]1481
[33]1482        def storeMetrics( self ):
[63]1483                """
1484                Store all metrics from memory to disk
1485                and do it to the RRD's in appropriate timeperiod directory
1486                """
[33]1487
[365]1488                debug_msg( 5, "Entering storeMetrics()")
1489
1490                count_values    = 0
1491                count_metrics   = 0
1492                count_bits      = 0
1493
[33]1494                for hostname, mymetrics in self.myMetrics.items():     
1495
1496                        for metricname, mymetric in mymetrics.items():
1497
[365]1498                                count_metrics += 1
1499
1500                                for dmetric in mymetric:
1501
1502                                        count_values += 1
1503
1504                                        count_bits      += len( dmetric['time'] )
1505                                        count_bits      += len( dmetric['val'] )
1506
1507                count_bytes     = count_bits / 8
1508
1509                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1510                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1511                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1512                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1513
1514                for hostname, mymetrics in self.myMetrics.items():     
1515
1516                        for metricname, mymetric in mymetrics.items():
1517
[53]1518                                metrics_to_store = [ ]
1519
[63]1520                                # Pop metrics from stack for storing until none is left
1521                                # atomic code: only 1 thread at a time may access myMetrics
1522
[53]1523                                # <ATOMIC>
[50]1524                                #
[47]1525                                self.slot.acquire() 
[33]1526
[54]1527                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1528
[54]1529                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1530
[176]1531                                                try:
1532                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1533                                                except IndexError, msg:
1534
[179]1535                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1536                                                        # is still len 0 when the statement is executed.
1537                                                        # Just ignore indexerror's..
[176]1538                                                        pass
1539
[53]1540                                self.slot.release()
1541                                #
1542                                # </ATOMIC>
1543
[47]1544                                # Create a mapping table, each metric to the period where it should be stored
1545                                #
[53]1546                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]1547
[50]1548                                update_rets = [ ]
1549
[47]1550                                for period, pmetric in metric_serial_table.items():
1551
[146]1552                                        create_ret = self.createCheck( hostname, metricname, period )   
[47]1553
1554                                        update_ret = self.update( hostname, metricname, period, pmetric )
1555
1556                                        if update_ret == 0:
1557
1558                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1559                                        else:
1560                                                debug_msg( 9, 'metric update failed' )
1561
[146]1562                                        update_rets.append( create_ret )
[50]1563                                        update_rets.append( update_ret )
[47]1564
[179]1565                                # Lets ignore errors here for now, we need to make sure last update time
1566                                # is correct!
1567                                #
1568                                #if not (1) in update_rets:
[50]1569
[179]1570                                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]1571
[365]1572                debug_msg( 5, "Leaving storeMetrics()")
1573
[17]1574        def makeTimeSerial( self ):
[63]1575                """Generate a time serial. Seconds since epoch"""
[17]1576
1577                # Seconds since epoch
1578                mytime = int( time.time() )
1579
1580                return mytime
1581
[50]1582        def makeRrdPath( self, host, metricname, timeserial ):
[63]1583                """Make a RRD location/path and filename"""
[17]1584
[292]1585                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1586                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
[17]1587
1588                return rrd_dir, rrd_file
1589
[20]1590        def getLastRrdTimeSerial( self, host ):
[63]1591                """Find the last timeserial (directory) for this host"""
[17]1592
[19]1593                newest_timeserial = 0
1594
[47]1595                for dir in self.timeserials[ host ]:
[32]1596
[47]1597                        valid_dir = 1
[17]1598
[47]1599                        for letter in dir:
1600                                if letter not in string.digits:
1601                                        valid_dir = 0
[17]1602
[47]1603                        if valid_dir:
1604                                timeserial = dir
1605                                if timeserial > newest_timeserial:
1606                                        newest_timeserial = timeserial
[17]1607
1608                if newest_timeserial:
[18]1609                        return newest_timeserial
[17]1610                else:
1611                        return 0
1612
[47]1613        def determinePeriod( self, host, check_serial ):
[63]1614                """Determine to which period (directory) this time(serial) belongs"""
[47]1615
1616                period_serial = 0
1617
[56]1618                if self.timeserials.has_key( host ):
[47]1619
[56]1620                        for serial in self.timeserials[ host ]:
[47]1621
[56]1622                                if check_serial >= serial and period_serial < serial:
[47]1623
[56]1624                                        period_serial = serial
1625
[47]1626                return period_serial
1627
1628        def determineSerials( self, host, metricname, metriclist ):
1629                """
1630                Determine the correct serial and corresponding rrd to store
1631                for a list of metrics
1632                """
1633
1634                metric_serial_table = { }
1635
1636                for metric in metriclist:
1637
1638                        if metric['name'] == metricname:
1639
[292]1640                                period          = self.determinePeriod( host, metric['time'] ) 
[47]1641
[292]1642                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[47]1643
[49]1644                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]1645
1646                                        # This one should get it's own new period
1647                                        period = metric['time']
[57]1648
1649                                        if not self.timeserials.has_key( host ):
1650                                                self.timeserials[ host ] = [ ]
1651
[50]1652                                        self.timeserials[ host ].append( period )
[47]1653
1654                                if not metric_serial_table.has_key( period ):
1655
[49]1656                                        metric_serial_table[ period ] = [ ]
[47]1657
1658                                metric_serial_table[ period ].append( metric )
1659
1660                return metric_serial_table
1661
[33]1662        def createCheck( self, host, metricname, timeserial ):
[63]1663                """Check if an rrd allready exists for this metric, create if not"""
[9]1664
[35]1665                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]1666               
[33]1667                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]1668
[9]1669                if not os.path.exists( rrd_dir ):
[58]1670
1671                        try:
1672                                os.makedirs( rrd_dir )
1673
[169]1674                        except os.OSError, msg:
[58]1675
1676                                if msg.find( 'File exists' ) != -1:
1677
1678                                        # Ignore exists errors
1679                                        pass
1680
1681                                else:
1682
1683                                        print msg
1684                                        return
1685
[14]1686                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]1687
[14]1688                if not os.path.exists( rrd_file ):
[9]1689
[292]1690                        interval        = self.config.getInterval( self.cluster )
1691                        heartbeat       = 8 * int( interval )
[9]1692
[292]1693                        params          = [ ]
[12]1694
[37]1695                        params.append( '--step' )
1696                        params.append( str( interval ) )
[12]1697
[37]1698                        params.append( '--start' )
[47]1699                        params.append( str( int( timeserial ) - 1 ) )
[12]1700
[37]1701                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1702                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]1703
[37]1704                        self.rrdm.create( str(rrd_file), params )
1705
[14]1706                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1707
[47]1708        def update( self, host, metricname, timeserial, metriclist ):
[63]1709                """
1710                Update rrd file for host with metricname
1711                in directory timeserial with metriclist
1712                """
[9]1713
[35]1714                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]1715
[292]1716                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
[18]1717
[292]1718                update_list             = self.makeUpdateList( host, metriclist )
[15]1719
[41]1720                if len( update_list ) > 0:
1721                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]1722
[41]1723                        if ret:
1724                                return 1
[27]1725               
[41]1726                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]1727
[36]1728                return 0
1729
[169]1730def daemon():
1731        """daemonized threading"""
[8]1732
[169]1733        # Fork the first child
1734        #
1735        pid = os.fork()
1736
1737        if pid > 0:
1738
1739                sys.exit(0)  # end parent
1740
1741        # creates a session and sets the process group ID
1742        #
1743        os.setsid()
1744
1745        # Fork the second child
1746        #
1747        pid = os.fork()
1748
1749        if pid > 0:
1750
1751                sys.exit(0)  # end parent
1752
1753        # Go to the root directory and set the umask
1754        #
1755        os.chdir('/')
1756        os.umask(0)
1757
1758        sys.stdin.close()
1759        sys.stdout.close()
1760        sys.stderr.close()
1761
[273]1762        os.open('/dev/null', os.O_RDWR)
1763        os.dup2(0, 1)
1764        os.dup2(0, 2)
[169]1765
1766        run()
1767
1768def run():
1769        """Threading start"""
1770
[287]1771        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[295]1772        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[8]1773
[295]1774        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1775        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
[287]1776
[169]1777        try:
[292]1778                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1779                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
[22]1780
[169]1781                torque_xml_thread.start()
1782                ganglia_xml_thread.start()
1783               
[176]1784        except thread.error, msg:
[169]1785                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1786                syslog.closelog()
1787                sys.exit(1)
1788               
1789        debug_msg( 0, 'main threading started.' )
[78]1790
[169]1791def main():
1792        """Program startup"""
1793
[375]1794        global DAEMONIZE, USE_SYSLOG
1795
[214]1796        if not processArgs( sys.argv[1:] ):
1797                sys.exit( 1 )
1798
[169]1799        if( DAEMONIZE and USE_SYSLOG ):
1800                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1801
1802        if DAEMONIZE:
1803                daemon()
1804        else:
1805                run()
1806
1807#
[81]1808# Global functions
[169]1809#
[81]1810
[9]1811def check_dir( directory ):
[63]1812        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]1813
1814        if directory[-1] == '/':
1815                directory = directory[:-1]
1816
1817        return directory
1818
[295]1819def reqtime2epoch( rtime ):
1820
1821        (hours, minutes, seconds )      = rtime.split( ':' )
1822
1823        etime   = int(seconds)
1824        etime   = etime + ( int(minutes) * 60 )
1825        etime   = etime + ( int(hours) * 60 * 60 )
1826
1827        return etime
1828
[12]1829def debug_msg( level, msg ):
[169]1830        """Only print msg if correct levels"""
[12]1831
[169]1832        if (not DAEMONIZE and DEBUG_LEVEL >= level):
1833                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
1834       
1835        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1836                syslog.syslog( msg )
[12]1837
[46]1838def printTime( ):
[63]1839        """Print current time in human readable format"""
[46]1840
1841        return time.strftime("%a %d %b %Y %H:%M:%S")
1842
[63]1843# Ooohh, someone started me! Let's go..
[9]1844if __name__ == '__main__':
1845        main()
Note: See TracBrowser for help on using the repository browser.