Changeset 857


Ignore:
Timestamp:
05/14/13 16:38:34 (11 years ago)
Author:
ramonb
Message:

jobarchived.py:

  • split checkStaleJobs() into checkTimedoutJobs() and checkStaleJobs()
  • check stale and timedout jobs on startup
  • implemented regular database Housekeeping: after every 20x JobXML iterations: check for timed out jobs
  • changed RRD store thread timewindow from 360-640 seconds to 300-600 seconds
  • closes #164
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/1.0/jobarchived/jobarchived.py

    r855 r857  
    568568        return self.addJob( jobid, jobattrs )
    569569
    570     def checkStaleJobs( self ):
     570    def checkTimedoutJobs( self ):
     571
     572        debug_msg( 1, 'Housekeeping: checking database for timed out jobs..' )
    571573
    572574        # Locate all jobs in the database that are not set to finished
     
    580582            return None
    581583
    582         cleanjobs    = [ ]
    583         timeoutjobs    = [ ]
    584 
    585         jobtimeout_sec    = JOB_TIMEOUT * (60 * 60)
    586         cur_time    = time.time()
     584        timeoutjobs  = [ ]
     585
     586        jobtimeout_sec = JOB_TIMEOUT * (60 * 60)
     587        cur_time       = time.time()
    587588
    588589        for row in r:
    589590
    590             job_id            = row[0]
    591             job_requested_time    = row[4]
    592             job_status        = row[7]
    593             job_start_timestamp    = row[8]
     591            job_id              = row[0]
     592            job_requested_time  = row[4]
     593            job_status          = row[7]
     594            job_start_timestamp = row[8]
    594595
    595596            # If it was set to queued and we didn't see it started
    596597            # there's not point in keeping it around
    597598            #
    598             if job_status == 'Q' or not job_start_timestamp:
    599 
    600                 cleanjobs.append( job_id )
    601 
    602             else:
     599            if job_status == 'R' and job_start_timestamp:
    603600
    604601                start_timestamp = int( job_start_timestamp )
     
    617614                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
    618615
    619         debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
    620 
    621         # Purge these from database
    622         #
    623         for j in cleanjobs:
    624 
    625             q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
    626             self.setDatabase( q )
    627 
    628         debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
     616        debug_msg( 1, 'Housekeeping: Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
     617
     618        ret_jobids_clean = [ ]
    629619
    630620        # Close these jobs in the database
     
    639629                new_end_timestamp    = int( s ) + r
    640630
    641             q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
     631                q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
     632                self.setDatabase( q )
     633            else:
     634
     635                # Requested walltime unknown: cannot guess end time: sorry delete them
     636                q = "DELETE FROM jobs WHERE job_id = '" + str( i ) + "'"
     637                self.setDatabase( q )
     638
     639            ret_jobids_clean.append( i )
     640
     641        debug_msg( 1, 'Housekeeping: done.' )
     642
     643        return ret_jobids_clean
     644
     645    def checkStaleJobs( self ):
     646
     647        debug_msg( 1, 'Housekeeping: checking database for stale jobs..' )
     648
     649        # Locate all jobs in the database that are not set to finished
     650        #
     651        q = "SELECT * from jobs WHERE job_status != 'F'"
     652
     653        r = self.getDatabase( q )
     654
     655        if len( r ) == 0:
     656
     657            return None
     658
     659        cleanjobs      = [ ]
     660
     661        cur_time       = time.time()
     662
     663        for row in r:
     664
     665            job_id              = row[0]
     666            job_requested_time  = row[4]
     667            job_status          = row[7]
     668            job_start_timestamp = row[8]
     669
     670            # If it was set to queued and we didn't see it started
     671            # there's not point in keeping it around
     672            #
     673            if job_status == 'Q' or not job_start_timestamp:
     674
     675                cleanjobs.append( job_id )
     676
     677        debug_msg( 1, 'Housekeeping: Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
     678
     679        # Purge these from database
     680        #
     681        for j in cleanjobs:
     682
     683            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
    642684            self.setDatabase( q )
     685
     686        debug_msg( 1, 'Housekeeping: done.' )
     687
     688        return cleanjobs
    643689
    644690class RRDMutator:
     
    848894        self.jobAttrsSaved   = { }
    849895
     896        self.iteration       = 0
     897
     898        self.ds.checkTimedoutJobs()
     899        self.ds.checkStaleJobs()
     900
    850901        debug_msg( 1, "XML: Handler created" )
    851902
     
    855906        self.heartbeat      = 0
    856907        self.elementct      = 0
    857 
    858         debug_msg( 1, "XML: Start document" )
     908        self.iteration      = self.iteration + 1
     909
     910        if self.iteration > 20:
     911
     912            timedout_jobs = self.ds.checkTimedoutJobs()
     913            self.iteration = 0
     914
     915            for j in timedout_jobs:
     916
     917                del self.jobAttrs[ j ]
     918                del self.jobAttrsSaved[ j ]
     919
     920        debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
    859921
    860922    def startElement( self, name, attrs ):
     
    10661128        """Setup initial variables and gather info on existing rrd archive"""
    10671129
    1068         self.config    = config
    1069         self.clusters    = { }
    1070         self.ds        = datastore
    1071 
    1072         debug_msg( 1, 'Checking database..' )
    1073 
    1074         global DEBUG_LEVEL
    1075 
    1076         if DEBUG_LEVEL <= 2:
    1077             self.ds.checkStaleJobs()
    1078 
    1079         debug_msg( 1, 'Check done.' )
    1080         debug_msg( 1, 'Checking rrd archive..' )
     1130        self.config          = config
     1131        self.clusters        = { }
     1132        self.ds              = datastore
     1133
     1134        debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' )
    10811135        self.gatherClusters()
    1082         debug_msg( 1, 'Check done.' )
     1136        debug_msg( 1, 'Housekeeping: RRD check complete.' )
    10831137
    10841138    def gatherClusters( self ):
     
    14261480            STORE_INTERVAL = 60
    14271481        else:
    1428             STORE_INTERVAL = random.randint( 360, 640 )
     1482            STORE_INTERVAL = random.randint( 300, 600 )
    14291483
    14301484        try:
Note: See TracChangeset for help on using the changeset viewer.