- Timestamp:
- 05/14/13 16:38:34 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/1.0/jobarchived/jobarchived.py
r855 r857 568 568 return self.addJob( jobid, jobattrs ) 569 569 570 def checkStaleJobs( self ): 570 def checkTimedoutJobs( self ): 571 572 debug_msg( 1, 'Housekeeping: checking database for timed out jobs..' ) 571 573 572 574 # Locate all jobs in the database that are not set to finished … … 580 582 return None 581 583 582 cleanjobs = [ ] 583 timeoutjobs = [ ] 584 585 jobtimeout_sec = JOB_TIMEOUT * (60 * 60) 586 cur_time = time.time() 584 timeoutjobs = [ ] 585 586 jobtimeout_sec = JOB_TIMEOUT * (60 * 60) 587 cur_time = time.time() 587 588 588 589 for row in r: 589 590 590 job_id = row[0]591 job_requested_time 592 job_status = row[7]593 job_start_timestamp 591 job_id = row[0] 592 job_requested_time = row[4] 593 job_status = row[7] 594 job_start_timestamp = row[8] 594 595 595 596 # If it was set to queued and we didn't see it started 596 597 # there's not point in keeping it around 597 598 # 598 if job_status == 'Q' or not job_start_timestamp: 599 600 cleanjobs.append( job_id ) 601 602 else: 599 if job_status == 'R' and job_start_timestamp: 603 600 604 601 start_timestamp = int( job_start_timestamp ) … … 617 614 timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) ) 618 615 619 debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' ) 620 621 # Purge these from database 622 # 623 for j in cleanjobs: 624 625 q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'" 626 self.setDatabase( q ) 627 628 debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' ) 616 debug_msg( 1, 'Housekeeping: Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' ) 617 618 ret_jobids_clean = [ ] 629 619 630 620 # Close these jobs in the database … … 639 629 new_end_timestamp = int( s ) + r 640 630 641 q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'" 631 q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'" 632 self.setDatabase( q ) 633 else: 634 635 # Requested walltime unknown: cannot guess end time: sorry delete them 636 q = "DELETE FROM jobs WHERE job_id = '" + str( i ) + "'" 637 self.setDatabase( q ) 638 639 ret_jobids_clean.append( i ) 640 641 debug_msg( 1, 'Housekeeping: done.' ) 642 643 return ret_jobids_clean 644 645 def checkStaleJobs( self ): 646 647 debug_msg( 1, 'Housekeeping: checking database for stale jobs..' ) 648 649 # Locate all jobs in the database that are not set to finished 650 # 651 q = "SELECT * from jobs WHERE job_status != 'F'" 652 653 r = self.getDatabase( q ) 654 655 if len( r ) == 0: 656 657 return None 658 659 cleanjobs = [ ] 660 661 cur_time = time.time() 662 663 for row in r: 664 665 job_id = row[0] 666 job_requested_time = row[4] 667 job_status = row[7] 668 job_start_timestamp = row[8] 669 670 # If it was set to queued and we didn't see it started 671 # there's not point in keeping it around 672 # 673 if job_status == 'Q' or not job_start_timestamp: 674 675 cleanjobs.append( job_id ) 676 677 debug_msg( 1, 'Housekeeping: Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' ) 678 679 # Purge these from database 680 # 681 for j in cleanjobs: 682 683 q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'" 642 684 self.setDatabase( q ) 685 686 debug_msg( 1, 'Housekeeping: done.' ) 687 688 return cleanjobs 643 689 644 690 class RRDMutator: … … 848 894 self.jobAttrsSaved = { } 849 895 896 self.iteration = 0 897 898 self.ds.checkTimedoutJobs() 899 self.ds.checkStaleJobs() 900 850 901 debug_msg( 1, "XML: Handler created" ) 851 902 … … 855 906 self.heartbeat = 0 856 907 self.elementct = 0 857 858 debug_msg( 1, "XML: Start document" ) 908 self.iteration = self.iteration + 1 909 910 if self.iteration > 20: 911 912 timedout_jobs = self.ds.checkTimedoutJobs() 913 self.iteration = 0 914 915 for j in timedout_jobs: 916 917 del self.jobAttrs[ j ] 918 del self.jobAttrsSaved[ j ] 919 920 debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) ) 859 921 860 922 def startElement( self, name, attrs ): … … 1066 1128 """Setup initial variables and gather info on existing rrd archive""" 1067 1129 1068 self.config = config 1069 self.clusters = { } 1070 self.ds = datastore 1071 1072 debug_msg( 1, 'Checking database..' ) 1073 1074 global DEBUG_LEVEL 1075 1076 if DEBUG_LEVEL <= 2: 1077 self.ds.checkStaleJobs() 1078 1079 debug_msg( 1, 'Check done.' ) 1080 debug_msg( 1, 'Checking rrd archive..' ) 1130 self.config = config 1131 self.clusters = { } 1132 self.ds = datastore 1133 1134 debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' ) 1081 1135 self.gatherClusters() 1082 debug_msg( 1, ' Check done.' )1136 debug_msg( 1, 'Housekeeping: RRD check complete.' ) 1083 1137 1084 1138 def gatherClusters( self ): … … 1426 1480 STORE_INTERVAL = 60 1427 1481 else: 1428 STORE_INTERVAL = random.randint( 3 60, 640 )1482 STORE_INTERVAL = random.randint( 300, 600 ) 1429 1483 1430 1484 try:
Note: See TracChangeset
for help on using the changeset viewer.