Changeset 295
- Timestamp:
- 03/30/07 12:39:42 (16 years ago)
- Location:
- trunk/jobarchived
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobarchived/jobarchived.conf-dist
r290 r295 77 77 JOB_SQL_DBASE : localhost/jobarchive 78 78 79 # Timeout for jobs in archive 80 # 81 # Assume job has already finished while jobarchived was not running 82 # after this amount of hours: then it will be finished anyway in the database 83 # 84 JOB_TIMEOUT : 168 85 79 86 # Location of rrdtool binary 80 87 # -
trunk/jobarchived/jobarchived.py
r293 r295 104 104 cfg.read( filename ) 105 105 106 global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL 106 global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT 107 107 108 108 ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' ) … … 134 134 135 135 JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' ) 136 137 JOB_TIMEOUT = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' ) 136 138 137 139 DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' ) … … 367 369 self.addJob( jobid, jobattrs ) 368 370 371 def checkStaleJobs( self ): 372 373 q = "SELECT * from jobs WHERE job_status != 'F'" 374 375 r = self.getDatabase( q ) 376 377 if len( r ) == 0: 378 379 return None 380 381 cleanjobs = [ ] 382 timeoutjobs = [ ] 383 384 jobtimeout_sec = JOB_TIMEOUT * (60 * 60) 385 cur_time = time.time() 386 387 for row in r: 388 389 job_id = row[0] 390 job_requested_time = row[4] 391 job_status = row[7] 392 job_start_timestamp = row[8] 393 394 if job_status == 'Q' or not job_start_timestamp: 395 396 cleanjobs.append( job_id ) 397 398 else: 399 400 start_timestamp = int( job_start_timestamp ) 401 402 if ( cur_time - start_timestamp ) > jobtimeout_sec: 403 404 if job_requested_time: 405 406 rtime_epoch = reqtime2epoch( job_requested_time ) 407 else: 408 rtime_epoch = None 409 410 timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) ) 411 412 debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' ) 413 414 for j in cleanjobs: 415 416 q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'" 417 self.setDatabase( q ) 418 419 debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' ) 420 421 
for j in timeoutjobs: 422 423 ( i, s, r ) = j 424 425 if r: 426 new_end_timestamp = int( s ) + r 427 428 q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'" 429 self.setDatabase( q ) 430 369 431 class RRDMutator: 370 432 """A class for performing RRD mutations""" … … 457 519 """Main class for processing XML and acting with it""" 458 520 459 def __init__( self, XMLSource ):521 def __init__( self, XMLSource, DataStore ): 460 522 """Setup initial XML connection and handlers""" 461 523 … … 463 525 #self.myXMLSource = self.myXMLGatherer.getFileObject() 464 526 self.myXMLSource = XMLSource 465 self.myXMLHandler = TorqueXMLHandler( )527 self.myXMLHandler = TorqueXMLHandler( DataStore ) 466 528 self.myXMLError = XMLErrorHandler() 467 529 … … 494 556 jobAttrs = { } 495 557 496 def __init__( self ): 497 498 self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] ) 558 def __init__( self, datastore ): 559 560 #self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] ) 561 self.ds = datastore 499 562 self.jobs_processed = [ ] 500 563 self.jobs_to_store = [ ] … … 594 657 595 658 for jobid in self.jobs_to_store: 596 if self.jobAttrs[ jobid ]['status'] in [ 'R', ' Q', 'F' ]:659 if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]: 597 660 598 661 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] ) … … 648 711 """Parse Ganglia's XML""" 649 712 650 def __init__( self, config ):713 def __init__( self, config, datastore ): 651 714 """Setup initial variables and gather info on existing rrd archive""" 652 715 653 716 self.config = config 654 717 self.clusters = { } 718 self.ds = datastore 719 debug_msg( 1, 'Checking database..' ) 720 self.ds.checkStaleJobs() 721 debug_msg( 1, 'Check done.' ) 655 722 debug_msg( 1, 'Checking existing toga rrd archive..' 
) 656 723 self.gatherClusters() … … 930 997 """Main class for processing XML and acting with it""" 931 998 932 def __init__( self, XMLSource ):999 def __init__( self, XMLSource, DataStore ): 933 1000 """Setup initial XML connection and handlers""" 934 1001 … … 938 1005 #self.myXMLSource = self.myXMLGatherer.getFileObject() 939 1006 self.myXMLSource = XMLSource 940 self.myXMLHandler = GangliaXMLHandler( self.config ) 1007 self.ds = DataStore 1008 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds ) 941 1009 self.myXMLError = XMLErrorHandler() 942 1010 … … 1567 1635 1568 1636 myXMLSource = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 1569 1570 myTorqueProcessor = TorqueXMLProcessor( myXMLSource ) 1571 myGangliaProcessor = GangliaXMLProcessor( myXMLSource ) 1637 myDataStore = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] ) 1638 1639 myTorqueProcessor = TorqueXMLProcessor( myXMLSource, myDataStore ) 1640 myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore ) 1572 1641 1573 1642 try: … … 1611 1680 return directory 1612 1681 1682 def reqtime2epoch( rtime ): 1683 1684 (hours, minutes, seconds ) = rtime.split( ':' ) 1685 1686 etime = int(seconds) 1687 etime = etime + ( int(minutes) * 60 ) 1688 etime = etime + ( int(hours) * 60 * 60 ) 1689 1690 return etime 1691 1613 1692 def debug_msg( level, msg ): 1614 1693 """Only print msg if correct levels"""
Note: See TracChangeset
for help on using the changeset viewer.