Changeset 295


Ignore:
Timestamp:
03/30/07 12:39:42 (14 years ago)
Author:
bastiaans
Message:

jobarchived/jobarchived.conf-dist:

  • added JOB_TIMEOUT option

jobarchived/jobarchived.py:

  • added database checking of stale/timed out jobs
  • stop archiving 'Q' state jobs in the database
Location:
trunk/jobarchived
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobarchived/jobarchived.conf-dist

    r290 r295  
    7777JOB_SQL_DBASE                   : localhost/jobarchive
    7878
     79# Timeout for jobs in archive
     80#
     81# Assume job has already finished while jobarchived was not running
     82# after this amount of hours: the it will be finished anyway in the database
     83#
     84JOB_TIMEOUT                     : 168
     85
    7986# Location of rrdtool binary
    8087#
  • trunk/jobarchived/jobarchived.py

    r293 r295  
    104104        cfg.read( filename )
    105105
    106         global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL
     106        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT
    107107
    108108        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
     
    134134
    135135        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
     136
     137        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
    136138
    137139        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
     
    367369                self.addJob( jobid, jobattrs )
    368370
     371        def checkStaleJobs( self ):
     372
     373                q = "SELECT * from jobs WHERE job_status != 'F'"
     374
     375                r = self.getDatabase( q )
     376
     377                if len( r ) == 0:
     378
     379                        return None
     380
     381                cleanjobs       = [ ]
     382                timeoutjobs     = [ ]
     383
     384                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
     385                cur_time        = time.time()
     386
     387                for row in r:
     388
     389                        job_id                  = row[0]
     390                        job_requested_time      = row[4]
     391                        job_status              = row[7]
     392                        job_start_timestamp     = row[8]
     393
     394                        if job_status == 'Q' or not job_start_timestamp:
     395
     396                                cleanjobs.append( job_id )
     397
     398                        else:
     399
     400                                start_timestamp = int( job_start_timestamp )
     401
     402                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
     403
     404                                        if job_requested_time:
     405
     406                                                rtime_epoch     = reqtime2epoch( job_requested_time )
     407                                        else:
     408                                                rtime_epoch     = None
     409                                       
     410                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
     411
     412                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
     413
     414                for j in cleanjobs:
     415
     416                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
     417                        self.setDatabase( q )
     418
     419                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
     420
     421                for j in timeoutjobs:
     422
     423                        ( i, s, r )             = j
     424
     425                        if r:
     426                                new_end_timestamp       = int( s ) + r
     427
     428                        q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
     429                        self.setDatabase( q )
     430
    369431class RRDMutator:
    370432        """A class for performing RRD mutations"""
     
    457519        """Main class for processing XML and acting with it"""
    458520
    459         def __init__( self, XMLSource ):
     521        def __init__( self, XMLSource, DataStore ):
    460522                """Setup initial XML connection and handlers"""
    461523
     
    463525                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
    464526                self.myXMLSource        = XMLSource
    465                 self.myXMLHandler       = TorqueXMLHandler()
     527                self.myXMLHandler       = TorqueXMLHandler( DataStore )
    466528                self.myXMLError         = XMLErrorHandler()
    467529
     
    494556        jobAttrs = { }
    495557
    496         def __init__( self ):
    497 
    498                 self.ds                 = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
     558        def __init__( self, datastore ):
     559
     560                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
     561                self.ds                 = datastore
    499562                self.jobs_processed     = [ ]
    500563                self.jobs_to_store      = [ ]
     
    594657
    595658                        for jobid in self.jobs_to_store:
    596                                 if self.jobAttrs[ jobid ]['status'] in [ 'R', 'Q', 'F' ]:
     659                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
    597660
    598661                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
     
    648711        """Parse Ganglia's XML"""
    649712
    650         def __init__( self, config ):
     713        def __init__( self, config, datastore ):
    651714                """Setup initial variables and gather info on existing rrd archive"""
    652715
    653716                self.config     = config
    654717                self.clusters   = { }
     718                self.ds         = datastore
     719                debug_msg( 1, 'Checking database..' )
     720                self.ds.checkStaleJobs()
     721                debug_msg( 1, 'Check done.' )
    655722                debug_msg( 1, 'Checking existing toga rrd archive..' )
    656723                self.gatherClusters()
     
    930997        """Main class for processing XML and acting with it"""
    931998
    932         def __init__( self, XMLSource ):
     999        def __init__( self, XMLSource, DataStore ):
    9331000                """Setup initial XML connection and handlers"""
    9341001
     
    9381005                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
    9391006                self.myXMLSource        = XMLSource
    940                 self.myXMLHandler       = GangliaXMLHandler( self.config )
     1007                self.ds                 = DataStore
     1008                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
    9411009                self.myXMLError         = XMLErrorHandler()
    9421010
     
    15671635
    15681636        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    1569 
    1570         myTorqueProcessor       = TorqueXMLProcessor( myXMLSource )
    1571         myGangliaProcessor      = GangliaXMLProcessor( myXMLSource )
     1637        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
     1638
     1639        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
     1640        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
    15721641
    15731642        try:
     
    16111680        return directory
    16121681
     1682def reqtime2epoch( rtime ):
     1683
     1684        (hours, minutes, seconds )      = rtime.split( ':' )
     1685
     1686        etime   = int(seconds)
     1687        etime   = etime + ( int(minutes) * 60 )
     1688        etime   = etime + ( int(hours) * 60 * 60 )
     1689
     1690        return etime
     1691
    16131692def debug_msg( level, msg ):
    16141693        """Only print msg if correct levels"""
Note: See TracChangeset for help on using the changeset viewer.