Changeset 365 for trunk/jobarchived
- Timestamp:
- 06/13/07 10:20:05 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobarchived/jobarchived.py
r360 r365 163 163 import DBClass 164 164 import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re 165 import rrdtool 165 166 166 167 class DataSQLStore: … … 193 194 def doDatabase(self, type, statement): 194 195 195 debug_msg( 6, 'doDatabase(): %s: %s' %(type, statement) )196 debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) ) 196 197 try: 197 198 if type == 'set': … … 206 207 sys.exit(1) 207 208 208 debug_msg( 6, 'doDatabase(): result: %s' %(result) )209 debug_msg( 10, 'doDatabase(): result: %s' %(result) ) 209 210 return result 210 211 … … 272 273 update_str = None 273 274 274 debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id))275 debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id)) 275 276 276 277 ids = [ ] … … 434 435 binary = None 435 436 437 436 438 def __init__( self, binary=None ): 437 439 """Set alternate binary if supplied""" … … 443 445 """Create a new rrd with args""" 444 446 445 return self.perform( 'create', '"' + filename + '"', args ) 447 #return self.perform( 'create', '"' + filename + '"', args ) 448 return self.perform( 'create', filename, args ) 446 449 447 450 def update( self, filename, args ): 448 451 """Update a rrd with args""" 449 452 450 return self.perform( 'update', '"' + filename + '"', args ) 453 #return self.perform( 'update', '"' + filename + '"', args ) 454 return self.perform( 'update', filename, args ) 451 455 452 456 def grabLastUpdate( self, filename ): … … 455 459 last_update = 0 456 460 457 debug_msg( 8, self.binary + ' info "' + filename + '"' ) 458 459 my_pipe = os.popen( self.binary + ' info "' + filename + '"' ) 460 461 for line in my_pipe.readlines(): 462 463 if line.find( 'last_update') != -1: 464 465 last_update = line.split( ' = ' )[1] 466 467 if my_pipe: 468 469 my_pipe.close() 470 471 if last_update: 461 debug_msg( 8, 'rrdtool.info( ' + filename + ' )' ) 462 463 rrd_header = { } 464 465 try: 466 rrd_header = rrdtool.info( filename ) 467 except rrdtool.error, msg: 468 debug_msg( 8, str( msg ) ) 469 470 if rrd_header.has_key( 'last_update' ): 472 471 return last_update 473 472 else: … … 491 490 arg_string = arg_string + ' ' + arg 492 491 493 debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ) 494 495 cmd = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ) 496 lines = cmd.readlines() 497 498 cmd.close() 499 500 for line in lines: 501 502 if line.find( 'ERROR' ) != -1: 503 504 error_msg = string.join( line.split( ' ' )[1:] ) 505 debug_msg( 8, error_msg ) 506 return 1 492 debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" ) 493 494 try: 495 debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) ) 496 497 if action == 'create': 498 499 rrdtool.create( str( filename ), *args ) 500 501 elif action == 'update': 502 503 rrdtool.update( str( filename ), *args ) 504 505 except rrdtool.error, msg: 506 507 error_msg = str( msg ) 508 debug_msg( 8, error_msg ) 509 return 1 507 510 508 511 return 0 … … 566 569 567 570 self.heartbeat = 0 571 self.elementct = 0 568 572 569 573 def startElement( self, name, attrs ): … … 576 580 jobinfo = { } 577 581 582 self.elementct += 1 583 578 584 if name == 'CLUSTER': 579 585 … … 623 629 self.jobs_to_store.append( job_id ) 624 630 625 debug_msg( 6, 'jobinfo for job %s has changed' %job_id )631 debug_msg( 10, 'jobinfo for job %s has changed' %job_id ) 626 632 else: 627 633 self.jobAttrs[ job_id ] = jobinfo … … 630 636 self.jobs_to_store.append( job_id ) 631 637 632 debug_msg( 6, 'jobinfo for job %s has changed' %job_id )638 debug_msg( 10, 'jobinfo for job %s has changed' %job_id ) 633 639 634 640 def endDocument( self ): 635 641 """When all metrics have gone, check if any jobs have finished""" 642 643 debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" jobs" ) 636 644 637 645 if self.heartbeat: … … 719 727 720 728 debug_msg( 1, 'Checking database..' ) 721 self.ds.checkStaleJobs() 729 730 global DEBUG_LEVEL 731 732 if DEBUG_LEVEL <= 2: 733 self.ds.checkStaleJobs() 734 722 735 debug_msg( 1, 'Check done.' ) 723 736 debug_msg( 1, 'Checking rrd archive..' ) … … 743 756 744 757 self.clusters[ clustername ] = RRDHandler( self.config, clustername ) 758 759 debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" ) 745 760 746 761 def startElement( self, name, attrs ): … … 1048 1063 """Store metrics retained in memory to disk""" 1049 1064 1065 global DEBUG_LEVEL 1066 1050 1067 # Store metrics somewhere between every 360 and 640 seconds 1051 1068 # 1052 STORE_INTERVAL = random.randint( 360, 640 ) 1069 if DEBUG_LEVEL > 2: 1070 #STORE_INTERVAL = 60 1071 STORE_INTERVAL = random.randint( 360, 640 ) 1072 else: 1073 STORE_INTERVAL = random.randint( 360, 640 ) 1053 1074 1054 1075 try: … … 1227 1248 self.rrdm = RRDMutator( RRDTOOL ) 1228 1249 1229 self.gatherLastUpdates() 1250 global DEBUG_LEVEL 1251 1252 if DEBUG_LEVEL <= 2: 1253 self.gatherLastUpdates() 1230 1254 1231 1255 def gatherLastUpdates( self ): … … 1330 1354 if self.checkStoreMetric( host, metric ): 1331 1355 1332 update_list.append( '%s:%s' %( metric['time'], metric['val'] ) ) 1356 u_val = str( metric['time'] ) + ':' + str( metric['val'] ) 1357 #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) ) 1358 update_list.append( u_val ) 1333 1359 1334 1360 return update_list … … 1380 1406 """ 1381 1407 1408 debug_msg( 5, "Entering storeMetrics()") 1409 1410 count_values = 0 1411 count_metrics = 0 1412 count_bits = 0 1413 1414 for hostname, mymetrics in self.myMetrics.items(): 1415 1416 for metricname, mymetric in mymetrics.items(): 1417 1418 count_metrics += 1 1419 1420 for dmetric in mymetric: 1421 1422 count_values += 1 1423 1424 count_bits += len( dmetric['time'] ) 1425 count_bits += len( dmetric['val'] ) 1426 1427 count_bytes = count_bits / 8 1428 1429 debug_msg( 5, "size of cluster '" + self.cluster + "': " + 1430 str( len( self.myMetrics.keys() ) ) + " hosts " + 1431 str( count_metrics ) + " metrics " + str( count_values ) + " values " + 1432 str( count_bits ) + " bits " + str( count_bytes ) + " bytes " ) 1433 1382 1434 for hostname, mymetrics in self.myMetrics.items(): 1383 1435 … … 1437 1489 1438 1490 self.memLastUpdate( hostname, metricname, metrics_to_store ) 1491 1492 debug_msg( 5, "Leaving storeMetrics()") 1439 1493 1440 1494 def makeTimeSerial( self ):
Note: See TracChangeset
for help on using the changeset viewer.