- Timestamp:
- 04/18/05 16:46:55 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r86 r87 20 20 # 9 = RRD activity, gmetad config parsing 21 21 # 8 = RRD file activity 22 # 7 = daemon threading23 22 # 6 = SQL 23 # 1 = daemon threading 24 24 # 25 25 DEBUG_LEVEL = 7 … … 219 219 ids = self.getNodeIds( node_list ) 220 220 221 self.addJobNodes( self.job_id, ids )221 self.addJobNodes( job_id, ids ) 222 222 elif action == 'update': 223 223 … … 370 370 self.myXMLHandler = TorqueXMLHandler() 371 371 self.myXMLError = XMLErrorHandler() 372 self.config = GangliaConfigParser( GMETAD_CONF ) 372 373 373 374 def run( self ): 374 375 """Main XML processing""" 375 376 377 debug_msg( 5, printTime() + ' - torquexmlthread(): started.' ) 378 376 379 while( 1 ): 377 380 378 print 'parse'379 381 self.myXMLSource = self.myXMLGatherer.getFileObject() 382 debug_msg( 1, printTime() + ' - torquexmlthread(): Parsing..' ) 380 383 xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 381 print 'sleep' 382 time.sleep( 1 ) 384 debug_msg( 1, printTime() + ' - torquexmlthread(): Done parsing.' ) 385 debug_msg( 1, printTime() + ' - torquexmlthread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) ) 386 time.sleep( self.config.getLowestInterval() ) 383 387 384 388 class TorqueXMLHandler( xml.sax.handler.ContentHandler ): … … 457 461 # it must have finished, since we _did_ get a new heartbeat 458 462 # 459 if jobinfo['reported'] < self.heartbeat :463 if jobinfo['reported'] < self.heartbeat and jobinfo['status'] == 'R' and jobid not in self.jobs_to_store: 460 464 461 465 self.jobAttrs[ jobid ]['status'] = 'F' … … 464 468 self.jobs_to_store.append( jobid ) 465 469 470 debug_msg( 1, printTime() + ' - torquexmlthread(): Storing..' ) 471 466 472 for jobid in self.jobs_to_store: 467 473 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] ) 474 475 debug_msg( 1, printTime() + ' - torquexmlthread(): Done storing.' ) 468 476 469 477 self.jobs_to_store = [ ] … … 488 496 """ 489 497 498 ignore_changes = [ 'reported' ] 499 490 500 if jobattrs.has_key( jobid ): 491 501 492 502 for valname, value in jobinfo.items(): 493 503 494 if jobattrs[ jobid ].has_key( valname ): 495 496 if value != jobattrs[ jobid ][ valname ]: 497 498 if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat: 499 return 1 500 501 else: 502 return 1 504 if valname not in ignore_changes: 505 506 if jobattrs[ jobid ].has_key( valname ): 507 508 if value != jobattrs[ jobid ][ valname ]: 509 510 if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat: 511 return 1 512 513 else: 514 return 1 503 515 504 516 return 0 … … 776 788 """Actual metric storing thread""" 777 789 778 debug_msg( 7, printTime() + ' - storemetricthread(): started.' )779 debug_msg( 7, printTime() + ' - storemetricthread(): Storing data..' )790 debug_msg( 1, printTime() + ' - storemetricthread(): started.' ) 791 debug_msg( 1, printTime() + ' - storemetricthread(): Storing data..' ) 780 792 ret = self.myXMLHandler.storeMetrics() 781 debug_msg( 7, printTime() + ' - storemetricthread(): Done storing.' )782 debug_msg( 7, printTime() + ' - storemetricthread(): finished.' )793 debug_msg( 1, printTime() + ' - storemetricthread(): Done storing.' ) 794 debug_msg( 1, printTime() + ' - storemetricthread(): finished.' ) 783 795 784 796 return ret … … 787 799 """Process XML""" 788 800 789 debug_msg( 7, printTime() + ' - xmlthread(): started.' )801 debug_msg( 1, printTime() + ' - xmlthread(): started.' ) 790 802 791 803 parsethread = threading.Thread( None, self.parseThread, 'parsethread' ) 792 804 parsethread.start() 793 805 794 debug_msg( 7, printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )806 debug_msg( 1, printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 795 807 time.sleep( float( self.config.getLowestInterval() ) ) 796 debug_msg( 7, printTime() + ' - xmlthread(): Done sleeping.' )808 debug_msg( 1, printTime() + ' - xmlthread(): Done sleeping.' ) 797 809 798 810 if parsethread.isAlive(): 799 811 800 debug_msg( 7, printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )812 debug_msg( 1, printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' ) 801 813 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 802 814 debug_msg( 7, printTime() + ' - xmlthread(): Done waiting.' ) 803 815 804 debug_msg( 7, printTime() + ' - xmlthread(): finished.' )816 debug_msg( 1, printTime() + ' - xmlthread(): finished.' ) 805 817 806 818 return 0 … … 809 821 """Actual parsing thread""" 810 822 811 debug_msg( 7, printTime() + ' - parsethread(): started.' )812 debug_msg( 7, printTime() + ' - parsethread(): Parsing XML..' )823 debug_msg( 1, printTime() + ' - parsethread(): started.' ) 824 debug_msg( 1, printTime() + ' - parsethread(): Parsing XML..' ) 813 825 self.myXMLSource = self.myXMLGatherer.getFileObject() 814 826 ret = xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 815 debug_msg( 7, printTime() + ' - parsethread(): Done parsing.' )816 debug_msg( 7, printTime() + ' - parsethread(): finished.' )827 debug_msg( 1, printTime() + ' - parsethread(): Done parsing.' ) 828 debug_msg( 1, printTime() + ' - parsethread(): finished.' ) 817 829 818 830 return ret … … 1264 1276 1265 1277 myTProcessor = TorqueXMLProcessor() 1266 myTProcessor.run()1267 1268 sys.exit( 1 )1269 1270 1278 myGProcessor = GangliaXMLProcessor() 1271 1279 … … 1278 1286 1279 1287 torquexmlthread.start() 1280 #gangliaxmlthread.start()1288 gangliaxmlthread.start() 1281 1289 1282 1290 # Global functions
Note: See TracChangeset
for help on using the changeset viewer.