Changeset 770


Ignore:
Timestamp:
03/28/13 16:41:54 (9 years ago)
Author:
ramonb
Message:
  • cleanup
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/0.4/jobarchived/jobarchived.py

    r500 r770  
    2424import getopt, syslog, ConfigParser, sys
    2525
    26 VERSION='0.3.1'
     26VERSION='0.4+SVN'
    2727
    2828def usage( ver ):
    2929
    30         print 'jobarchived %s' %VERSION
    31 
    32         if ver:
    33                 return 0
    34 
    35         print
    36         print 'Purpose:'
    37         print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
    38         print '  and node statistics in a RRD archive'
    39         print
    40         print 'Usage:   jobarchived [OPTIONS]'
    41         print
    42         print '  -c, --config=FILE      The configuration file to use (default: /etc/jobarchived.conf)'
    43         print '  -p, --pidfile=FILE     Use pid file to store the process id'
    44         print '  -h, --help             Print help and exit'
    45         print '  -v, --version          Print version and exit'
    46         print
     30    print 'jobarchived %s' %VERSION
     31
     32    if ver:
     33        return 0
     34
     35    print
     36    print 'Purpose:'
     37    print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
     38    print '  and node statistics in a RRD archive'
     39    print
     40    print 'Usage:    jobarchived [OPTIONS]'
     41    print
     42    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobarchived.conf)'
     43    print '  -p, --pidfile=FILE    Use pid file to store the process id'
     44    print '  -h, --help        Print help and exit'
     45    print '  -v, --version        Print version and exit'
     46    print
    4747
    4848def processArgs( args ):
    4949
    50         SHORT_L = 'p:hvc:'
    51         LONG_L  = [ 'help', 'config=', 'pidfile=', 'version' ]
     50        SHORT_L    = 'p:hvc:'
     51        LONG_L    = [ 'help', 'config=', 'pidfile=', 'version' ]
    5252
    5353        config_filename = '/etc/jobarchived.conf'
    5454
    55         global PIDFILE
    56 
    57         PIDFILE = None
     55    global PIDFILE
     56
     57    PIDFILE    = None
    5858
    5959        try:
     
    7272                        config_filename = value
    7373
    74                 if opt in [ '--pidfile', '-p' ]:
    75 
    76                         PIDFILE         = value
    77 
    78                 if opt in [ '--help', '-h' ]:
    79 
    80                         usage( False )
    81                         sys.exit( 0 )
    82 
    83                 if opt in [ '--version', '-v' ]:
    84 
    85                         usage( True )
    86                         sys.exit( 0 )
    87 
    88         try:
    89                 return loadConfig( config_filename )
    90 
    91         except ConfigParser.NoOptionError, detail:
    92 
    93                 print detail
    94                 sys.exit( 1 )
     74        if opt in [ '--pidfile', '-p' ]:
     75
     76            PIDFILE         = value
     77
     78        if opt in [ '--help', '-h' ]:
     79
     80            usage( False )
     81            sys.exit( 0 )
     82
     83        if opt in [ '--version', '-v' ]:
     84
     85            usage( True )
     86            sys.exit( 0 )
     87
     88    try:
     89        return loadConfig( config_filename )
     90
     91    except ConfigParser.NoOptionError, detail:
     92
     93        print detail
     94        sys.exit( 1 )
    9595
    9696def loadConfig( filename ):
    9797
    98         def getlist( cfg_string ):
    99 
    100                 my_list = [ ]
    101 
    102                 for item_txt in cfg_string.split( ',' ):
    103 
    104                         sep_char = None
    105 
    106                         item_txt = item_txt.strip()
    107 
    108                         for s_char in [ "'", '"' ]:
    109 
    110                                 if item_txt.find( s_char ) != -1:
    111 
    112                                         if item_txt.count( s_char ) != 2:
    113 
    114                                                 print 'Missing quote: %s' %item_txt
    115                                                 sys.exit( 1 )
    116 
    117                                         else:
    118 
    119                                                 sep_char = s_char
    120                                                 break
    121 
    122                         if sep_char:
    123 
    124                                 item_txt = item_txt.split( sep_char )[1]
    125 
    126                         my_list.append( item_txt )
    127 
    128                 return my_list
    129 
    130         cfg = ConfigParser.ConfigParser()
    131 
    132         cfg.read( filename )
    133 
    134         global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
    135         global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
    136         global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
    137 
    138         ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
    139 
    140         ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
    141 
    142         DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
    143 
    144         USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
    145 
    146         SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
    147 
    148         MODRRDTOOL              = False
    149 
    150         try:
    151                 global rrdtool
    152                 import rrdtool
    153 
    154                 MODRRDTOOL              = True
    155 
    156         except ImportError:
    157 
    158                 MODRRDTOOL              = False
    159 
    160                 print "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"
    161 
    162                 RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
    163 
    164         try:
    165 
    166                 SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
    167 
    168         except AttributeError, detail:
    169 
    170                 print 'Unknown syslog facility'
    171                 sys.exit( 1 )
    172 
    173         GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
    174 
    175         ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
    176 
    177         ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
    178 
    179         ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
    180 
    181         JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
    182 
    183         JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
    184 
    185         DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
    186 
    187 
    188         return True
     98    def getlist( cfg_string ):
     99
     100        my_list = [ ]
     101
     102        for item_txt in cfg_string.split( ',' ):
     103
     104            sep_char = None
     105
     106            item_txt = item_txt.strip()
     107
     108            for s_char in [ "'", '"' ]:
     109
     110                if item_txt.find( s_char ) != -1:
     111
     112                    if item_txt.count( s_char ) != 2:
     113
     114                        print 'Missing quote: %s' %item_txt
     115                        sys.exit( 1 )
     116
     117                    else:
     118
     119                        sep_char = s_char
     120                        break
     121
     122            if sep_char:
     123
     124                item_txt = item_txt.split( sep_char )[1]
     125
     126            my_list.append( item_txt )
     127
     128        return my_list
     129
     130    cfg = ConfigParser.ConfigParser()
     131
     132    cfg.read( filename )
     133
     134    global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
     135    global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
     136    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
     137
     138    ARCHIVE_PATH        = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
     139
     140    ARCHIVE_HOURS_PER_RRD    = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
     141
     142    DEBUG_LEVEL        = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
     143
     144    USE_SYSLOG        = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
     145
     146    SYSLOG_LEVEL        = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
     147
     148    MODRRDTOOL        = False
     149
     150    try:
     151        global rrdtool
     152        import rrdtool
     153
     154        MODRRDTOOL        = True
     155
     156    except ImportError:
     157
     158        MODRRDTOOL        = False
     159
     160        print "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"
     161
     162        RRDTOOL            = cfg.get( 'DEFAULT', 'RRDTOOL' )
     163
     164    try:
     165
     166        SYSLOG_FACILITY    = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
     167
     168    except AttributeError, detail:
     169
     170        print 'Unknown syslog facility'
     171        sys.exit( 1 )
     172
     173    GMETAD_CONF        = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
     174
     175    ARCHIVE_XMLSOURCE    = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
     176
     177        ARCHIVE_DATASOURCES    = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
     178
     179    ARCHIVE_EXCLUDE_METRICS    = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
     180
     181    JOB_SQL_DBASE        = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
     182
     183    JOB_TIMEOUT        = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
     184
     185    DAEMONIZE        = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
     186
     187
     188    return True
    189189
    190190# What XML data types not to store
     
    209209
    210210try:
    211         from pyPgSQL import PgSQL
     211    from pyPgSQL import PgSQL
    212212
    213213except ImportError, details:
    214214
    215         print "FATAL ERROR: pyPgSQL python module not found"
    216         sys.exit( 1 )
     215    print "FATAL ERROR: pyPgSQL python module not found"
     216    sys.exit( 1 )
    217217
    218218# Orginal from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed
     
    354354class DataSQLStore:
    355355
    356         db_vars = None
    357         dbc = None
    358 
    359         def __init__( self, hostname, database ):
    360 
    361                 self.db_vars = InitVars(DataBaseName=database,
    362                                 User='root',
    363                                 Host=hostname,
    364                                 Password='',
    365                                 Dictionary='true')
    366 
    367                 try:
    368                         self.dbc     = DB(self.db_vars)
    369                 except DBError, details:
    370                         debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
    371                         sys.exit(1)
    372 
    373         def setDatabase(self, statement):
    374                 ret = self.doDatabase('set', statement)
    375                 return ret
    376                
    377         def getDatabase(self, statement):
    378                 ret = self.doDatabase('get', statement)
    379                 return ret
    380 
    381         def doDatabase(self, type, statement):
    382 
    383                 debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
    384                 try:
    385                         if type == 'set':
    386                                 result = self.dbc.Set( statement )
    387                                 self.dbc.Commit()
    388                         elif type == 'get':
    389                                 result = self.dbc.Get( statement )
    390                                
    391                 except DBError, detail:
    392                         operation = statement.split(' ')[0]
    393                         debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
    394                         sys.exit(1)
    395 
    396                 debug_msg( 10, 'doDatabase(): result: %s' %(result) )
    397                 return result
    398 
    399         def getJobNodeId( self, job_id, node_id ):
    400 
    401                 id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
    402                 if len( id ) > 0:
    403 
    404                         if len( id[0] ) > 0 and id[0] != '':
    405                        
    406                                 return 1
    407 
    408                 return 0
    409 
    410         def getNodeId( self, hostname ):
    411 
    412                 id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
    413 
    414                 if len( id ) > 0:
    415 
    416                         id = id[0][0]
    417 
    418                         return id
    419                 else:
    420                         return None
    421 
    422         def getNodeIds( self, hostnames ):
    423 
    424                 ids = [ ]
    425 
    426                 for node in hostnames:
    427 
    428                         id = self.getNodeId( node )
    429 
    430                         if id:
    431                                 ids.append( id )
    432 
    433                 return ids
    434 
    435         def getJobId( self, jobid ):
    436 
    437                 id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
    438 
    439                 if id:
    440                         id = id[0][0]
    441 
    442                         return id
    443                 else:
    444                         return None
    445 
    446         def addJob( self, job_id, jobattrs ):
    447 
    448                 if not self.getJobId( job_id ):
    449 
    450                         self.mutateJob( 'insert', job_id, jobattrs )
    451                 else:
    452                         self.mutateJob( 'update', job_id, jobattrs )
    453 
    454         def mutateJob( self, action, job_id, jobattrs ):
    455 
    456                 job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
    457 
    458                 insert_col_str  = 'job_id'
    459                 insert_val_str  = "'%s'" %job_id
    460                 update_str      = None
    461 
    462                 debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
    463 
    464                 ids = [ ]
    465 
    466                 for valname, value in jobattrs.items():
    467 
    468                         if valname in job_values and value != '':
    469 
    470                                 column_name = 'job_' + valname
    471 
    472                                 if action == 'insert':
    473 
    474                                         if not insert_col_str:
    475                                                 insert_col_str = column_name
    476                                         else:
    477                                                 insert_col_str = insert_col_str + ',' + column_name
    478 
    479                                         if not insert_val_str:
    480                                                 insert_val_str = value
    481                                         else:
    482                                                 insert_val_str = insert_val_str + ",'%s'" %value
    483 
    484                                 elif action == 'update':
    485                                        
    486                                         if not update_str:
    487                                                 update_str = "%s='%s'" %(column_name, value)
    488                                         else:
    489                                                 update_str = update_str + ",%s='%s'" %(column_name, value)
    490 
    491                         elif valname == 'nodes' and value:
    492 
    493                                 node_valid = 1
    494 
    495                                 if len(value) == 1:
    496                                
    497                                         if jobattrs['status'] == 'Q':
    498 
    499                                                 node_valid = 0
    500 
    501                                         else:
    502 
    503                                                 node_valid = 0
    504 
    505                                                 for node_char in str(value[0]):
    506 
    507                                                         if string.find( string.digits, node_char ) != -1 and not node_valid:
    508 
    509                                                                 node_valid = 1
    510 
    511                                 if node_valid:
    512 
    513                                         ids = self.addNodes( value, jobattrs['domain'] )
    514 
    515                 if action == 'insert':
    516 
    517                         self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
    518 
    519                 elif action == 'update':
    520 
    521                         self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
    522 
    523                 if len( ids ) > 0:
    524                         self.addJobNodes( job_id, ids )
    525 
    526         def addNodes( self, hostnames, domain ):
    527 
    528                 ids = [ ]
    529 
    530                 for node in hostnames:
    531 
    532                         node    = '%s.%s' %( node, domain )
    533                         id      = self.getNodeId( node )
    534        
    535                         if not id:
    536                                 self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
    537                                 id = self.getNodeId( node )
    538 
    539                         ids.append( id )
    540 
    541                 return ids
    542 
    543         def addJobNodes( self, jobid, nodes ):
    544 
    545                 for node in nodes:
    546 
    547                         if not self.getJobNodeId( jobid, node ):
    548 
    549                                 self.addJobNode( jobid, node )
    550 
    551         def addJobNode( self, jobid, nodeid ):
    552 
    553                 self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
    554 
    555         def storeJobInfo( self, jobid, jobattrs ):
    556 
    557                 self.addJob( jobid, jobattrs )
    558 
    559         def checkStaleJobs( self ):
    560 
    561                 # Locate all jobs in the database that are not set to finished
    562                 #
    563                 q = "SELECT * from jobs WHERE job_status != 'F'"
    564 
    565                 r = self.getDatabase( q )
    566 
    567                 if len( r ) == 0:
    568 
    569                         return None
    570 
    571                 cleanjobs       = [ ]
    572                 timeoutjobs     = [ ]
    573 
    574                 jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
    575                 cur_time        = time.time()
    576 
    577                 for row in r:
    578 
    579                         job_id                  = row[0]
    580                         job_requested_time      = row[4]
    581                         job_status              = row[7]
    582                         job_start_timestamp     = row[8]
    583 
    584                         # If it was set to queued and we didn't see it started
    585                         # there's not point in keeping it around
    586                         #
    587                         if job_status == 'Q' or not job_start_timestamp:
    588 
    589                                 cleanjobs.append( job_id )
    590 
    591                         else:
    592 
    593                                 start_timestamp = int( job_start_timestamp )
    594 
    595                                 # If it was set to running longer than JOB_TIMEOUT
    596                                 # close the job: it probably finished while we were not running
    597                                 #
    598                                 if ( cur_time - start_timestamp ) > jobtimeout_sec:
    599 
    600                                         if job_requested_time:
    601 
    602                                                 rtime_epoch     = reqtime2epoch( job_requested_time )
    603                                         else:
    604                                                 rtime_epoch     = None
    605                                        
    606                                         timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
    607 
    608                 debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
    609 
    610                 # Purge these from database
    611                 #
    612                 for j in cleanjobs:
    613 
    614                         q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
    615                         self.setDatabase( q )
    616 
    617                 debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
    618 
    619                 # Close these jobs in the database
    620                 # update the stop_timestamp to: start_timestamp + requested wallclock
    621                 # and set state: finished
    622                 #
    623                 for j in timeoutjobs:
    624 
    625                         ( i, s, r )             = j
    626 
    627                         if r:
    628                                 new_end_timestamp       = int( s ) + r
    629 
    630                         q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
    631                         self.setDatabase( q )
     356    db_vars = None
     357    dbc = None
     358
     359    def __init__( self, hostname, database ):
     360
     361        self.db_vars = InitVars(DataBaseName=database,
     362                User='root',
     363                Host=hostname,
     364                Password='',
     365                Dictionary='true')
     366
     367        try:
     368            self.dbc     = DB(self.db_vars)
     369        except DBError, details:
     370            debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
     371            sys.exit(1)
     372
     373    def setDatabase(self, statement):
     374        ret = self.doDatabase('set', statement)
     375        return ret
     376       
     377    def getDatabase(self, statement):
     378        ret = self.doDatabase('get', statement)
     379        return ret
     380
     381    def doDatabase(self, type, statement):
     382
     383        debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
     384        try:
     385            if type == 'set':
     386                result = self.dbc.Set( statement )
     387                self.dbc.Commit()
     388            elif type == 'get':
     389                result = self.dbc.Get( statement )
     390               
     391        except DBError, detail:
     392            operation = statement.split(' ')[0]
     393            debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
     394            sys.exit(1)
     395
     396        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
     397        return result
     398
     399    def getJobNodeId( self, job_id, node_id ):
     400
     401        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
     402        if len( id ) > 0:
     403
     404            if len( id[0] ) > 0 and id[0] != '':
     405           
     406                return 1
     407
     408        return 0
     409
     410    def getNodeId( self, hostname ):
     411
     412        id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
     413
     414        if len( id ) > 0:
     415
     416            id = id[0][0]
     417
     418            return id
     419        else:
     420            return None
     421
     422    def getNodeIds( self, hostnames ):
     423
     424        ids = [ ]
     425
     426        for node in hostnames:
     427
     428            id = self.getNodeId( node )
     429
     430            if id:
     431                ids.append( id )
     432
     433        return ids
     434
     435    def getJobId( self, jobid ):
     436
     437        id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
     438
     439        if id:
     440            id = id[0][0]
     441
     442            return id
     443        else:
     444            return None
     445
     446    def addJob( self, job_id, jobattrs ):
     447
     448        if not self.getJobId( job_id ):
     449
     450            self.mutateJob( 'insert', job_id, jobattrs )
     451        else:
     452            self.mutateJob( 'update', job_id, jobattrs )
     453
     454    def mutateJob( self, action, job_id, jobattrs ):
     455
     456        job_values    = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
     457
     458        insert_col_str    = 'job_id'
     459        insert_val_str    = "'%s'" %job_id
     460        update_str    = None
     461
     462        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
     463
     464        ids = [ ]
     465
     466        for valname, value in jobattrs.items():
     467
     468            if valname in job_values and value != '':
     469
     470                column_name = 'job_' + valname
     471
     472                if action == 'insert':
     473
     474                    if not insert_col_str:
     475                        insert_col_str = column_name
     476                    else:
     477                        insert_col_str = insert_col_str + ',' + column_name
     478
     479                    if not insert_val_str:
     480                        insert_val_str = value
     481                    else:
     482                        insert_val_str = insert_val_str + ",'%s'" %value
     483
     484                elif action == 'update':
     485                   
     486                    if not update_str:
     487                        update_str = "%s='%s'" %(column_name, value)
     488                    else:
     489                        update_str = update_str + ",%s='%s'" %(column_name, value)
     490
     491            elif valname == 'nodes' and value:
     492
     493                node_valid = 1
     494
     495                if len(value) == 1:
     496               
     497                    if jobattrs['status'] == 'Q':
     498
     499                        node_valid = 0
     500
     501                    else:
     502
     503                        node_valid = 0
     504
     505                        for node_char in str(value[0]):
     506
     507                            if string.find( string.digits, node_char ) != -1 and not node_valid:
     508
     509                                node_valid = 1
     510
     511                if node_valid:
     512
     513                    ids = self.addNodes( value, jobattrs['domain'] )
     514
     515        if action == 'insert':
     516
     517            self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
     518
     519        elif action == 'update':
     520
     521            self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
     522
     523        if len( ids ) > 0:
     524            self.addJobNodes( job_id, ids )
     525
     526    def addNodes( self, hostnames, domain ):
     527
     528        ids = [ ]
     529
     530        for node in hostnames:
     531
     532            node    = '%s.%s' %( node, domain )
     533            id    = self.getNodeId( node )
     534   
     535            if not id:
     536                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
     537                id = self.getNodeId( node )
     538
     539            ids.append( id )
     540
     541        return ids
     542
     543    def addJobNodes( self, jobid, nodes ):
     544
     545        for node in nodes:
     546
     547            if not self.getJobNodeId( jobid, node ):
     548
     549                self.addJobNode( jobid, node )
     550
     551    def addJobNode( self, jobid, nodeid ):
     552
     553        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
     554
     555    def storeJobInfo( self, jobid, jobattrs ):
     556
     557        self.addJob( jobid, jobattrs )
     558
     559    def checkStaleJobs( self ):
     560
     561        # Locate all jobs in the database that are not set to finished
     562        #
     563        q = "SELECT * from jobs WHERE job_status != 'F'"
     564
     565        r = self.getDatabase( q )
     566
     567        if len( r ) == 0:
     568
     569            return None
     570
     571        cleanjobs    = [ ]
     572        timeoutjobs    = [ ]
     573
     574        jobtimeout_sec    = JOB_TIMEOUT * (60 * 60)
     575        cur_time    = time.time()
     576
     577        for row in r:
     578
     579            job_id            = row[0]
     580            job_requested_time    = row[4]
     581            job_status        = row[7]
     582            job_start_timestamp    = row[8]
     583
     584            # If it was set to queued and we didn't see it started
     585            # there's not point in keeping it around
     586            #
     587            if job_status == 'Q' or not job_start_timestamp:
     588
     589                cleanjobs.append( job_id )
     590
     591            else:
     592
     593                start_timestamp = int( job_start_timestamp )
     594
     595                # If it was set to running longer than JOB_TIMEOUT
     596                # close the job: it probably finished while we were not running
     597                #
     598                if ( cur_time - start_timestamp ) > jobtimeout_sec:
     599
     600                    if job_requested_time:
     601
     602                        rtime_epoch    = reqtime2epoch( job_requested_time )
     603                    else:
     604                        rtime_epoch    = None
     605                   
     606                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
     607
     608        debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
     609
     610        # Purge these from database
     611        #
     612        for j in cleanjobs:
     613
     614            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
     615            self.setDatabase( q )
     616
     617        debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
     618
     619        # Close these jobs in the database
     620        # update the stop_timestamp to: start_timestamp + requested wallclock
     621        # and set state: finished
     622        #
     623        for j in timeoutjobs:
     624
     625            ( i, s, r )        = j
     626
     627            if r:
     628                new_end_timestamp    = int( s ) + r
     629
     630            q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
     631            self.setDatabase( q )
    632632
    633633class RRDMutator:
    634         """A class for performing RRD mutations"""
    635 
    636         binary = None
    637 
    638         def __init__( self, binary=None ):
    639                 """Set alternate binary if supplied"""
    640 
    641                 if binary:
    642                         self.binary = binary
    643 
    644         def create( self, filename, args ):
    645                 """Create a new rrd with args"""
    646 
    647                 global MODRRDTOOL
    648 
    649                 if MODRRDTOOL:
    650                         return self.perform( 'create', filename, args )
    651                 else:
    652                         return self.perform( 'create', '"' + filename + '"', args )
    653 
    654         def update( self, filename, args ):
    655                 """Update a rrd with args"""
    656 
    657                 global MODRRDTOOL
    658 
    659                 if MODRRDTOOL:
    660                         return self.perform( 'update', filename, args )
    661                 else:
    662                         return self.perform( 'update', '"' + filename + '"', args )
    663 
    664         def grabLastUpdate( self, filename ):
    665                 """Determine the last update time of filename rrd"""
    666 
    667                 global MODRRDTOOL
    668 
    669                 last_update = 0
    670 
    671                 # Use the py-rrdtool module if it's available on this system
    672                 #
    673                 if MODRRDTOOL:
    674 
    675                         debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
    676 
    677                         rrd_header      = { }
    678 
    679                         try:
    680                                 rrd_header      = rrdtool.info( filename )
    681                         except rrdtool.error, msg:
    682                                 debug_msg( 8, str( msg ) )
    683 
    684                         if rrd_header.has_key( 'last_update' ):
    685                                 return last_update
    686                         else:
    687                                 return 0
    688 
    689                 # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
    690                 # DEPRECATED (slow!)
    691                 #
    692                 else:
    693                         debug_msg( 8, self.binary + ' info ' + filename )
    694 
    695                         my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
    696 
    697                         for line in my_pipe.readlines():
    698 
    699                                 if line.find( 'last_update') != -1:
    700 
    701                                         last_update = line.split( ' = ' )[1]
    702 
    703                         if my_pipe:
    704 
    705                                 my_pipe.close()
    706 
    707                         if last_update:
    708                                 return last_update
    709                         else:
    710                                 return 0
    711 
    712 
    713         def perform( self, action, filename, args ):
    714                 """Perform action on rrd filename with args"""
    715 
    716                 global MODRRDTOOL
    717 
    718                 arg_string = None
    719 
    720                 if type( args ) is not ListType:
    721                         debug_msg( 8, 'Arguments needs to be of type List' )
    722                         return 1
    723 
    724                 for arg in args:
    725 
    726                         if not arg_string:
    727 
    728                                 arg_string = arg
    729                         else:
    730                                 arg_string = arg_string + ' ' + arg
    731 
    732                 if MODRRDTOOL:
    733 
    734                         debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
    735 
    736                         try:
    737                                 debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
    738 
    739                                 if action == 'create':
    740 
    741                                         rrdtool.create( str( filename ), *args )
    742 
    743                                 elif action == 'update':
    744 
    745                                         rrdtool.update( str( filename ), *args )
    746 
    747                         except rrdtool.error, msg:
    748 
    749                                 error_msg = str( msg )
    750                                 debug_msg( 8, error_msg )
    751                                 return 1
    752 
    753                 else:
    754 
    755                         debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
    756 
    757                         cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
    758                         lines   = cmd.readlines()
    759 
    760                         cmd.close()
    761 
    762                         for line in lines:
    763 
    764                                 if line.find( 'ERROR' ) != -1:
    765 
    766                                         error_msg = string.join( line.split( ' ' )[1:] )
    767                                         debug_msg( 8, error_msg )
    768                                         return 1
    769 
    770                 return 0
     634    """A class for performing RRD mutations"""
     635
     636    binary = None
     637
     638    def __init__( self, binary=None ):
     639        """Set alternate binary if supplied"""
     640
     641        if binary:
     642            self.binary = binary
     643
     644    def create( self, filename, args ):
     645        """Create a new rrd with args"""
     646
     647        global MODRRDTOOL
     648
     649        if MODRRDTOOL:
     650            return self.perform( 'create', filename, args )
     651        else:
     652            return self.perform( 'create', '"' + filename + '"', args )
     653
     654    def update( self, filename, args ):
     655        """Update a rrd with args"""
     656
     657        global MODRRDTOOL
     658
     659        if MODRRDTOOL:
     660            return self.perform( 'update', filename, args )
     661        else:
     662            return self.perform( 'update', '"' + filename + '"', args )
     663
     664    def grabLastUpdate( self, filename ):
     665        """Determine the last update time of filename rrd"""
     666
     667        global MODRRDTOOL
     668
     669        last_update = 0
     670
     671        # Use the py-rrdtool module if it's available on this system
     672        #
     673        if MODRRDTOOL:
     674
     675            debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
     676
     677            rrd_header     = { }
     678
     679            try:
     680                rrd_header    = rrdtool.info( filename )
     681            except rrdtool.error, msg:
     682                debug_msg( 8, str( msg ) )
     683
     684            if rrd_header.has_key( 'last_update' ):
     685                return last_update
     686            else:
     687                return 0
     688
     689        # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
     690        # DEPRECATED (slow!)
     691        #
     692        else:
     693            debug_msg( 8, self.binary + ' info ' + filename )
     694
     695            my_pipe        = os.popen( self.binary + ' info "' + filename + '"' )
     696
     697            for line in my_pipe.readlines():
     698
     699                if line.find( 'last_update') != -1:
     700
     701                    last_update = line.split( ' = ' )[1]
     702
     703            if my_pipe:
     704
     705                my_pipe.close()
     706
     707            if last_update:
     708                return last_update
     709            else:
     710                return 0
     711
     712
     713    def perform( self, action, filename, args ):
     714        """Perform action on rrd filename with args"""
     715
     716        global MODRRDTOOL
     717
     718        arg_string = None
     719
     720        if type( args ) is not ListType:
     721            debug_msg( 8, 'Arguments needs to be of type List' )
     722            return 1
     723
     724        for arg in args:
     725
     726            if not arg_string:
     727
     728                arg_string = arg
     729            else:
     730                arg_string = arg_string + ' ' + arg
     731
     732        if MODRRDTOOL:
     733
     734            debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
     735
     736            try:
     737                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
     738
     739                if action == 'create':
     740
     741                    rrdtool.create( str( filename ), *args )
     742
     743                elif action == 'update':
     744
     745                    rrdtool.update( str( filename ), *args )
     746
     747            except rrdtool.error, msg:
     748
     749                error_msg = str( msg )
     750                debug_msg( 8, error_msg )
     751                return 1
     752
     753        else:
     754
     755            debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
     756
     757            cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
     758            lines   = cmd.readlines()
     759
     760            cmd.close()
     761
     762            for line in lines:
     763
     764                if line.find( 'ERROR' ) != -1:
     765
     766                    error_msg = string.join( line.split( ' ' )[1:] )
     767                    debug_msg( 8, error_msg )
     768                    return 1
     769
     770        return 0
    771771
    772772class XMLProcessor:
    773         """Skeleton class for XML processor's"""
    774 
    775         def run( self ):
    776                 """Do main processing of XML here"""
    777 
    778                 pass
     773    """Skeleton class for XML processor's"""
     774
     775    def run( self ):
     776        """Do main processing of XML here"""
     777
     778        pass
    779779
    780780class TorqueXMLProcessor( XMLProcessor ):
    781         """Main class for processing XML and acting with it"""
    782 
    783         def __init__( self, XMLSource, DataStore ):
    784                 """Setup initial XML connection and handlers"""
    785 
    786                 self.myXMLSource        = XMLSource
    787                 self.myXMLHandler       = TorqueXMLHandler( DataStore )
    788                 self.myXMLError         = XMLErrorHandler()
    789 
    790                 self.config             = GangliaConfigParser( GMETAD_CONF )
    791 
    792         def run( self ):
    793                 """Main XML processing"""
    794 
    795                 debug_msg( 1, 'torque_xml_thread(): started.' )
    796 
    797                 while( 1 ):
    798 
    799                         #self.myXMLSource = self.mXMLGatherer.getFileObject()
    800                         debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' )
    801 
    802                         my_data = self.myXMLSource.getData()
    803 
    804                         debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )
    805 
    806                         if my_data:
    807                                 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
    808 
    809                                 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
    810 
    811                                 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
    812                                
    813                         debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
    814                         time.sleep( self.config.getLowestInterval() )
     781    """Main class for processing XML and acting with it"""
     782
     783    def __init__( self, XMLSource, DataStore ):
     784        """Setup initial XML connection and handlers"""
     785
     786        self.myXMLSource    = XMLSource
     787        self.myXMLHandler    = TorqueXMLHandler( DataStore )
     788        self.myXMLError        = XMLErrorHandler()
     789
     790        self.config        = GangliaConfigParser( GMETAD_CONF )
     791
     792    def run( self ):
     793        """Main XML processing"""
     794
     795        debug_msg( 1, 'torque_xml_thread(): started.' )
     796
     797        while( 1 ):
     798
     799            #self.myXMLSource = self.mXMLGatherer.getFileObject()
     800            debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' )
     801
     802            my_data    = self.myXMLSource.getData()
     803
     804            debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )
     805
     806            if my_data:
     807                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
     808
     809                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
     810
     811                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
     812               
     813            debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
     814            time.sleep( self.config.getLowestInterval() )
    815815
    816816class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
    817         """Parse Torque's jobinfo XML from our plugin"""
    818 
    819         jobAttrs = { }
    820 
    821         def __init__( self, datastore ):
    822 
    823                 #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
    824                 self.ds                 = datastore
    825                 self.jobs_processed     = [ ]
    826                 self.jobs_to_store      = [ ]
    827 
    828         def startDocument( self ):
    829 
    830                 self.heartbeat  = 0
    831                 self.elementct  = 0
    832 
    833         def startElement( self, name, attrs ):
    834                 """
    835                 This XML will be all gmetric XML
    836                 so there will be no specific start/end element
    837                 just one XML statement with all info
    838                 """
    839                
    840                 jobinfo = { }
    841 
    842                 self.elementct  += 1
    843 
    844                 if name == 'CLUSTER':
    845 
    846                         self.clustername = str( attrs.get( 'NAME', "" ) )
    847 
    848                 elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
    849 
    850                         metricname = str( attrs.get( 'NAME', "" ) )
    851 
    852                         if metricname == 'MONARCH-HEARTBEAT':
    853                                 self.heartbeat = str( attrs.get( 'VAL', "" ) )
    854 
    855                         elif metricname.find( 'MONARCH-JOB' ) != -1:
    856 
    857                                 job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
    858                                 val     = str( attrs.get( 'VAL', "" ) )
    859 
    860                                 if not job_id in self.jobs_processed:
    861 
    862                                         self.jobs_processed.append( job_id )
    863 
    864                                 check_change = 0
    865 
    866                                 if self.jobAttrs.has_key( job_id ):
    867 
    868                                         check_change = 1
    869 
    870                                 valinfo = val.split( ' ' )
    871 
    872                                 for myval in valinfo:
    873 
    874                                         if len( myval.split( '=' ) ) > 1:
    875 
    876                                                 valname = myval.split( '=' )[0]
    877                                                 value   = myval.split( '=' )[1]
    878 
    879                                                 if valname == 'nodes':
    880                                                         value = value.split( ';' )
    881 
    882                                                 jobinfo[ valname ] = value
    883 
    884                                 if check_change:
    885                                         if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
    886                                                 self.jobAttrs[ job_id ]['stop_timestamp']       = ''
    887                                                 self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
    888                                                 if not job_id in self.jobs_to_store:
    889                                                         self.jobs_to_store.append( job_id )
    890 
    891                                                 debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
    892                                 else:
    893                                         self.jobAttrs[ job_id ] = jobinfo
    894 
    895                                         if not job_id in self.jobs_to_store:
    896                                                 self.jobs_to_store.append( job_id )
    897 
    898                                         debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
    899                                        
    900         def endDocument( self ):
    901                 """When all metrics have gone, check if any jobs have finished"""
    902 
    903                 debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
    904 
    905                 if self.heartbeat:
    906                         for jobid, jobinfo in self.jobAttrs.items():
    907 
    908                                 # This is an old job, not in current jobinfo list anymore
    909                                 # it must have finished, since we _did_ get a new heartbeat
    910                                 #
    911                                 mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
    912 
    913                                 if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
    914 
    915                                         if not jobid in self.jobs_processed:
    916                                                 self.jobs_processed.append( jobid )
    917 
    918                                         self.jobAttrs[ jobid ]['status'] = 'F'
    919                                         self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
    920 
    921                                         if not jobid in self.jobs_to_store:
    922                                                 self.jobs_to_store.append( jobid )
    923 
    924                         debug_msg( 1, 'torque_xml_thread(): Storing..' )
    925 
    926                         for jobid in self.jobs_to_store:
    927                                 if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
    928 
    929                                         self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
    930 
    931                                         if self.jobAttrs[ jobid ]['status'] == 'F':
    932                                                 del self.jobAttrs[ jobid ]
    933 
    934                         debug_msg( 1, 'torque_xml_thread(): Done storing.' )
    935 
    936                         self.jobs_processed     = [ ]
    937                         self.jobs_to_store      = [ ]
    938 
    939         def setJobAttrs( self, old, new ):
    940                 """
    941                 Set new job attributes in old, but not lose existing fields
    942                 if old attributes doesn't have those
    943                 """
    944 
    945                 for valname, value in new.items():
    946                         old[ valname ] = value
    947 
    948                 return old
    949                
    950 
    951         def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
    952                 """
    953                 Check if jobinfo has changed from jobattrs[jobid]
    954                 if it's report time is bigger than previous one
    955                 and it is report time is recent (equal to heartbeat)
    956                 """
    957 
    958                 ignore_changes = [ 'reported' ]
    959 
    960                 if jobattrs.has_key( jobid ):
    961 
    962                         for valname, value in jobinfo.items():
    963 
    964                                 if valname not in ignore_changes:
    965 
    966                                         if jobattrs[ jobid ].has_key( valname ):
    967 
    968                                                 if value != jobattrs[ jobid ][ valname ]:
    969 
    970                                                         if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
    971                                                                 return True
    972 
    973                                         else:
    974                                                 return True
    975 
    976                 return False
     817    """Parse Torque's jobinfo XML from our plugin"""
     818
     819    jobAttrs = { }
     820
     821    def __init__( self, datastore ):
     822
     823        #self.ds            = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
     824        self.ds            = datastore
     825        self.jobs_processed    = [ ]
     826        self.jobs_to_store    = [ ]
     827
     828    def startDocument( self ):
     829
     830        self.heartbeat    = 0
     831        self.elementct    = 0
     832
     833    def startElement( self, name, attrs ):
     834        """
     835        This XML will be all gmetric XML
     836        so there will be no specific start/end element
     837        just one XML statement with all info
     838        """
     839       
     840        jobinfo = { }
     841
     842        self.elementct    += 1
     843
     844        if name == 'CLUSTER':
     845
     846            self.clustername = str( attrs.get( 'NAME', "" ) )
     847
     848        elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
     849
     850            metricname = str( attrs.get( 'NAME', "" ) )
     851
     852            if metricname == 'MONARCH-HEARTBEAT':
     853                self.heartbeat = str( attrs.get( 'VAL', "" ) )
     854
     855            elif metricname.find( 'MONARCH-JOB' ) != -1:
     856
     857                job_id    = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
     858                val    = str( attrs.get( 'VAL', "" ) )
     859
     860                if not job_id in self.jobs_processed:
     861
     862                    self.jobs_processed.append( job_id )
     863
     864                check_change = 0
     865
     866                if self.jobAttrs.has_key( job_id ):
     867
     868                    check_change = 1
     869
     870                valinfo = val.split( ' ' )
     871
     872                for myval in valinfo:
     873
     874                    if len( myval.split( '=' ) ) > 1:
     875
     876                        valname    = myval.split( '=' )[0]
     877                        value    = myval.split( '=' )[1]
     878
     879                        if valname == 'nodes':
     880                            value = value.split( ';' )
     881
     882                        jobinfo[ valname ] = value
     883
     884                if check_change:
     885                    if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
     886                        self.jobAttrs[ job_id ]['stop_timestamp']    = ''
     887                        self.jobAttrs[ job_id ]                = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
     888                        if not job_id in self.jobs_to_store:
     889                            self.jobs_to_store.append( job_id )
     890
     891                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
     892                else:
     893                    self.jobAttrs[ job_id ] = jobinfo
     894
     895                    if not job_id in self.jobs_to_store:
     896                        self.jobs_to_store.append( job_id )
     897
     898                    debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
     899                   
     900    def endDocument( self ):
     901        """When all metrics have gone, check if any jobs have finished"""
     902
     903        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
     904
     905        if self.heartbeat:
     906            for jobid, jobinfo in self.jobAttrs.items():
     907
     908                # This is an old job, not in current jobinfo list anymore
     909                # it must have finished, since we _did_ get a new heartbeat
     910                #
     911                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
     912
     913                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
     914
     915                    if not jobid in self.jobs_processed:
     916                        self.jobs_processed.append( jobid )
     917
     918                    self.jobAttrs[ jobid ]['status'] = 'F'
     919                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
     920
     921                    if not jobid in self.jobs_to_store:
     922                        self.jobs_to_store.append( jobid )
     923
     924            debug_msg( 1, 'torque_xml_thread(): Storing..' )
     925
     926            for jobid in self.jobs_to_store:
     927                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
     928
     929                    self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
     930
     931                    if self.jobAttrs[ jobid ]['status'] == 'F':
     932                        del self.jobAttrs[ jobid ]
     933
     934            debug_msg( 1, 'torque_xml_thread(): Done storing.' )
     935
     936            self.jobs_processed    = [ ]
     937            self.jobs_to_store    = [ ]
     938
     939    def setJobAttrs( self, old, new ):
     940        """
     941        Set new job attributes in old, but not lose existing fields
     942        if old attributes doesn't have those
     943        """
     944
     945        for valname, value in new.items():
     946            old[ valname ] = value
     947
     948        return old
     949       
     950
     951    def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
     952        """
     953        Check if jobinfo has changed from jobattrs[jobid]
     954        if it's report time is bigger than previous one
     955        and it is report time is recent (equal to heartbeat)
     956        """
     957
     958        ignore_changes = [ 'reported' ]
     959
     960        if jobattrs.has_key( jobid ):
     961
     962            for valname, value in jobinfo.items():
     963
     964                if valname not in ignore_changes:
     965
     966                    if jobattrs[ jobid ].has_key( valname ):
     967
     968                        if value != jobattrs[ jobid ][ valname ]:
     969
     970                            if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
     971                                return True
     972
     973                    else:
     974                        return True
     975
     976        return False
    977977
    978978class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
    979         """Parse Ganglia's XML"""
    980 
    981         def __init__( self, config, datastore ):
    982                 """Setup initial variables and gather info on existing rrd archive"""
    983 
    984                 self.config     = config
    985                 self.clusters   = { }
    986                 self.ds         = datastore
    987 
    988                 debug_msg( 1, 'Checking database..' )
    989 
    990                 global DEBUG_LEVEL
    991 
    992                 if DEBUG_LEVEL <= 2:
    993                         self.ds.checkStaleJobs()
    994 
    995                 debug_msg( 1, 'Check done.' )
    996                 debug_msg( 1, 'Checking rrd archive..' )
    997                 self.gatherClusters()
    998                 debug_msg( 1, 'Check done.' )
    999 
    1000         def gatherClusters( self ):
    1001                 """Find all existing clusters in archive dir"""
    1002 
    1003                 archive_dir     = check_dir(ARCHIVE_PATH)
    1004 
    1005                 hosts           = [ ]
    1006 
    1007                 if os.path.exists( archive_dir ):
    1008 
    1009                         dirlist = os.listdir( archive_dir )
    1010 
    1011                         for cfgcluster in ARCHIVE_DATASOURCES:
    1012 
    1013                                 if cfgcluster not in dirlist:
    1014 
    1015                                         # Autocreate a directory for this cluster
    1016                                         # assume it is new
    1017                                         #
    1018                                         cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
    1019 
    1020                                         os.mkdir( cluster_dir )
    1021 
    1022                                         dirlist.append( cfgcluster )
    1023 
    1024                         for item in dirlist:
    1025 
    1026                                 clustername = item
    1027 
    1028                                 if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
    1029 
    1030                                         self.clusters[ clustername ] = RRDHandler( self.config, clustername )
    1031 
    1032                 debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
    1033 
    1034         def startElement( self, name, attrs ):
    1035                 """Memorize appropriate data from xml start tags"""
    1036 
    1037                 if name == 'GANGLIA_XML':
    1038 
    1039                         self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
    1040                         self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
    1041 
    1042                         debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
    1043 
    1044                 elif name == 'GRID':
    1045 
    1046                         self.gridName   = str( attrs.get( 'NAME', "" ) )
    1047                         self.time       = str( attrs.get( 'LOCALTIME', "" ) )
    1048 
    1049                         debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
    1050 
    1051                 elif name == 'CLUSTER':
    1052 
    1053                         self.clusterName        = str( attrs.get( 'NAME', "" ) )
    1054                         self.time               = str( attrs.get( 'LOCALTIME', "" ) )
    1055 
    1056                         if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
    1057 
    1058                                 self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
    1059 
    1060                                 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
    1061 
    1062                 elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
    1063 
    1064                         self.hostName           = str( attrs.get( 'NAME', "" ) )
    1065                         self.hostIp             = str( attrs.get( 'IP', "" ) )
    1066                         self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
    1067 
    1068                         debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
    1069 
    1070                 elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
    1071 
    1072                         type = str( attrs.get( 'TYPE', "" ) )
    1073                        
    1074                         exclude_metric = False
    1075                        
    1076                         for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
    1077 
    1078                                 orig_name = str( attrs.get( 'NAME', "" ) )
    1079 
    1080                                 if string.lower( orig_name ) == string.lower( ex_metricstr ):
    1081                                
    1082                                         exclude_metric = True
    1083 
    1084                                 elif re.match( ex_metricstr, orig_name ):
    1085 
    1086                                         exclude_metric = True
    1087 
    1088                         if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
    1089 
    1090                                 myMetric                = { }
    1091                                 myMetric['name']        = str( attrs.get( 'NAME', "" ) )
    1092                                 myMetric['val']         = str( attrs.get( 'VAL', "" ) )
    1093                                 myMetric['time']        = self.hostReported
    1094 
    1095                                 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
    1096 
    1097                                 debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
    1098 
    1099         def storeMetrics( self ):
    1100                 """Store metrics of each cluster rrd handler"""
    1101 
    1102                 for clustername, rrdh in self.clusters.items():
    1103 
    1104                         ret = rrdh.storeMetrics()
    1105 
    1106                         if ret:
    1107                                 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
    1108                                 return 1
    1109 
    1110                 return 0
     979    """Parse Ganglia's XML"""
     980
     981    def __init__( self, config, datastore ):
     982        """Setup initial variables and gather info on existing rrd archive"""
     983
     984        self.config    = config
     985        self.clusters    = { }
     986        self.ds        = datastore
     987
     988        debug_msg( 1, 'Checking database..' )
     989
     990        global DEBUG_LEVEL
     991
     992        if DEBUG_LEVEL <= 2:
     993            self.ds.checkStaleJobs()
     994
     995        debug_msg( 1, 'Check done.' )
     996        debug_msg( 1, 'Checking rrd archive..' )
     997        self.gatherClusters()
     998        debug_msg( 1, 'Check done.' )
     999
     1000    def gatherClusters( self ):
     1001        """Find all existing clusters in archive dir"""
     1002
     1003        archive_dir    = check_dir(ARCHIVE_PATH)
     1004
     1005        hosts        = [ ]
     1006
     1007        if os.path.exists( archive_dir ):
     1008
     1009            dirlist    = os.listdir( archive_dir )
     1010
     1011            for cfgcluster in ARCHIVE_DATASOURCES:
     1012
     1013                if cfgcluster not in dirlist:
     1014
     1015                    # Autocreate a directory for this cluster
     1016                    # assume it is new
     1017                    #
     1018                    cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
     1019
     1020                    os.mkdir( cluster_dir )
     1021
     1022                    dirlist.append( cfgcluster )
     1023
     1024            for item in dirlist:
     1025
     1026                clustername = item
     1027
     1028                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
     1029
     1030                    self.clusters[ clustername ] = RRDHandler( self.config, clustername )
     1031
     1032        debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
     1033
     1034    def startElement( self, name, attrs ):
     1035        """Memorize appropriate data from xml start tags"""
     1036
     1037        if name == 'GANGLIA_XML':
     1038
     1039            self.XMLSource        = str( attrs.get( 'SOURCE', "" ) )
     1040            self.gangliaVersion    = str( attrs.get( 'VERSION', "" ) )
     1041
     1042            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
     1043
     1044        elif name == 'GRID':
     1045
     1046            self.gridName    = str( attrs.get( 'NAME', "" ) )
     1047            self.time    = str( attrs.get( 'LOCALTIME', "" ) )
     1048
     1049            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
     1050
     1051        elif name == 'CLUSTER':
     1052
     1053            self.clusterName    = str( attrs.get( 'NAME', "" ) )
     1054            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
     1055
     1056            if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
     1057
     1058                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
     1059
     1060                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
     1061
     1062        elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
     1063
     1064            self.hostName        = str( attrs.get( 'NAME', "" ) )
     1065            self.hostIp        = str( attrs.get( 'IP', "" ) )
     1066            self.hostReported    = str( attrs.get( 'REPORTED', "" ) )
     1067
     1068            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
     1069
     1070        elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
     1071
     1072            type = str( attrs.get( 'TYPE', "" ) )
     1073           
     1074            exclude_metric = False
     1075           
     1076            for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
     1077
     1078                orig_name = str( attrs.get( 'NAME', "" ) )
     1079
     1080                if string.lower( orig_name ) == string.lower( ex_metricstr ):
     1081               
     1082                    exclude_metric = True
     1083
     1084                elif re.match( ex_metricstr, orig_name ):
     1085
     1086                    exclude_metric = True
     1087
     1088            if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
     1089
     1090                myMetric        = { }
     1091                myMetric['name']    = str( attrs.get( 'NAME', "" ) )
     1092                myMetric['val']        = str( attrs.get( 'VAL', "" ) )
     1093                myMetric['time']    = self.hostReported
     1094
     1095                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
     1096
     1097                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
     1098
     1099    def storeMetrics( self ):
     1100        """Store metrics of each cluster rrd handler"""
     1101
     1102        for clustername, rrdh in self.clusters.items():
     1103
     1104            ret = rrdh.storeMetrics()
     1105
     1106            if ret:
     1107                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
     1108                return 1
     1109
     1110        return 0
    11111111
    11121112class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
    11131113
    1114         def error( self, exception ):
    1115                 """Recoverable error"""
    1116 
    1117                 debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
    1118 
    1119         def fatalError( self, exception ):
    1120                 """Non-recoverable error"""
    1121 
    1122                 exception_str = str( exception )
    1123 
    1124                 # Ignore 'no element found' errors
    1125                 if exception_str.find( 'no element found' ) != -1:
    1126                         debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
    1127                         return 0
    1128 
    1129                 debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
    1130                 sys.exit( 1 )
    1131 
    1132         def warning( self, exception ):
    1133                 """Warning"""
    1134 
    1135                 debug_msg( 0, 'Warning ' + str( exception ) )
     1114    def error( self, exception ):
     1115        """Recoverable error"""
     1116
     1117        debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
     1118
     1119    def fatalError( self, exception ):
     1120        """Non-recoverable error"""
     1121
     1122        exception_str = str( exception )
     1123
     1124        # Ignore 'no element found' errors
     1125        if exception_str.find( 'no element found' ) != -1:
     1126            debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
     1127            return 0
     1128
     1129        debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
     1130        sys.exit( 1 )
     1131
     1132    def warning( self, exception ):
     1133        """Warning"""
     1134
     1135        debug_msg( 0, 'Warning ' + str( exception ) )
    11361136
    11371137class XMLGatherer:
    1138         """Setup a connection and file object to Ganglia's XML"""
    1139 
    1140         s               = None
    1141         fd              = None
    1142         data            = None
    1143         slot            = None
    1144 
    1145         # Time since the last update
    1146         #
    1147         LAST_UPDATE     = 0
    1148 
    1149         # Minimum interval between updates
    1150         #
    1151         MIN_UPDATE_INT  = 10
    1152 
    1153         # Is a update occuring now
    1154         #
    1155         update_now      = False
    1156 
    1157         def __init__( self, host, port ):
    1158                 """Store host and port for connection"""
    1159 
    1160                 self.host       = host
    1161                 self.port       = port
    1162                 self.slot       = threading.Lock()
    1163 
    1164                 self.retrieveData()
    1165 
    1166         def retrieveData( self ):
    1167                 """Setup connection to XML source"""
    1168 
    1169                 self.update_now = True
    1170 
    1171                 self.slot.acquire()
    1172 
    1173                 self.data       = None
    1174 
    1175                 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
    1176 
    1177                         af, socktype, proto, canonname, sa = res
    1178 
    1179                         try:
    1180 
    1181                                 self.s = socket.socket( af, socktype, proto )
    1182 
    1183                         except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
    1184 
    1185                                 self.s = None
    1186                                 continue
    1187 
    1188                         try:
    1189 
    1190                                 self.s.connect( sa )
    1191 
    1192                         except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
    1193 
    1194                                 self.disconnect()
    1195                                 continue
    1196 
    1197                         break
    1198 
    1199                 if self.s is None:
    1200 
    1201                         debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
    1202                         self.update_now = False
    1203                         #sys.exit( 1 )
    1204 
    1205                 else:
    1206                         #self.s.send( '\n' )
    1207 
    1208                         my_fp                   = self.s.makefile( 'r' )
    1209                         my_data                 = my_fp.readlines()
    1210                         my_data                 = string.join( my_data, '' )
    1211 
    1212                         self.data               = my_data
    1213 
    1214                         self.LAST_UPDATE        = time.time()
    1215 
    1216                 self.slot.release()
    1217 
    1218                 self.update_now = False
    1219 
    1220         def disconnect( self ):
    1221                 """Close socket"""
    1222 
    1223                 if self.s:
    1224                         #self.s.shutdown( 2 )
    1225                         self.s.close()
    1226                         self.s = None
    1227 
    1228         def __del__( self ):
    1229                 """Kill the socket before we leave"""
    1230 
    1231                 self.disconnect()
    1232 
    1233         def reGetData( self ):
    1234                 """Reconnect"""
    1235 
    1236                 while self.update_now:
    1237 
    1238                         # Must be another update in progress:
    1239                         # Wait until the update is complete
    1240                         #
    1241                         time.sleep( 1 )
    1242 
    1243                 if self.s:
    1244                         self.disconnect()
    1245 
    1246                 self.retrieveData()
    1247 
    1248         def getData( self ):
    1249 
    1250                 """Return the XML data"""
    1251 
    1252                 # If more than MIN_UPDATE_INT seconds passed since last data update
    1253                 # update the XML first before returning it
    1254                 #
    1255 
    1256                 cur_time        = time.time()
    1257 
    1258                 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
    1259 
    1260                         self.reGetData()
    1261 
    1262                 while self.update_now:
    1263 
    1264                         # Must be another update in progress:
    1265                         # Wait until the update is complete
    1266                         #
    1267                         time.sleep( 1 )
    1268                        
    1269                 return self.data
    1270 
    1271         def makeFileDescriptor( self ):
    1272                 """Make file descriptor that points to our socket connection"""
    1273 
    1274                 self.reconnect()
    1275 
    1276                 if self.s:
    1277                         self.fd = self.s.makefile( 'r' )
    1278 
    1279         def getFileObject( self ):
    1280                 """Connect, and return a file object"""
    1281 
    1282                 self.makeFileDescriptor()
    1283 
    1284                 if self.fd:
    1285                         return self.fd
     1138    """Setup a connection and file object to Ganglia's XML"""
     1139
     1140    s        = None
     1141    fd        = None
     1142    data        = None
     1143    slot        = None
     1144
     1145    # Time since the last update
     1146    #
     1147    LAST_UPDATE    = 0
     1148
     1149    # Minimum interval between updates
     1150    #
     1151    MIN_UPDATE_INT    = 10
     1152
     1153    # Is a update occuring now
     1154    #
     1155    update_now    = False
     1156
     1157    def __init__( self, host, port ):
     1158        """Store host and port for connection"""
     1159
     1160        self.host    = host
     1161        self.port    = port
     1162        self.slot       = threading.Lock()
     1163
     1164        self.retrieveData()
     1165
     1166    def retrieveData( self ):
     1167        """Setup connection to XML source"""
     1168
     1169        self.update_now    = True
     1170
     1171        self.slot.acquire()
     1172
     1173        self.data    = None
     1174
     1175        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
     1176
     1177            af, socktype, proto, canonname, sa = res
     1178
     1179            try:
     1180
     1181                self.s = socket.socket( af, socktype, proto )
     1182
     1183            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
     1184
     1185                self.s = None
     1186                continue
     1187
     1188                try:
     1189
     1190                self.s.connect( sa )
     1191
     1192                except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
     1193
     1194                self.disconnect()
     1195                continue
     1196
     1197                break
     1198
     1199        if self.s is None:
     1200
     1201            debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
     1202            self.update_now    = False
     1203            #sys.exit( 1 )
     1204
     1205        else:
     1206            #self.s.send( '\n' )
     1207
     1208            my_fp            = self.s.makefile( 'r' )
     1209            my_data            = my_fp.readlines()
     1210            my_data            = string.join( my_data, '' )
     1211
     1212            self.data        = my_data
     1213
     1214            self.LAST_UPDATE    = time.time()
     1215
     1216        self.slot.release()
     1217
     1218        self.update_now    = False
     1219
     1220    def disconnect( self ):
     1221        """Close socket"""
     1222
     1223        if self.s:
     1224            #self.s.shutdown( 2 )
     1225            self.s.close()
     1226            self.s = None
     1227
     1228    def __del__( self ):
     1229        """Kill the socket before we leave"""
     1230
     1231        self.disconnect()
     1232
     1233    def reGetData( self ):
     1234        """Reconnect"""
     1235
     1236        while self.update_now:
     1237
     1238            # Must be another update in progress:
     1239            # Wait until the update is complete
     1240            #
     1241            time.sleep( 1 )
     1242
     1243        if self.s:
     1244            self.disconnect()
     1245
     1246        self.retrieveData()
     1247
     1248    def getData( self ):
     1249
     1250        """Return the XML data"""
     1251
     1252        # If more than MIN_UPDATE_INT seconds passed since last data update
     1253        # update the XML first before returning it
     1254        #
     1255
     1256        cur_time    = time.time()
     1257
     1258        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
     1259
     1260            self.reGetData()
     1261
     1262        while self.update_now:
     1263
     1264            # Must be another update in progress:
     1265            # Wait until the update is complete
     1266            #
     1267            time.sleep( 1 )
     1268           
     1269        return self.data
     1270
     1271    def makeFileDescriptor( self ):
     1272        """Make file descriptor that points to our socket connection"""
     1273
     1274        self.reconnect()
     1275
     1276        if self.s:
     1277            self.fd = self.s.makefile( 'r' )
     1278
     1279    def getFileObject( self ):
     1280        """Connect, and return a file object"""
     1281
     1282        self.makeFileDescriptor()
     1283
     1284        if self.fd:
     1285            return self.fd
    12861286
    12871287class GangliaXMLProcessor( XMLProcessor ):
    1288         """Main class for processing XML and acting with it"""
    1289 
    1290         def __init__( self, XMLSource, DataStore ):
    1291                 """Setup initial XML connection and handlers"""
    1292 
    1293                 self.config             = GangliaConfigParser( GMETAD_CONF )
    1294 
    1295                 #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    1296                 #self.myXMLSource       = self.myXMLGatherer.getFileObject()
    1297                 self.myXMLSource        = XMLSource
    1298                 self.ds                 = DataStore
    1299                 self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
    1300                 self.myXMLError         = XMLErrorHandler()
    1301 
    1302         def run( self ):
    1303                 """Main XML processing; start a xml and storethread"""
    1304 
    1305                 xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
    1306                 store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
    1307 
    1308                 while( 1 ):
    1309 
    1310                         if not xml_thread.isAlive():
    1311                                 # Gather XML at the same interval as gmetad
    1312 
    1313                                 # threaded call to: self.processXML()
    1314                                 #
    1315                                 try:
    1316                                         xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
    1317                                         xml_thread.start()
    1318                                 except thread.error, msg:
    1319                                         debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
    1320                                         #return 1
    1321 
    1322                         if not store_thread.isAlive():
    1323                                 # Store metrics every .. sec
    1324 
    1325                                 # threaded call to: self.storeMetrics()
    1326                                 #
    1327                                 try:
    1328                                         store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
    1329                                         store_thread.start()
    1330                                 except thread.error, msg:
    1331                                         debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
    1332                                         #return 1
    1333                
    1334                         # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
    1335                         time.sleep( 1 )
    1336 
    1337         def storeMetrics( self ):
    1338                 """Store metrics retained in memory to disk"""
    1339 
    1340                 global DEBUG_LEVEL
    1341 
    1342                 # Store metrics somewhere between every 360 and 640 seconds
    1343                 #
    1344                 if DEBUG_LEVEL > 2:
    1345                         #STORE_INTERVAL = 60
    1346                         STORE_INTERVAL = random.randint( 360, 640 )
    1347                 else:
    1348                         STORE_INTERVAL = random.randint( 360, 640 )
    1349 
    1350                 try:
    1351                         store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
    1352                         store_metric_thread.start()
    1353                 except thread.error, msg:
    1354                         debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
    1355                         return 1
    1356 
    1357                 debug_msg( 1, 'ganglia_store_thread(): started.' )
    1358 
    1359                 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
    1360                 time.sleep( STORE_INTERVAL )
    1361                 debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
    1362 
    1363                 if store_metric_thread.isAlive():
    1364 
    1365                         debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
    1366                         store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
    1367                         debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
    1368 
    1369                 debug_msg( 1, 'ganglia_store_thread(): finished.' )
    1370 
    1371                 return 0
    1372 
    1373         def storeThread( self ):
    1374                 """Actual metric storing thread"""
    1375 
    1376                 debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
    1377                 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
    1378                 ret = self.myXMLHandler.storeMetrics()
    1379                 if ret > 0:
    1380                         debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
    1381                 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
    1382                 debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
    1383                
    1384                 return 0
    1385 
    1386         def processXML( self ):
    1387                 """Process XML"""
    1388 
    1389                 try:
    1390                         parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
    1391                         parsethread.start()
    1392                 except thread.error, msg:
    1393                         debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
    1394                         return 1
    1395 
    1396                 debug_msg( 1, 'ganglia_xml_thread(): started.' )
    1397 
    1398                 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
    1399                 time.sleep( float( self.config.getLowestInterval() ) ) 
    1400                 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
    1401 
    1402                 if parsethread.isAlive():
    1403 
    1404                         debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
    1405                         parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
    1406                         debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
    1407 
    1408                 debug_msg( 1, 'ganglia_xml_thread(): finished.' )
    1409 
    1410                 return 0
    1411 
    1412         def parseThread( self ):
    1413                 """Actual parsing thread"""
    1414 
    1415                 debug_msg( 1, 'ganglia_parse_thread(): started.' )
    1416                 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
    1417                
    1418                 my_data = self.myXMLSource.getData()
    1419 
    1420                 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )
    1421 
    1422                 if my_data:
    1423                         debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
    1424                         xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
    1425                         debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
    1426 
    1427                 debug_msg( 1, 'ganglia_parse_thread(): finished.' )
    1428 
    1429                 return 0
     1288    """Main class for processing XML and acting with it"""
     1289
     1290    def __init__( self, XMLSource, DataStore ):
     1291        """Setup initial XML connection and handlers"""
     1292
     1293        self.config        = GangliaConfigParser( GMETAD_CONF )
     1294
     1295        #self.myXMLGatherer    = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
     1296        #self.myXMLSource    = self.myXMLGatherer.getFileObject()
     1297        self.myXMLSource    = XMLSource
     1298        self.ds            = DataStore
     1299        self.myXMLHandler    = GangliaXMLHandler( self.config, self.ds )
     1300        self.myXMLError        = XMLErrorHandler()
     1301
     1302    def run( self ):
     1303        """Main XML processing; start a xml and storethread"""
     1304
     1305        xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
     1306        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
     1307
     1308        while( 1 ):
     1309
     1310            if not xml_thread.isAlive():
     1311                # Gather XML at the same interval as gmetad
     1312
     1313                # threaded call to: self.processXML()
     1314                #
     1315                try:
     1316                    xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
     1317                    xml_thread.start()
     1318                except thread.error, msg:
     1319                    debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
     1320                    #return 1
     1321
     1322            if not store_thread.isAlive():
     1323                # Store metrics every .. sec
     1324
     1325                # threaded call to: self.storeMetrics()
     1326                #
     1327                try:
     1328                    store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
     1329                    store_thread.start()
     1330                except thread.error, msg:
     1331                    debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
     1332                    #return 1
     1333       
     1334            # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
     1335            time.sleep( 1 )   
     1336
     1337    def storeMetrics( self ):
     1338        """Store metrics retained in memory to disk"""
     1339
     1340        global DEBUG_LEVEL
     1341
     1342        # Store metrics somewhere between every 360 and 640 seconds
     1343        #
     1344        if DEBUG_LEVEL > 2:
     1345            #STORE_INTERVAL = 60
     1346            STORE_INTERVAL = random.randint( 360, 640 )
     1347        else:
     1348            STORE_INTERVAL = random.randint( 360, 640 )
     1349
     1350        try:
     1351            store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
     1352            store_metric_thread.start()
     1353        except thread.error, msg:
     1354            debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
     1355            return 1
     1356
     1357        debug_msg( 1, 'ganglia_store_thread(): started.' )
     1358
     1359        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
     1360        time.sleep( STORE_INTERVAL )
     1361        debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
     1362
     1363        if store_metric_thread.isAlive():
     1364
     1365            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
     1366            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
     1367            debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
     1368
     1369        debug_msg( 1, 'ganglia_store_thread(): finished.' )
     1370
     1371        return 0
     1372
     1373    def storeThread( self ):
     1374        """Actual metric storing thread"""
     1375
     1376        debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
     1377        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
     1378        ret = self.myXMLHandler.storeMetrics()
     1379        if ret > 0:
     1380            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
     1381        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
     1382        debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
     1383       
     1384        return 0
     1385
     1386    def processXML( self ):
     1387        """Process XML"""
     1388
     1389        try:
     1390            parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
     1391            parsethread.start()
     1392        except thread.error, msg:
     1393            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
     1394            return 1
     1395
     1396        debug_msg( 1, 'ganglia_xml_thread(): started.' )
     1397
     1398        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
     1399        time.sleep( float( self.config.getLowestInterval() ) )   
     1400        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
     1401
     1402        if parsethread.isAlive():
     1403
     1404            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
     1405            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
     1406            debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
     1407
     1408        debug_msg( 1, 'ganglia_xml_thread(): finished.' )
     1409
     1410        return 0
     1411
     1412    def parseThread( self ):
     1413        """Actual parsing thread"""
     1414
     1415        debug_msg( 1, 'ganglia_parse_thread(): started.' )
     1416        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
     1417       
     1418        my_data    = self.myXMLSource.getData()
     1419
     1420        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )
     1421
     1422        if my_data:
     1423            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
     1424            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
     1425            debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
     1426
     1427        debug_msg( 1, 'ganglia_parse_thread(): finished.' )
     1428
     1429        return 0
    14301430
    14311431class GangliaConfigParser:
    14321432
    1433         sources = [ ]
    1434 
    1435         def __init__( self, config ):
    1436                 """Parse some stuff from our gmetad's config, such as polling interval"""
    1437 
    1438                 self.config = config
    1439                 self.parseValues()
    1440 
    1441         def parseValues( self ):
    1442                 """Parse certain values from gmetad.conf"""
    1443 
    1444                 readcfg = open( self.config, 'r' )
    1445 
    1446                 for line in readcfg.readlines():
    1447 
    1448                         if line.count( '"' ) > 1:
    1449 
    1450                                 if line.find( 'data_source' ) != -1 and line[0] != '#':
    1451 
    1452                                         source          = { }
    1453                                         source['name']  = line.split( '"' )[1]
    1454                                         source_words    = line.split( '"' )[2].split( ' ' )
    1455 
    1456                                         for word in source_words:
    1457 
    1458                                                 valid_interval = 1
    1459 
    1460                                                 for letter in word:
    1461 
    1462                                                         if letter not in string.digits:
    1463 
    1464                                                                 valid_interval = 0
    1465 
    1466                                                 if valid_interval and len(word) > 0:
    1467 
    1468                                                         source['interval'] = word
    1469                                                         debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
    1470        
    1471                                         # No interval found, use Ganglia's default     
    1472                                         if not source.has_key( 'interval' ):
    1473                                                 source['interval'] = 15
    1474                                                 debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
    1475 
    1476                                         self.sources.append( source )
    1477 
    1478         def getInterval( self, source_name ):
    1479                 """Return interval for source_name"""
    1480 
    1481                 for source in self.sources:
    1482 
    1483                         if source['name'] == source_name:
    1484 
    1485                                 return source['interval']
    1486 
    1487                 return None
    1488 
    1489         def getLowestInterval( self ):
    1490                 """Return the lowest interval of all clusters"""
    1491 
    1492                 lowest_interval = 0
    1493 
    1494                 for source in self.sources:
    1495 
    1496                         if not lowest_interval or source['interval'] <= lowest_interval:
    1497 
    1498                                 lowest_interval = source['interval']
    1499 
    1500                 # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
    1501                 if lowest_interval:
    1502                         return lowest_interval
    1503                 else:
    1504                         return 15
     1433    sources = [ ]
     1434
     1435    def __init__( self, config ):
     1436        """Parse some stuff from our gmetad's config, such as polling interval"""
     1437
     1438        self.config = config
     1439        self.parseValues()
     1440
     1441    def parseValues( self ):
     1442        """Parse certain values from gmetad.conf"""
     1443
     1444        readcfg = open( self.config, 'r' )
     1445
     1446        for line in readcfg.readlines():
     1447
     1448            if line.count( '"' ) > 1:
     1449
     1450                if line.find( 'data_source' ) != -1 and line[0] != '#':
     1451
     1452                    source        = { }
     1453                    source['name']    = line.split( '"' )[1]
     1454                    source_words    = line.split( '"' )[2].split( ' ' )
     1455
     1456                    for word in source_words:
     1457
     1458                        valid_interval = 1
     1459
     1460                        for letter in word:
     1461
     1462                            if letter not in string.digits:
     1463
     1464                                valid_interval = 0
     1465
     1466                        if valid_interval and len(word) > 0:
     1467
     1468                            source['interval'] = word
     1469                            debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
     1470   
     1471                    # No interval found, use Ganglia's default   
     1472                    if not source.has_key( 'interval' ):
     1473                        source['interval'] = 15
     1474                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
     1475
     1476                    self.sources.append( source )
     1477
     1478    def getInterval( self, source_name ):
     1479        """Return interval for source_name"""
     1480
     1481        for source in self.sources:
     1482
     1483            if source['name'] == source_name:
     1484
     1485                return source['interval']
     1486
     1487        return None
     1488
     1489    def getLowestInterval( self ):
     1490        """Return the lowest interval of all clusters"""
     1491
     1492        lowest_interval = 0
     1493
     1494        for source in self.sources:
     1495
     1496            if not lowest_interval or source['interval'] <= lowest_interval:
     1497
     1498                lowest_interval = source['interval']
     1499
     1500        # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
     1501        if lowest_interval:
     1502            return lowest_interval
     1503        else:
     1504            return 15
    15051505
    15061506class RRDHandler:
    1507         """Class for handling RRD activity"""
    1508 
    1509         myMetrics = { }
    1510         lastStored = { }
    1511         timeserials = { }
    1512         slot = None
    1513 
    1514         def __init__( self, config, cluster ):
    1515                 """Setup initial variables"""
    1516 
    1517                 global MODRRDTOOL
    1518 
    1519                 self.block      = 0
    1520                 self.cluster    = cluster
    1521                 self.config     = config
    1522                 self.slot       = threading.Lock()
    1523 
    1524                 if MODRRDTOOL:
    1525 
    1526                         self.rrdm       = RRDMutator()
    1527                 else:
    1528                         self.rrdm       = RRDMutator( RRDTOOL )
    1529 
    1530                 global DEBUG_LEVEL
    1531 
    1532                 if DEBUG_LEVEL <= 2:
    1533                         self.gatherLastUpdates()
    1534 
    1535         def gatherLastUpdates( self ):
    1536                 """Populate the lastStored list, containing timestamps of all last updates"""
    1537 
    1538                 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
    1539 
    1540                 hosts = [ ]
    1541 
    1542                 if os.path.exists( cluster_dir ):
    1543 
    1544                         dirlist = os.listdir( cluster_dir )
    1545 
    1546                         for dir in dirlist:
    1547 
    1548                                 hosts.append( dir )
    1549 
    1550                 for host in hosts:
    1551 
    1552                         host_dir        = cluster_dir + '/' + host
    1553                         dirlist         = os.listdir( host_dir )
    1554 
    1555                         for dir in dirlist:
    1556 
    1557                                 if not self.timeserials.has_key( host ):
    1558 
    1559                                         self.timeserials[ host ] = [ ]
    1560 
    1561                                 self.timeserials[ host ].append( dir )
    1562 
    1563                         last_serial = self.getLastRrdTimeSerial( host )
    1564 
    1565                         if last_serial:
    1566 
    1567                                 metric_dir = cluster_dir + '/' + host + '/' + last_serial
    1568 
    1569                                 if os.path.exists( metric_dir ):
    1570 
    1571                                         dirlist = os.listdir( metric_dir )
    1572 
    1573                                         for file in dirlist:
    1574 
    1575                                                 metricname = file.split( '.rrd' )[0]
    1576 
    1577                                                 if not self.lastStored.has_key( host ):
    1578 
    1579                                                         self.lastStored[ host ] = { }
    1580 
    1581                                                 self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
    1582 
    1583         def getClusterName( self ):
    1584                 """Return clustername"""
    1585 
    1586                 return self.cluster
    1587 
    1588         def memMetric( self, host, metric ):
    1589                 """Store metric from host in memory"""
    1590 
    1591                 # <ATOMIC>
    1592                 #
    1593                 self.slot.acquire()
    1594                
    1595                 if self.myMetrics.has_key( host ):
    1596 
    1597                         if self.myMetrics[ host ].has_key( metric['name'] ):
    1598 
    1599                                 for mymetric in self.myMetrics[ host ][ metric['name'] ]:
    1600 
    1601                                         if mymetric['time'] == metric['time']:
    1602 
    1603                                                 # Allready have this metric, abort
    1604                                                 self.slot.release()
    1605                                                 return 1
    1606                         else:
    1607                                 self.myMetrics[ host ][ metric['name'] ] = [ ]
    1608                 else:
    1609                         self.myMetrics[ host ]                          = { }
    1610                         self.myMetrics[ host ][ metric['name'] ]        = [ ]
    1611 
    1612                 # Push new metric onto stack
    1613                 # atomic code; only 1 thread at a time may access the stack
    1614 
    1615                 self.myMetrics[ host ][ metric['name'] ].append( metric )
    1616 
    1617                 self.slot.release()
    1618                 #
    1619                 # </ATOMIC>
    1620 
    1621         def makeUpdateList( self, host, metriclist ):
    1622                 """
    1623                 Make a list of update values for rrdupdate
    1624                 but only those that we didn't store before
    1625                 """
    1626 
    1627                 update_list     = [ ]
    1628                 metric          = None
    1629 
    1630                 while len( metriclist ) > 0:
    1631 
    1632                         metric = metriclist.pop( 0 )
    1633 
    1634                         if self.checkStoreMetric( host, metric ):
    1635 
    1636                                 u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
    1637                                 #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
    1638                                 update_list.append( u_val )
    1639 
    1640                 return update_list
    1641 
    1642         def checkStoreMetric( self, host, metric ):
    1643                 """Check if supplied metric if newer than last one stored"""
    1644 
    1645                 if self.lastStored.has_key( host ):
    1646 
    1647                         if self.lastStored[ host ].has_key( metric['name'] ):
    1648 
    1649                                 if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
    1650 
    1651                                         # This is old
    1652                                         return 0
    1653 
    1654                 return 1
    1655 
    1656         def memLastUpdate( self, host, metricname, metriclist ):
    1657                 """
    1658                 Memorize the time of the latest metric from metriclist
    1659                 but only if it wasn't allready memorized
    1660                 """
    1661 
    1662                 if not self.lastStored.has_key( host ):
    1663                         self.lastStored[ host ] = { }
    1664 
    1665                 last_update_time = 0
    1666 
    1667                 for metric in metriclist:
    1668 
    1669                         if metric['name'] == metricname:
    1670 
    1671                                 if metric['time'] > last_update_time:
    1672 
    1673                                         last_update_time = metric['time']
    1674 
    1675                 if self.lastStored[ host ].has_key( metricname ):
    1676                        
    1677                         if last_update_time <= self.lastStored[ host ][ metricname ]:
    1678                                 return 1
    1679 
    1680                 self.lastStored[ host ][ metricname ] = last_update_time
    1681 
    1682         def storeMetrics( self ):
    1683                 """
    1684                 Store all metrics from memory to disk
    1685                 and do it to the RRD's in appropriate timeperiod directory
    1686                 """
    1687 
    1688                 debug_msg( 5, "Entering storeMetrics()")
    1689 
    1690                 count_values    = 0
    1691                 count_metrics   = 0
    1692                 count_bits      = 0
    1693 
    1694                 for hostname, mymetrics in self.myMetrics.items():     
    1695 
    1696                         for metricname, mymetric in mymetrics.items():
    1697 
    1698                                 count_metrics += 1
    1699 
    1700                                 for dmetric in mymetric:
    1701 
    1702                                         count_values += 1
    1703 
    1704                                         count_bits      += len( dmetric['time'] )
    1705                                         count_bits      += len( dmetric['val'] )
    1706 
    1707                 count_bytes     = count_bits / 8
    1708 
    1709                 debug_msg( 5, "size of cluster '" + self.cluster + "': " +
    1710                         str( len( self.myMetrics.keys() ) ) + " hosts " +
    1711                         str( count_metrics ) + " metrics " + str( count_values ) + " values " +
    1712                         str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
    1713 
    1714                 for hostname, mymetrics in self.myMetrics.items():     
    1715 
    1716                         for metricname, mymetric in mymetrics.items():
    1717 
    1718                                 metrics_to_store = [ ]
    1719 
    1720                                 # Pop metrics from stack for storing until none is left
    1721                                 # atomic code: only 1 thread at a time may access myMetrics
    1722 
    1723                                 # <ATOMIC>
    1724                                 #
    1725                                 self.slot.acquire()
    1726 
    1727                                 while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
    1728 
    1729                                         if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
    1730 
    1731                                                 try:
    1732                                                         metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
    1733                                                 except IndexError, msg:
    1734 
    1735                                                         # Somehow sometimes myMetrics[ hostname ][ metricname ]
    1736                                                         # is still len 0 when the statement is executed.
    1737                                                         # Just ignore indexerror's..
    1738                                                         pass
    1739 
    1740                                 self.slot.release()
    1741                                 #
    1742                                 # </ATOMIC>
    1743 
    1744                                 # Create a mapping table, each metric to the period where it should be stored
    1745                                 #
    1746                                 metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
    1747 
    1748                                 update_rets = [ ]
    1749 
    1750                                 for period, pmetric in metric_serial_table.items():
    1751 
    1752                                         create_ret = self.createCheck( hostname, metricname, period )   
    1753 
    1754                                         update_ret = self.update( hostname, metricname, period, pmetric )
    1755 
    1756                                         if update_ret == 0:
    1757 
    1758                                                 debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
    1759                                         else:
    1760                                                 debug_msg( 9, 'metric update failed' )
    1761 
    1762                                         update_rets.append( create_ret )
    1763                                         update_rets.append( update_ret )
    1764 
    1765                                 # Lets ignore errors here for now, we need to make sure last update time
    1766                                 # is correct!
    1767                                 #
    1768                                 #if not (1) in update_rets:
    1769 
    1770                                 self.memLastUpdate( hostname, metricname, metrics_to_store )
    1771 
    1772                 debug_msg( 5, "Leaving storeMetrics()")
    1773 
    1774         def makeTimeSerial( self ):
    1775                 """Generate a time serial. Seconds since epoch"""
    1776 
    1777                 # Seconds since epoch
    1778                 mytime = int( time.time() )
    1779 
    1780                 return mytime
    1781 
    1782         def makeRrdPath( self, host, metricname, timeserial ):
    1783                 """Make a RRD location/path and filename"""
    1784 
    1785                 rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
    1786                 rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
    1787 
    1788                 return rrd_dir, rrd_file
    1789 
    1790         def getLastRrdTimeSerial( self, host ):
    1791                 """Find the last timeserial (directory) for this host"""
    1792 
    1793                 newest_timeserial = 0
    1794 
    1795                 for dir in self.timeserials[ host ]:
    1796 
    1797                         valid_dir = 1
    1798 
    1799                         for letter in dir:
    1800                                 if letter not in string.digits:
    1801                                         valid_dir = 0
    1802 
    1803                         if valid_dir:
    1804                                 timeserial = dir
    1805                                 if timeserial > newest_timeserial:
    1806                                         newest_timeserial = timeserial
    1807 
    1808                 if newest_timeserial:
    1809                         return newest_timeserial
    1810                 else:
    1811                         return 0
    1812 
    1813         def determinePeriod( self, host, check_serial ):
    1814                 """Determine to which period (directory) this time(serial) belongs"""
    1815 
    1816                 period_serial = 0
    1817 
    1818                 if self.timeserials.has_key( host ):
    1819 
    1820                         for serial in self.timeserials[ host ]:
    1821 
    1822                                 if check_serial >= serial and period_serial < serial:
    1823 
    1824                                         period_serial = serial
    1825 
    1826                 return period_serial
    1827 
    1828         def determineSerials( self, host, metricname, metriclist ):
    1829                 """
    1830                 Determine the correct serial and corresponding rrd to store
    1831                 for a list of metrics
    1832                 """
    1833 
    1834                 metric_serial_table = { }
    1835 
    1836                 for metric in metriclist:
    1837 
    1838                         if metric['name'] == metricname:
    1839 
    1840                                 period          = self.determinePeriod( host, metric['time'] ) 
    1841 
    1842                                 archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
    1843 
    1844                                 if (int( metric['time'] ) - int( period ) ) > archive_secs:
    1845 
    1846                                         # This one should get it's own new period
    1847                                         period = metric['time']
    1848 
    1849                                         if not self.timeserials.has_key( host ):
    1850                                                 self.timeserials[ host ] = [ ]
    1851 
    1852                                         self.timeserials[ host ].append( period )
    1853 
    1854                                 if not metric_serial_table.has_key( period ):
    1855 
    1856                                         metric_serial_table[ period ] = [ ]
    1857 
    1858                                 metric_serial_table[ period ].append( metric )
    1859 
    1860                 return metric_serial_table
    1861 
    1862         def createCheck( self, host, metricname, timeserial ):
    1863                 """Check if an rrd allready exists for this metric, create if not"""
    1864 
    1865                 debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
    1866                
    1867                 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
    1868 
    1869                 if not os.path.exists( rrd_dir ):
    1870 
    1871                         try:
    1872                                 os.makedirs( rrd_dir )
    1873 
    1874                         except os.OSError, msg:
    1875 
    1876                                 if msg.find( 'File exists' ) != -1:
    1877 
    1878                                         # Ignore exists errors
    1879                                         pass
    1880 
    1881                                 else:
    1882 
    1883                                         print msg
    1884                                         return
    1885 
    1886                         debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
    1887 
    1888                 if not os.path.exists( rrd_file ):
    1889 
    1890                         interval        = self.config.getInterval( self.cluster )
    1891                         heartbeat       = 8 * int( interval )
    1892 
    1893                         params          = [ ]
    1894 
    1895                         params.append( '--step' )
    1896                         params.append( str( interval ) )
    1897 
    1898                         params.append( '--start' )
    1899                         params.append( str( int( timeserial ) - 1 ) )
    1900 
    1901                         params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
    1902                         params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
    1903 
    1904                         self.rrdm.create( str(rrd_file), params )
    1905 
    1906                         debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
    1907 
    1908         def update( self, host, metricname, timeserial, metriclist ):
    1909                 """
    1910                 Update rrd file for host with metricname
    1911                 in directory timeserial with metriclist
    1912                 """
    1913 
    1914                 debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
    1915 
    1916                 rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
    1917 
    1918                 update_list             = self.makeUpdateList( host, metriclist )
    1919 
    1920                 if len( update_list ) > 0:
    1921                         ret = self.rrdm.update( str(rrd_file), update_list )
    1922 
    1923                         if ret:
    1924                                 return 1
    1925                
    1926                         debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
    1927 
    1928                 return 0
     1507    """Class for handling RRD activity"""
     1508
     1509    myMetrics = { }
     1510    lastStored = { }
     1511    timeserials = { }
     1512    slot = None
     1513
     1514    def __init__( self, config, cluster ):
     1515        """Setup initial variables"""
     1516
     1517        global MODRRDTOOL
     1518
     1519        self.block    = 0
     1520        self.cluster    = cluster
     1521        self.config    = config
     1522        self.slot    = threading.Lock()
     1523
     1524        if MODRRDTOOL:
     1525
     1526            self.rrdm    = RRDMutator()
     1527        else:
     1528            self.rrdm    = RRDMutator( RRDTOOL )
     1529
     1530        global DEBUG_LEVEL
     1531
     1532        if DEBUG_LEVEL <= 2:
     1533            self.gatherLastUpdates()
     1534
     1535    def gatherLastUpdates( self ):
     1536        """Populate the lastStored list, containing timestamps of all last updates"""
     1537
     1538        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
     1539
     1540        hosts = [ ]
     1541
     1542        if os.path.exists( cluster_dir ):
     1543
     1544            dirlist = os.listdir( cluster_dir )
     1545
     1546            for dir in dirlist:
     1547
     1548                hosts.append( dir )
     1549
     1550        for host in hosts:
     1551
     1552            host_dir    = cluster_dir + '/' + host
     1553            dirlist        = os.listdir( host_dir )
     1554
     1555            for dir in dirlist:
     1556
     1557                if not self.timeserials.has_key( host ):
     1558
     1559                    self.timeserials[ host ] = [ ]
     1560
     1561                self.timeserials[ host ].append( dir )
     1562
     1563            last_serial = self.getLastRrdTimeSerial( host )
     1564
     1565            if last_serial:
     1566
     1567                metric_dir = cluster_dir + '/' + host + '/' + last_serial
     1568
     1569                if os.path.exists( metric_dir ):
     1570
     1571                    dirlist = os.listdir( metric_dir )
     1572
     1573                    for file in dirlist:
     1574
     1575                        metricname = file.split( '.rrd' )[0]
     1576
     1577                        if not self.lastStored.has_key( host ):
     1578
     1579                            self.lastStored[ host ] = { }
     1580
     1581                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
     1582
     1583    def getClusterName( self ):
     1584        """Return clustername"""
     1585
     1586        return self.cluster
     1587
     1588    def memMetric( self, host, metric ):
     1589        """Store metric from host in memory"""
     1590
     1591        # <ATOMIC>
     1592        #
     1593        self.slot.acquire()
     1594       
     1595        if self.myMetrics.has_key( host ):
     1596
     1597            if self.myMetrics[ host ].has_key( metric['name'] ):
     1598
     1599                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
     1600
     1601                    if mymetric['time'] == metric['time']:
     1602
     1603                        # Allready have this metric, abort
     1604                        self.slot.release()
     1605                        return 1
     1606            else:
     1607                self.myMetrics[ host ][ metric['name'] ] = [ ]
     1608        else:
     1609            self.myMetrics[ host ]                = { }
     1610            self.myMetrics[ host ][ metric['name'] ]    = [ ]
     1611
     1612        # Push new metric onto stack
     1613        # atomic code; only 1 thread at a time may access the stack
     1614
     1615        self.myMetrics[ host ][ metric['name'] ].append( metric )
     1616
     1617        self.slot.release()
     1618        #
     1619        # </ATOMIC>
     1620
     1621    def makeUpdateList( self, host, metriclist ):
     1622        """
     1623        Make a list of update values for rrdupdate
     1624        but only those that we didn't store before
     1625        """
     1626
     1627        update_list    = [ ]
     1628        metric        = None
     1629
     1630        while len( metriclist ) > 0:
     1631
     1632            metric = metriclist.pop( 0 )
     1633
     1634            if self.checkStoreMetric( host, metric ):
     1635
     1636                u_val    = str( metric['time'] ) + ':' + str( metric['val'] )
     1637                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
     1638                update_list.append( u_val )
     1639
     1640        return update_list
     1641
     1642    def checkStoreMetric( self, host, metric ):
     1643        """Check if supplied metric if newer than last one stored"""
     1644
     1645        if self.lastStored.has_key( host ):
     1646
     1647            if self.lastStored[ host ].has_key( metric['name'] ):
     1648
     1649                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
     1650
     1651                    # This is old
     1652                    return 0
     1653
     1654        return 1
     1655
     1656    def memLastUpdate( self, host, metricname, metriclist ):
     1657        """
     1658        Memorize the time of the latest metric from metriclist
     1659        but only if it wasn't allready memorized
     1660        """
     1661
     1662        if not self.lastStored.has_key( host ):
     1663            self.lastStored[ host ] = { }
     1664
     1665        last_update_time = 0
     1666
     1667        for metric in metriclist:
     1668
     1669            if metric['name'] == metricname:
     1670
     1671                if metric['time'] > last_update_time:
     1672
     1673                    last_update_time = metric['time']
     1674
     1675        if self.lastStored[ host ].has_key( metricname ):
     1676           
     1677            if last_update_time <= self.lastStored[ host ][ metricname ]:
     1678                return 1
     1679
     1680        self.lastStored[ host ][ metricname ] = last_update_time
     1681
     1682    def storeMetrics( self ):
     1683        """
     1684        Store all metrics from memory to disk
     1685        and do it to the RRD's in appropriate timeperiod directory
     1686        """
     1687
     1688        debug_msg( 5, "Entering storeMetrics()")
     1689
     1690        count_values    = 0
     1691        count_metrics    = 0
     1692        count_bits    = 0
     1693
     1694        for hostname, mymetrics in self.myMetrics.items():   
     1695
     1696            for metricname, mymetric in mymetrics.items():
     1697
     1698                count_metrics += 1
     1699
     1700                for dmetric in mymetric:
     1701
     1702                    count_values += 1
     1703
     1704                    count_bits    += len( dmetric['time'] )
     1705                    count_bits    += len( dmetric['val'] )
     1706
     1707        count_bytes    = count_bits / 8
     1708
     1709        debug_msg( 5, "size of cluster '" + self.cluster + "': " +
     1710            str( len( self.myMetrics.keys() ) ) + " hosts " +
     1711            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
     1712            str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
     1713
     1714        for hostname, mymetrics in self.myMetrics.items():   
     1715
     1716            for metricname, mymetric in mymetrics.items():
     1717
     1718                metrics_to_store = [ ]
     1719
     1720                # Pop metrics from stack for storing until none is left
     1721                # atomic code: only 1 thread at a time may access myMetrics
     1722
     1723                # <ATOMIC>
     1724                #
     1725                self.slot.acquire()
     1726
     1727                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     1728
     1729                    if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     1730
     1731                        try:
     1732                            metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
     1733                        except IndexError, msg:
     1734
     1735                            # Somehow sometimes myMetrics[ hostname ][ metricname ]
     1736                            # is still len 0 when the statement is executed.
     1737                            # Just ignore indexerror's..
     1738                            pass
     1739
     1740                self.slot.release()
     1741                #
     1742                # </ATOMIC>
     1743
     1744                # Create a mapping table, each metric to the period where it should be stored
     1745                #
     1746                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
     1747
     1748                update_rets = [ ]
     1749
     1750                for period, pmetric in metric_serial_table.items():
     1751
     1752                    create_ret = self.createCheck( hostname, metricname, period )   
     1753
     1754                    update_ret = self.update( hostname, metricname, period, pmetric )
     1755
     1756                    if update_ret == 0:
     1757
     1758                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
     1759                    else:
     1760                        debug_msg( 9, 'metric update failed' )
     1761
     1762                    update_rets.append( create_ret )
     1763                    update_rets.append( update_ret )
     1764
     1765                # Lets ignore errors here for now, we need to make sure last update time
     1766                # is correct!
     1767                #
     1768                #if not (1) in update_rets:
     1769
     1770                self.memLastUpdate( hostname, metricname, metrics_to_store )
     1771
     1772        debug_msg( 5, "Leaving storeMetrics()")
     1773
     1774    def makeTimeSerial( self ):
     1775        """Generate a time serial. Seconds since epoch"""
     1776
     1777        # Seconds since epoch
     1778        mytime = int( time.time() )
     1779
     1780        return mytime
     1781
     1782    def makeRrdPath( self, host, metricname, timeserial ):
     1783        """Make a RRD location/path and filename"""
     1784
     1785        rrd_dir        = '%s/%s/%s/%s'    %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
     1786        rrd_file    = '%s/%s.rrd'    %( rrd_dir, metricname )
     1787
     1788        return rrd_dir, rrd_file
     1789
     1790    def getLastRrdTimeSerial( self, host ):
     1791        """Find the last timeserial (directory) for this host"""
     1792
     1793        newest_timeserial = 0
     1794
     1795        for dir in self.timeserials[ host ]:
     1796
     1797            valid_dir = 1
     1798
     1799            for letter in dir:
     1800                if letter not in string.digits:
     1801                    valid_dir = 0
     1802
     1803            if valid_dir:
     1804                timeserial = dir
     1805                if timeserial > newest_timeserial:
     1806                    newest_timeserial = timeserial
     1807
     1808        if newest_timeserial:
     1809            return newest_timeserial
     1810        else:
     1811            return 0
     1812
     1813    def determinePeriod( self, host, check_serial ):
     1814        """Determine to which period (directory) this time(serial) belongs"""
     1815
     1816        period_serial = 0
     1817
     1818        if self.timeserials.has_key( host ):
     1819
     1820            for serial in self.timeserials[ host ]:
     1821
     1822                if check_serial >= serial and period_serial < serial:
     1823
     1824                    period_serial = serial
     1825
     1826        return period_serial
     1827
     1828    def determineSerials( self, host, metricname, metriclist ):
     1829        """
     1830        Determine the correct serial and corresponding rrd to store
     1831        for a list of metrics
     1832        """
     1833
     1834        metric_serial_table = { }
     1835
     1836        for metric in metriclist:
     1837
     1838            if metric['name'] == metricname:
     1839
     1840                period        = self.determinePeriod( host, metric['time'] )   
     1841
     1842                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
     1843
     1844                if (int( metric['time'] ) - int( period ) ) > archive_secs:
     1845
     1846                    # This one should get it's own new period
     1847                    period = metric['time']
     1848
     1849                    if not self.timeserials.has_key( host ):
     1850                        self.timeserials[ host ] = [ ]
     1851
     1852                    self.timeserials[ host ].append( period )
     1853
     1854                if not metric_serial_table.has_key( period ):
     1855
     1856                    metric_serial_table[ period ] = [ ]
     1857
     1858                metric_serial_table[ period ].append( metric )
     1859
     1860        return metric_serial_table
     1861
     1862    def createCheck( self, host, metricname, timeserial ):
     1863        """Check if an rrd allready exists for this metric, create if not"""
     1864
     1865        debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
     1866       
     1867        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
     1868
     1869        if not os.path.exists( rrd_dir ):
     1870
     1871            try:
     1872                os.makedirs( rrd_dir )
     1873
     1874            except os.OSError, msg:
     1875
     1876                if msg.find( 'File exists' ) != -1:
     1877
     1878                    # Ignore exists errors
     1879                    pass
     1880
     1881                else:
     1882
     1883                    print msg
     1884                    return
     1885
     1886            debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
     1887
     1888        if not os.path.exists( rrd_file ):
     1889
     1890            interval    = self.config.getInterval( self.cluster )
     1891            heartbeat    = 8 * int( interval )
     1892
     1893            params        = [ ]
     1894
     1895            params.append( '--step' )
     1896            params.append( str( interval ) )
     1897
     1898            params.append( '--start' )
     1899            params.append( str( int( timeserial ) - 1 ) )
     1900
     1901            params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
     1902            params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
     1903
     1904            self.rrdm.create( str(rrd_file), params )
     1905
     1906            debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
     1907
     1908    def update( self, host, metricname, timeserial, metriclist ):
     1909        """
     1910        Update rrd file for host with metricname
     1911        in directory timeserial with metriclist
     1912        """
     1913
     1914        debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
     1915
     1916        rrd_dir, rrd_file    = self.makeRrdPath( host, metricname, timeserial )
     1917
     1918        update_list        = self.makeUpdateList( host, metriclist )
     1919
     1920        if len( update_list ) > 0:
     1921            ret = self.rrdm.update( str(rrd_file), update_list )
     1922
     1923            if ret:
     1924                return 1
     1925       
     1926            debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
     1927
     1928        return 0
    19291929
    19301930def daemon():
    1931         """daemonized threading"""
    1932 
    1933         # Fork the first child
    1934         #
    1935         pid = os.fork()
    1936 
    1937         if pid > 0:
    1938 
    1939                 sys.exit(0)  # end parent
    1940 
    1941         # creates a session and sets the process group ID
    1942         #
    1943         os.setsid()
    1944 
    1945         # Fork the second child
    1946         #
    1947         pid = os.fork()
    1948 
    1949         if pid > 0:
    1950 
    1951                 sys.exit(0)  # end parent
    1952 
    1953         write_pidfile()
    1954 
    1955         # Go to the root directory and set the umask
    1956         #
    1957         os.chdir('/')
    1958         os.umask(0)
    1959 
    1960         sys.stdin.close()
    1961         sys.stdout.close()
    1962         sys.stderr.close()
    1963 
    1964         os.open('/dev/null', os.O_RDWR)
    1965         os.dup2(0, 1)
    1966         os.dup2(0, 2)
    1967 
    1968         run()
     1931    """daemonized threading"""
     1932
     1933    # Fork the first child
     1934    #
     1935    pid = os.fork()
     1936
     1937    if pid > 0:
     1938
     1939        sys.exit(0)  # end parent
     1940
     1941    # creates a session and sets the process group ID
     1942    #
     1943    os.setsid()
     1944
     1945    # Fork the second child
     1946    #
     1947    pid = os.fork()
     1948
     1949    if pid > 0:
     1950
     1951        sys.exit(0)  # end parent
     1952
     1953    write_pidfile()
     1954
     1955    # Go to the root directory and set the umask
     1956    #
     1957    os.chdir('/')
     1958    os.umask(0)
     1959
     1960    sys.stdin.close()
     1961    sys.stdout.close()
     1962    sys.stderr.close()
     1963
     1964    os.open('/dev/null', os.O_RDWR)
     1965    os.dup2(0, 1)
     1966    os.dup2(0, 2)
     1967
     1968    run()
    19691969
    19701970def run():
    1971         """Threading start"""
    1972 
    1973         config          = GangliaConfigParser( GMETAD_CONF )
    1974         s_timeout       = int( config.getLowestInterval() - 1 )
    1975 
    1976         socket.setdefaulttimeout( s_timeout )
    1977 
    1978         myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    1979         myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
    1980 
    1981         myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
    1982         myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
    1983 
    1984         try:
    1985                 torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
    1986                 ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
    1987 
    1988                 torque_xml_thread.start()
    1989                 ganglia_xml_thread.start()
    1990                
    1991         except thread.error, msg:
    1992                 debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
    1993                 syslog.closelog()
    1994                 sys.exit(1)
    1995                
    1996         debug_msg( 0, 'main threading started.' )
     1971    """Threading start"""
     1972
     1973    config        = GangliaConfigParser( GMETAD_CONF )
     1974    s_timeout    = int( config.getLowestInterval() - 1 )
     1975
     1976    socket.setdefaulttimeout( s_timeout )
     1977
     1978    myXMLSource        = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
     1979    myDataStore        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
     1980
     1981    myTorqueProcessor    = TorqueXMLProcessor( myXMLSource, myDataStore )
     1982    myGangliaProcessor    = GangliaXMLProcessor( myXMLSource, myDataStore )
     1983
     1984    try:
     1985        torque_xml_thread    = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
     1986        ganglia_xml_thread    = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
     1987
     1988        torque_xml_thread.start()
     1989        ganglia_xml_thread.start()
     1990       
     1991    except thread.error, msg:
     1992        debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
     1993        syslog.closelog()
     1994        sys.exit(1)
     1995       
     1996    debug_msg( 0, 'main threading started.' )
    19971997
    19981998def main():
    1999         """Program startup"""
    2000 
    2001         global DAEMONIZE, USE_SYSLOG
    2002 
    2003         if not processArgs( sys.argv[1:] ):
    2004                 sys.exit( 1 )
    2005 
    2006         if( DAEMONIZE and USE_SYSLOG ):
    2007                 syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
    2008 
    2009         if DAEMONIZE:
    2010                 daemon()
    2011         else:
    2012                 run()
     1999    """Program startup"""
     2000
     2001    global DAEMONIZE, USE_SYSLOG
     2002
     2003    if not processArgs( sys.argv[1:] ):
     2004        sys.exit( 1 )
     2005
     2006    if( DAEMONIZE and USE_SYSLOG ):
     2007        syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
     2008
     2009    if DAEMONIZE:
     2010        daemon()
     2011    else:
     2012        run()
    20132013
    20142014#
     
    20172017
    20182018def check_dir( directory ):
    2019         """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
    2020 
    2021         if directory[-1] == '/':
    2022                 directory = directory[:-1]
    2023 
    2024         return directory
     2019    """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
     2020
     2021    if directory[-1] == '/':
     2022        directory = directory[:-1]
     2023
     2024    return directory
    20252025
    20262026def reqtime2epoch( rtime ):
    20272027
    2028         (hours, minutes, seconds )      = rtime.split( ':' )
    2029 
    2030         etime   = int(seconds)
    2031         etime   = etime + ( int(minutes) * 60 )
    2032         etime   = etime + ( int(hours) * 60 * 60 )
    2033 
    2034         return etime
     2028    (hours, minutes, seconds )    = rtime.split( ':' )
     2029
     2030    etime    = int(seconds)
     2031    etime    = etime + ( int(minutes) * 60 )
     2032    etime    = etime + ( int(hours) * 60 * 60 )
     2033
     2034    return etime
    20352035
    20362036def debug_msg( level, msg ):
    2037         """Only print msg if correct levels"""
    2038 
    2039         if (not DAEMONIZE and DEBUG_LEVEL >= level):
    2040                 sys.stderr.write( printTime() + ' - ' + msg + '\n' )
    2041        
    2042         if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
    2043                 syslog.syslog( msg )
     2037    """Only print msg if correct levels"""
     2038
     2039    if (not DAEMONIZE and DEBUG_LEVEL >= level):
     2040        sys.stderr.write( printTime() + ' - ' + msg + '\n' )
     2041   
     2042    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
     2043        syslog.syslog( msg )
    20442044
    20452045def printTime( ):
    2046         """Print current time in human readable format"""
    2047 
    2048         return time.strftime("%a %d %b %Y %H:%M:%S")
     2046    """Print current time in human readable format"""
     2047
     2048    return time.strftime("%a %d %b %Y %H:%M:%S")
    20492049
    20502050def write_pidfile():
    20512051
    2052         # Write pidfile if PIDFILE exists
    2053         if PIDFILE:
    2054 
    2055                 pid     = os.getpid()
    2056 
    2057                 pidfile = open(PIDFILE, 'w')
    2058 
    2059                 pidfile.write( str( pid ) )
    2060                 pidfile.close()
     2052    # Write pidfile if PIDFILE exists
     2053    if PIDFILE:
     2054
     2055        pid     = os.getpid()
     2056
     2057        pidfile = open(PIDFILE, 'w')
     2058
     2059        pidfile.write( str( pid ) )
     2060        pidfile.close()
    20612061
    20622062# Ooohh, someone started me! Let's go..
    20632063#
    20642064if __name__ == '__main__':
    2065         main()
     2065    main()
Note: See TracChangeset for help on using the changeset viewer.