Changeset 102 for trunk/daemon/togad.py
- Timestamp:
- 05/12/05 10:21:25 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r100 r102 23 23 # 1 = daemon threading 24 24 # 25 DEBUG_LEVEL = 725 DEBUG_LEVEL = 1 26 26 27 27 # Where is the gmetad.conf located … … 388 388 """Main XML processing""" 389 389 390 debug_msg( 5, printTime() + ' - torquexmlthread(): started.' )390 debug_msg( 1, printTime() + ' - torque_xml_thread(): started.' ) 391 391 392 392 while( 1 ): 393 393 394 394 self.myXMLSource = self.myXMLGatherer.getFileObject() 395 debug_msg( 1, printTime() + ' - torque xmlthread(): Parsing..' )395 debug_msg( 1, printTime() + ' - torque_xml_thread(): Parsing..' ) 396 396 xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 397 debug_msg( 1, printTime() + ' - torque xmlthread(): Done parsing.' )398 debug_msg( 1, printTime() + ' - torque xmlthread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )397 debug_msg( 1, printTime() + ' - torque_xml_thread(): Done parsing.' ) 398 debug_msg( 1, printTime() + ' - torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) ) 399 399 time.sleep( self.config.getLowestInterval() ) 400 400 … … 462 462 self.jobs_to_store.append( job_id ) 463 463 464 debug_msg( 0, 'jobinfo for job %s has changed' %job_id )464 debug_msg( 6, 'jobinfo for job %s has changed' %job_id ) 465 465 else: 466 466 self.jobAttrs[ job_id ] = jobinfo … … 469 469 self.jobs_to_store.append( job_id ) 470 470 471 debug_msg( 0, 'jobinfo for job %s has changed' %job_id )471 debug_msg( 6, 'jobinfo for job %s has changed' %job_id ) 472 472 473 473 def endDocument( self ): … … 480 480 # 481 481 mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] ) 482 482 483 if mytime < self.heartbeat and jobid not in self.jobs_processed and jobinfo['status'] == 'R': 483 484 … … 491 492 self.jobs_to_store.append( jobid ) 492 493 493 debug_msg( 1, printTime() + ' - torque xmlthread(): Storing..' )494 debug_msg( 1, printTime() + ' - torque_xml_thread(): Storing..' ) 494 495 495 496 for jobid in self.jobs_to_store: 496 497 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] ) 497 498 498 debug_msg( 1, printTime() + ' - torque xmlthread(): Done storing.' )499 debug_msg( 1, printTime() + ' - torque_xml_thread(): Done storing.' ) 499 500 500 501 self.jobs_processed = [ ] … … 759 760 """Main XML processing; start a xml and storethread""" 760 761 761 xml thread = threading.Thread( None, self.processXML, 'xmlthread' )762 store thread = threading.Thread( None, self.storeMetrics, 'storethread' )762 xml_thread = threading.Thread( None, self.processXML, 'xmlthread' ) 763 store_thread = threading.Thread( None, self.storeMetrics, 'storethread' ) 763 764 764 765 while( 1 ): 765 766 766 if not xml thread.isAlive():767 if not xml_thread.isAlive(): 767 768 # Gather XML at the same interval as gmetad 768 769 769 770 # threaded call to: self.processXML() 770 771 # 771 xml thread = threading.Thread( None, self.processXML, 'xmlthread' )772 xml thread.start()773 774 if not store thread.isAlive():772 xml_thread = threading.Thread( None, self.processXML, 'xml_thread' ) 773 xml_thread.start() 774 775 if not store_thread.isAlive(): 775 776 # Store metrics every .. sec 776 777 777 778 # threaded call to: self.storeMetrics() 778 779 # 779 store thread = threading.Thread( None, self.storeMetrics, 'storethread' )780 store thread.start()780 store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' ) 781 store_thread.start() 781 782 782 783 # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway … … 786 787 """Store metrics retained in memory to disk""" 787 788 788 debug_msg( 7, printTime() + ' - storethread(): started.' )789 debug_msg( 1, printTime() + ' - ganglia_store_thread(): started.' ) 789 790 790 791 # Store metrics somewhere between every 360 and 640 seconds … … 792 793 STORE_INTERVAL = random.randint( 360, 640 ) 793 794 794 store thread = threading.Thread( None, self.storeThread, 'storemetricthread' )795 store thread.start()796 797 debug_msg( 7, printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )795 store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' ) 796 store_metric_thread.start() 797 798 debug_msg( 1, printTime() + ' - ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL ) 798 799 time.sleep( STORE_INTERVAL ) 799 debug_msg( 7, printTime() + ' - storethread(): Done sleeping.' )800 801 if store thread.isAlive():802 803 debug_msg( 7, printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )800 debug_msg( 1, printTime() + ' - ganglia_store_thread(): Done sleeping.' ) 801 802 if store_metric_thread.isAlive(): 803 804 debug_msg( 1, printTime() + ' - ganglia_store_thread(): storemetricthread() still running, waiting to finish..' ) 804 805 storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 805 debug_msg( 7, printTime() + ' - storethread(): Done waiting.' )806 807 debug_msg( 7, printTime() + ' - storethread(): finished.' )806 debug_msg( 1, printTime() + ' - ganglia_store_thread(): Done waiting.' ) 807 808 debug_msg( 1, printTime() + ' - ganglia_store_thread(): finished.' ) 808 809 809 810 return 0 … … 812 813 """Actual metric storing thread""" 813 814 814 debug_msg( 1, printTime() + ' - storemetricthread(): started.' )815 debug_msg( 1, printTime() + ' - storemetricthread(): Storing data..' )815 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): started.' ) 816 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): Storing data..' ) 816 817 ret = self.myXMLHandler.storeMetrics() 817 debug_msg( 1, printTime() + ' - storemetricthread(): Done storing.' )818 debug_msg( 1, printTime() + ' - storemetricthread(): finished.' )818 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): Done storing.' ) 819 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): finished.' ) 819 820 820 821 return ret … … 828 829 parsethread.start() 829 830 830 debug_msg( 1, printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )831 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 831 832 time.sleep( float( self.config.getLowestInterval() ) ) 832 debug_msg( 1, printTime() + ' - xmlthread(): Done sleeping.' )833 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): Done sleeping.' ) 833 834 834 835 if parsethread.isAlive(): 835 836 836 debug_msg( 1, printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )837 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): parsethread() still running, waiting to finish..' ) 837 838 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 838 debug_msg( 7, printTime() + ' - xmlthread(): Done waiting.' )839 840 debug_msg( 1, printTime() + ' - xmlthread(): finished.' )839 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): Done waiting.' ) 840 841 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): finished.' ) 841 842 842 843 return 0 … … 845 846 """Actual parsing thread""" 846 847 847 debug_msg( 1, printTime() + ' - parsethread(): started.' )848 debug_msg( 1, printTime() + ' - parsethread(): Parsing XML..' )848 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): started.' ) 849 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): Parsing XML..' ) 849 850 self.myXMLSource = self.myXMLGatherer.getFileObject() 850 851 ret = xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 851 debug_msg( 1, printTime() + ' - parsethread(): Done parsing.' )852 debug_msg( 1, printTime() + ' - parsethread(): finished.' )852 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): Done parsing.' ) 853 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): finished.' ) 853 854 854 855 return ret … … 1299 1300 """Program startup""" 1300 1301 1301 myT Processor = TorqueXMLProcessor()1302 myG Processor = GangliaXMLProcessor()1302 myTorqueProcessor = TorqueXMLProcessor() 1303 myGangliaProcessor = GangliaXMLProcessor() 1303 1304 1304 1305 if DAEMONIZE: 1305 torque xmlthread = threading.Thread( None, myTProcessor.daemon, 'tprocxmlthread' )1306 ganglia xmlthread = threading.Thread( None, myGProcessor.daemon, 'gprocxmlthread' )1306 torque_xml_thread = threading.Thread( None, myTorqueProcessor.daemon, 'torque_proc_thread' ) 1307 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.daemon, 'ganglia_proc_thread' ) 1307 1308 else: 1308 torque xmlthread = threading.Thread( None, myTProcessor.run, 'tprocxmlthread' )1309 ganglia xmlthread = threading.Thread( None, myGProcessor.run, 'gprocxmlthread' )1310 1311 torque xmlthread.start()1312 ganglia xmlthread.start()1309 torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' ) 1310 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 1311 1312 torque_xml_thread.start() 1313 ganglia_xml_thread.start() 1313 1314 1314 1315 # Global functions
Note: See TracChangeset
for help on using the changeset viewer.