Changeset 169 for trunk/daemon
- Timestamp:
- 07/12/05 15:44:44 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r155 r169 5 5 import socket 6 6 import sys 7 import syslog 7 8 import string 8 9 import os … … 14 15 import DBClass 15 16 16 # Specify debugging level here ;17 # Specify debugging level here (only when _not_ DAEMONIZE) 17 18 # 18 19 # 11 = XML: metrics … … 22 23 # 6 = SQL 23 24 # 1 = daemon threading 24 # 25 # 0 = errors 26 # 27 # default: 0 25 28 DEBUG_LEVEL = 1 29 30 # Enable logging to syslog? 31 # 32 USE_SYSLOG = 1 33 34 # What level msg'es should be logged to syslog? 35 # 36 # default: lvl 0 (errors) 37 # 38 SYSLOG_LEVEL = 0 39 40 # Which facility to use in syslog 41 # 42 # Syntax I.e.: 43 # LOG_KERN, LOG_USER, LOG_MAIL, LOG_DAEMON, LOG_AUTH, LOG_LPR, 44 # LOG_NEWS, LOG_UUCP, LOG_CRON and LOG_LOCAL0 to LOG_LOCAL7 45 # 46 SYSLOG_FACILITY = syslog.LOG_DAEMON 26 47 27 48 # Where is the gmetad.conf located … … 58 79 # Wether or not to run as a daemon in background 59 80 # 60 DAEMONIZE = 081 DAEMONIZE = 1 61 82 62 83 ###################### … … 103 124 self.dbc = DBClass.DB(self.db_vars) 104 125 except DBClass.DBError, details: 105 print 'Error in connection to db: %s' %details126 debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) ) 106 127 sys.exit(1) 107 128 … … 126 147 except DBClass.DBError, detail: 127 148 operation = statement.split(' ')[0] 128 print "%s operation on database failed while performing\n'%s'\n%s"\ 129 %(operation, statement, detail) 149 debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) ) 130 150 sys.exit(1) 131 151 … … 335 355 """Skeleton class for XML processor's""" 336 356 337 def daemon( self ):338 """Run as daemon forever"""339 340 # Fork the first child341 #342 pid = os.fork()343 344 if pid > 0:345 346 sys.exit(0) # end parent347 348 # creates a session and sets the process group ID349 #350 os.setsid()351 352 # Fork the second child353 #354 pid = os.fork()355 356 if pid > 0:357 358 sys.exit(0) # end parent359 360 # Go to the root directory and set the umask361 #362 os.chdir('/')363 os.umask(0)364 365 sys.stdin.close()366 sys.stdout.close()367 sys.stderr.close()368 369 os.open('/dev/null', 0)370 os.dup(0)371 os.dup(0)372 373 self.run()374 375 357 def run( self ): 376 358 """Do main processing of XML here""" … … 393 375 """Main XML processing""" 394 376 395 debug_msg( 1, printTime() + ' -torque_xml_thread(): started.' )377 debug_msg( 1, 'torque_xml_thread(): started.' ) 396 378 397 379 while( 1 ): 398 380 399 381 self.myXMLSource = self.myXMLGatherer.getFileObject() 400 debug_msg( 1, printTime() + ' -torque_xml_thread(): Parsing..' )382 debug_msg( 1, 'torque_xml_thread(): Parsing..' ) 401 383 xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 402 debug_msg( 1, printTime() + ' -torque_xml_thread(): Done parsing.' )403 debug_msg( 1, printTime() + ' -torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )384 debug_msg( 1, 'torque_xml_thread(): Done parsing.' ) 385 debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) ) 404 386 time.sleep( self.config.getLowestInterval() ) 405 387 … … 497 479 self.jobs_to_store.append( jobid ) 498 480 499 debug_msg( 1, printTime() + ' -torque_xml_thread(): Storing..' )481 debug_msg( 1, 'torque_xml_thread(): Storing..' ) 500 482 501 483 for jobid in self.jobs_to_store: … … 503 485 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] ) 504 486 505 debug_msg( 1, printTime() + ' -torque_xml_thread(): Done storing.' )487 debug_msg( 1, 'torque_xml_thread(): Done storing.' ) 506 488 507 489 self.jobs_processed = [ ] … … 555 537 self.config = config 556 538 self.clusters = { } 557 debug_msg( 0, printTime() + ' -Checking existing toga rrd archive..' )539 debug_msg( 1, 'Checking existing toga rrd archive..' ) 558 540 self.gatherClusters() 559 debug_msg( 0, printTime() + ' -Check done.' )541 debug_msg( 1, 'Check done.' ) 560 542 561 543 def gatherClusters( self ): … … 647 629 """Recoverable error""" 648 630 649 debug_msg( 0, 'Recoverable error ' + str( exception ))631 debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' ) 650 632 651 633 def fatalError( self, exception ): … … 656 638 # Ignore 'no element found' errors 657 639 if exception_str.find( 'no element found' ) != -1: 658 debug_msg( 1, 'No XML data found: probably socket not (re)connected.' )640 debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' ) 659 641 return 0 660 642 661 debug_msg( 0, 'Non-recoverable error ' + str( exception ) )643 debug_msg( 0, 'Non-recoverable XML error ' + str( exception ) ) 662 644 sys.exit( 1 ) 663 645 … … 710 692 if self.s is None: 711 693 712 debug_msg( 0, ' Could not open socket' )694 debug_msg( 0, 'ERROR: Could not open socket or unable to connect to datasource!' ) 713 695 sys.exit( 1 ) 714 696 … … 776 758 # threaded call to: self.processXML() 777 759 # 778 xml_thread = threading.Thread( None, self.processXML, 'xml_thread' ) 779 xml_thread.start() 760 try: 761 xml_thread = threading.Thread( None, self.processXML, 'xml_thread' ) 762 xml_thread.start() 763 except threading.error, msg: 764 debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg)) 765 #return 1 780 766 781 767 if not store_thread.isAlive(): … … 784 770 # threaded call to: self.storeMetrics() 785 771 # 786 store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' ) 787 store_thread.start() 772 try: 773 store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' ) 774 store_thread.start() 775 except threading.error, msg: 776 debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg)) 777 #return 1 788 778 789 779 # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway … … 793 783 """Store metrics retained in memory to disk""" 794 784 795 debug_msg( 1, printTime() + ' - ganglia_store_thread(): started.' )796 797 785 # Store metrics somewhere between every 360 and 640 seconds 798 786 # 799 787 STORE_INTERVAL = random.randint( 360, 640 ) 800 788 801 store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' ) 802 store_metric_thread.start() 803 804 debug_msg( 1, printTime() + ' - ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL ) 789 try: 790 store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' ) 791 store_metric_thread.start() 792 except threading.error, msg: 793 debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) ) 794 return 1 795 796 debug_msg( 1, 'ganglia_store_thread(): started.' ) 797 798 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL ) 805 799 time.sleep( STORE_INTERVAL ) 806 debug_msg( 1, printTime() + ' -ganglia_store_thread(): Done sleeping.' )800 debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' ) 807 801 808 802 if store_metric_thread.isAlive(): 809 803 810 debug_msg( 1, printTime() + ' -ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )804 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' ) 811 805 store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 812 debug_msg( 1, printTime() + ' -ganglia_store_thread(): Done waiting.' )813 814 debug_msg( 1, printTime() + ' -ganglia_store_thread(): finished.' )806 debug_msg( 1, 'ganglia_store_thread(): Done waiting.' ) 807 808 debug_msg( 1, 'ganglia_store_thread(): finished.' ) 815 809 816 810 return 0 … … 819 813 """Actual metric storing thread""" 820 814 821 debug_msg( 1, printTime() + ' -ganglia_store_metric_thread(): started.' )822 debug_msg( 1, printTime() + ' -ganglia_store_metric_thread(): Storing data..' )815 debug_msg( 1, 'ganglia_store_metric_thread(): started.' ) 816 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' ) 823 817 ret = self.myXMLHandler.storeMetrics() 824 debug_msg( 1, printTime() + ' -ganglia_store_metric_thread(): Done storing.' )825 debug_msg( 1, printTime() + ' -ganglia_store_metric_thread(): finished.' )818 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' ) 819 debug_msg( 1, 'ganglia_store_metric_thread(): finished.' ) 826 820 827 821 return ret … … 830 824 """Process XML""" 831 825 832 debug_msg( 1, printTime() + ' - xmlthread(): started.' ) 833 834 parsethread = threading.Thread( None, self.parseThread, 'parsethread' ) 835 parsethread.start() 836 837 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 826 try: 827 parsethread = threading.Thread( None, self.parseThread, 'parsethread' ) 828 parsethread.start() 829 except threading.error, msg: 830 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) ) 831 return 1 832 833 debug_msg( 1, 'ganglia_xml_thread(): started.' ) 834 835 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 838 836 time.sleep( float( self.config.getLowestInterval() ) ) 839 debug_msg( 1, printTime() + ' -ganglia_xml_thread(): Done sleeping.' )837 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' ) 840 838 841 839 if parsethread.isAlive(): 842 840 843 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): parsethread() still running, waiting to finish..')841 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT ) 844 842 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 845 debug_msg( 1, printTime() + ' -ganglia_xml_thread(): Done waiting.' )846 847 debug_msg( 1, printTime() + ' -ganglia_xml_thread(): finished.' )843 debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' ) 844 845 debug_msg( 1, 'ganglia_xml_thread(): finished.' ) 848 846 849 847 return 0 … … 852 850 """Actual parsing thread""" 853 851 854 debug_msg( 1, printTime() + ' -ganglia_parse_thread(): started.' )855 debug_msg( 1, printTime() + ' -ganglia_parse_thread(): Parsing XML..' )852 debug_msg( 1, 'ganglia_parse_thread(): started.' ) 853 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' ) 856 854 self.myXMLSource = self.myXMLGatherer.getFileObject() 857 855 ret = xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 858 debug_msg( 1, printTime() + ' -ganglia_parse_thread(): Done parsing.' )859 debug_msg( 1, printTime() + ' -ganglia_parse_thread(): finished.' )856 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' ) 857 debug_msg( 1, 'ganglia_parse_thread(): finished.' ) 860 858 861 859 return ret … … 1248 1246 os.makedirs( rrd_dir ) 1249 1247 1250 except OSError, msg:1248 except os.OSError, msg: 1251 1249 1252 1250 if msg.find( 'File exists' ) != -1: … … 1304 1302 return 0 1305 1303 1304 def daemon(): 1305 """daemonized threading""" 1306 1307 print 'daemon 1' 1308 # Fork the first child 1309 # 1310 pid = os.fork() 1311 1312 print 'daemon 2' 1313 if pid > 0: 1314 1315 sys.exit(0) # end parent 1316 1317 print 'daemon 3' 1318 # creates a session and sets the process group ID 1319 # 1320 os.setsid() 1321 1322 # Fork the second child 1323 # 1324 pid = os.fork() 1325 1326 print 'daemon 4' 1327 if pid > 0: 1328 1329 sys.exit(0) # end parent 1330 1331 print 'daemon 5' 1332 # Go to the root directory and set the umask 1333 # 1334 os.chdir('/') 1335 os.umask(0) 1336 1337 print 'daemon 6' 1338 sys.stdin.close() 1339 sys.stdout.close() 1340 sys.stderr.close() 1341 1342 os.open('/dev/null', 0) 1343 os.dup(0) 1344 os.dup(0) 1345 1346 debug_msg( 0, 'daemon started.' ) 1347 run() 1348 1349 def run(): 1350 """Threading start""" 1351 1352 myTorqueProcessor = TorqueXMLProcessor() 1353 myGangliaProcessor = GangliaXMLProcessor() 1354 1355 try: 1356 torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' ) 1357 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 1358 1359 torque_xml_thread.start() 1360 ganglia_xml_thread.start() 1361 1362 except threading.error, msg: 1363 debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) ) 1364 syslog.closelog() 1365 sys.exit(1) 1366 1367 debug_msg( 0, 'main threading started.' ) 1368 1306 1369 def main(): 1307 1370 """Program startup""" 1308 1371 1309 myTorqueProcessor = TorqueXMLProcessor()1310 myGangliaProcessor = GangliaXMLProcessor()1372 if( DAEMONIZE and USE_SYSLOG ): 1373 syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY ) 1311 1374 1312 1375 if DAEMONIZE: 1313 torque_xml_thread = threading.Thread( None, myTorqueProcessor.daemon, 'torque_proc_thread' ) 1314 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.daemon, 'ganglia_proc_thread' ) 1376 daemon() 1315 1377 else: 1316 torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' ) 1317 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 1318 1319 torque_xml_thread.start() 1320 ganglia_xml_thread.start() 1321 1378 run() 1379 1380 # 1322 1381 # Global functions 1382 # 1323 1383 1324 1384 def check_dir( directory ): … … 1331 1391 1332 1392 def debug_msg( level, msg ): 1333 """Only print msg if it is not below our debug level""" 1334 1335 if (DEBUG_LEVEL >= level): 1336 sys.stderr.write( msg + '\n' ) 1393 """Only print msg if correct levels""" 1394 1395 if (not DAEMONIZE and DEBUG_LEVEL >= level): 1396 sys.stderr.write( printTime() + ' - ' + msg + '\n' ) 1397 1398 if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level): 1399 syslog.syslog( msg ) 1337 1400 1338 1401 def printTime( ):
Note: See TracChangeset
for help on using the changeset viewer.