Changeset 169


Ignore:
Timestamp:
07/12/05 15:44:44 (19 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

  • Added syslog() abilities (for daemon)
  • Fixed daemon() mode (working now)
  • Added threading exception handling
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r155 r169  
    55import socket
    66import sys
     7import syslog
    78import string
    89import os
     
    1415import DBClass
    1516
    16 # Specify debugging level here;
     17# Specify debugging level here (only when _not_ DAEMONIZE)
    1718#
    1819# 11 = XML: metrics
     
    2223# 6  = SQL
    2324# 1  = daemon threading
    24 #
     25# 0  = errors
     26#
     27# default: 0
    2528DEBUG_LEVEL = 1
     29
     30# Enable logging to syslog?
     31#
     32USE_SYSLOG = 1
     33
     34# What level msg'es should be logged to syslog?
     35#
     36# default: lvl 0 (errors)
     37#
     38SYSLOG_LEVEL = 0
     39
     40# Which facility to use in syslog
     41#
     42# Syntax I.e.:
     43#       LOG_KERN, LOG_USER, LOG_MAIL, LOG_DAEMON, LOG_AUTH, LOG_LPR,
     44#       LOG_NEWS, LOG_UUCP, LOG_CRON and LOG_LOCAL0 to LOG_LOCAL7
     45#
     46SYSLOG_FACILITY = syslog.LOG_DAEMON
    2647
    2748# Where is the gmetad.conf located
     
    5879# Wether or not to run as a daemon in background
    5980#
    60 DAEMONIZE = 0
     81DAEMONIZE = 1
    6182
    6283######################
     
    103124                        self.dbc     = DBClass.DB(self.db_vars)
    104125                except DBClass.DBError, details:
    105                         print 'Error in connection to db: %s' %details
     126                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
    106127                        sys.exit(1)
    107128
     
    126147                except DBClass.DBError, detail:
    127148                        operation = statement.split(' ')[0]
    128                         print "%s operation on database failed while performing\n'%s'\n%s"\
    129                                 %(operation, statement, detail)
     149                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
    130150                        sys.exit(1)
    131151
     
    335355        """Skeleton class for XML processor's"""
    336356
    337         def daemon( self ):
    338                 """Run as daemon forever"""
    339 
    340                 # Fork the first child
    341                 #
    342                 pid = os.fork()
    343 
    344                 if pid > 0:
    345 
    346                         sys.exit(0)  # end parent
    347 
    348                 # creates a session and sets the process group ID
    349                 #
    350                 os.setsid()
    351 
    352                 # Fork the second child
    353                 #
    354                 pid = os.fork()
    355 
    356                 if pid > 0:
    357 
    358                         sys.exit(0)  # end parent
    359 
    360                 # Go to the root directory and set the umask
    361                 #
    362                 os.chdir('/')
    363                 os.umask(0)
    364 
    365                 sys.stdin.close()
    366                 sys.stdout.close()
    367                 sys.stderr.close()
    368 
    369                 os.open('/dev/null', 0)
    370                 os.dup(0)
    371                 os.dup(0)
    372 
    373                 self.run()
    374 
    375357        def run( self ):
    376358                """Do main processing of XML here"""
     
    393375                """Main XML processing"""
    394376
    395                 debug_msg( 1, printTime() + ' - torque_xml_thread(): started.' )
     377                debug_msg( 1, 'torque_xml_thread(): started.' )
    396378
    397379                while( 1 ):
    398380
    399381                        self.myXMLSource = self.myXMLGatherer.getFileObject()
    400                         debug_msg( 1, printTime() + ' - torque_xml_thread(): Parsing..' )
     382                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
    401383                        xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
    402                         debug_msg( 1, printTime() + ' - torque_xml_thread(): Done parsing.' )
    403                         debug_msg( 1, printTime() + ' - torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
     384                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
     385                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
    404386                        time.sleep( self.config.getLowestInterval() )
    405387
     
    497479                                        self.jobs_to_store.append( jobid )
    498480
    499                 debug_msg( 1, printTime() + ' - torque_xml_thread(): Storing..' )
     481                debug_msg( 1, 'torque_xml_thread(): Storing..' )
    500482
    501483                for jobid in self.jobs_to_store:
     
    503485                                self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )   
    504486
    505                 debug_msg( 1, printTime() + ' - torque_xml_thread(): Done storing.' )
     487                debug_msg( 1, 'torque_xml_thread(): Done storing.' )
    506488
    507489                self.jobs_processed = [ ]
     
    555537                self.config = config
    556538                self.clusters = { }
    557                 debug_msg( 0, printTime() + ' - Checking existing toga rrd archive..' )
     539                debug_msg( 1, 'Checking existing toga rrd archive..' )
    558540                self.gatherClusters()
    559                 debug_msg( 0, printTime() + ' - Check done.' )
     541                debug_msg( 1, 'Check done.' )
    560542
    561543        def gatherClusters( self ):
     
    647629                """Recoverable error"""
    648630
    649                 debug_msg( 0, 'Recoverable error ' + str( exception ) )
     631                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
    650632
    651633        def fatalError( self, exception ):
     
    656638                # Ignore 'no element found' errors
    657639                if exception_str.find( 'no element found' ) != -1:
    658                         debug_msg( 1, 'No XML data found: probably socket not (re)connected.' )
     640                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
    659641                        return 0
    660642
    661                 debug_msg( 0, 'Non-recoverable error ' + str( exception ) )
     643                debug_msg( 0, 'Non-recoverable XML error ' + str( exception ) )
    662644                sys.exit( 1 )
    663645
     
    710692                if self.s is None:
    711693
    712                         debug_msg( 0, 'Could not open socket' )
     694                        debug_msg( 0, 'ERROR: Could not open socket or unable to connect to datasource!' )
    713695                        sys.exit( 1 )
    714696
     
    776758                                # threaded call to: self.processXML()
    777759                                #
    778                                 xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
    779                                 xml_thread.start()
     760                                try:
     761                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
     762                                        xml_thread.start()
     763                                except threading.error, msg:
     764                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
     765                                        #return 1
    780766
    781767                        if not store_thread.isAlive():
     
    784770                                # threaded call to: self.storeMetrics()
    785771                                #
    786                                 store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
    787                                 store_thread.start()
     772                                try:
     773                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
     774                                        store_thread.start()
     775                                except threading.error, msg:
     776                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
     777                                        #return 1
    788778               
    789779                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
     
    793783                """Store metrics retained in memory to disk"""
    794784
    795                 debug_msg( 1, printTime() + ' - ganglia_store_thread(): started.' )
    796 
    797785                # Store metrics somewhere between every 360 and 640 seconds
    798786                #
    799787                STORE_INTERVAL = random.randint( 360, 640 )
    800788
    801                 store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
    802                 store_metric_thread.start()
    803 
    804                 debug_msg( 1, printTime() + ' - ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
     789                try:
     790                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
     791                        store_metric_thread.start()
     792                except threading.error, msg:
     793                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
     794                        return 1
     795
     796                debug_msg( 1, 'ganglia_store_thread(): started.' )
     797
     798                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
    805799                time.sleep( STORE_INTERVAL )
    806                 debug_msg( 1, printTime() + ' - ganglia_store_thread(): Done sleeping.' )
     800                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
    807801
    808802                if store_metric_thread.isAlive():
    809803
    810                         debug_msg( 1, printTime() + ' - ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
     804                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
    811805                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
    812                         debug_msg( 1, printTime() + ' - ganglia_store_thread(): Done waiting.' )
    813 
    814                 debug_msg( 1, printTime() + ' - ganglia_store_thread(): finished.' )
     806                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
     807
     808                debug_msg( 1, 'ganglia_store_thread(): finished.' )
    815809
    816810                return 0
     
    819813                """Actual metric storing thread"""
    820814
    821                 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): started.' )
    822                 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): Storing data..' )
     815                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
     816                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
    823817                ret = self.myXMLHandler.storeMetrics()
    824                 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): Done storing.' )
    825                 debug_msg( 1, printTime() + ' - ganglia_store_metric_thread(): finished.' )
     818                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
     819                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
    826820               
    827821                return ret
     
    830824                """Process XML"""
    831825
    832                 debug_msg( 1, printTime() + ' - xmlthread(): started.' )
    833 
    834                 parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
    835                 parsethread.start()
    836 
    837                 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
     826                try:
     827                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
     828                        parsethread.start()
     829                except threading.error, msg:
     830                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
     831                        return 1
     832
     833                debug_msg( 1, 'ganglia_xml_thread(): started.' )
     834
     835                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
    838836                time.sleep( float( self.config.getLowestInterval() ) ) 
    839                 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): Done sleeping.' )
     837                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
    840838
    841839                if parsethread.isAlive():
    842840
    843                         debug_msg( 1, printTime() + ' - ganglia_xml_thread(): parsethread() still running, waiting to finish..' )
     841                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
    844842                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
    845                         debug_msg( 1, printTime() + ' - ganglia_xml_thread(): Done waiting.' )
    846 
    847                 debug_msg( 1, printTime() + ' - ganglia_xml_thread(): finished.' )
     843                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
     844
     845                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
    848846
    849847                return 0
     
    852850                """Actual parsing thread"""
    853851
    854                 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): started.' )
    855                 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): Parsing XML..' )
     852                debug_msg( 1, 'ganglia_parse_thread(): started.' )
     853                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
    856854                self.myXMLSource = self.myXMLGatherer.getFileObject()
    857855                ret = xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
    858                 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): Done parsing.' )
    859                 debug_msg( 1, printTime() + ' - ganglia_parse_thread(): finished.' )
     856                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
     857                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
    860858
    861859                return ret
     
    12481246                                os.makedirs( rrd_dir )
    12491247
    1250                         except OSError, msg:
     1248                        except os.OSError, msg:
    12511249
    12521250                                if msg.find( 'File exists' ) != -1:
     
    13041302                return 0
    13051303
     1304def daemon():
     1305        """daemonized threading"""
     1306
     1307        print 'daemon 1'
     1308        # Fork the first child
     1309        #
     1310        pid = os.fork()
     1311
     1312        print 'daemon 2'
     1313        if pid > 0:
     1314
     1315                sys.exit(0)  # end parent
     1316
     1317        print 'daemon 3'
     1318        # creates a session and sets the process group ID
     1319        #
     1320        os.setsid()
     1321
     1322        # Fork the second child
     1323        #
     1324        pid = os.fork()
     1325
     1326        print 'daemon 4'
     1327        if pid > 0:
     1328
     1329                sys.exit(0)  # end parent
     1330
     1331        print 'daemon 5'
     1332        # Go to the root directory and set the umask
     1333        #
     1334        os.chdir('/')
     1335        os.umask(0)
     1336
     1337        print 'daemon 6'
     1338        sys.stdin.close()
     1339        sys.stdout.close()
     1340        sys.stderr.close()
     1341
     1342        os.open('/dev/null', 0)
     1343        os.dup(0)
     1344        os.dup(0)
     1345
     1346        debug_msg( 0, 'daemon started.' )
     1347        run()
     1348
     1349def run():
     1350        """Threading start"""
     1351
     1352        myTorqueProcessor = TorqueXMLProcessor()
     1353        myGangliaProcessor = GangliaXMLProcessor()
     1354
     1355        try:
     1356                torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
     1357                ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
     1358
     1359                torque_xml_thread.start()
     1360                ganglia_xml_thread.start()
     1361               
     1362        except threading.error, msg:
     1363                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
     1364                syslog.closelog()
     1365                sys.exit(1)
     1366               
     1367        debug_msg( 0, 'main threading started.' )
     1368
    13061369def main():
    13071370        """Program startup"""
    13081371
    1309         myTorqueProcessor = TorqueXMLProcessor()
    1310         myGangliaProcessor = GangliaXMLProcessor()
     1372        if( DAEMONIZE and USE_SYSLOG ):
     1373                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
    13111374
    13121375        if DAEMONIZE:
    1313                 torque_xml_thread = threading.Thread( None, myTorqueProcessor.daemon, 'torque_proc_thread' )
    1314                 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.daemon, 'ganglia_proc_thread' )
     1376                daemon()
    13151377        else:
    1316                 torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
    1317                 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
    1318 
    1319         torque_xml_thread.start()
    1320         ganglia_xml_thread.start()
    1321 
     1378                run()
     1379
     1380#
    13221381# Global functions
     1382#
    13231383
    13241384def check_dir( directory ):
     
    13311391
    13321392def debug_msg( level, msg ):
    1333         """Only print msg if it is not below our debug level"""
    1334 
    1335         if (DEBUG_LEVEL >= level):
    1336                 sys.stderr.write( msg + '\n' )
     1393        """Only print msg if correct levels"""
     1394
     1395        if (not DAEMONIZE and DEBUG_LEVEL >= level):
     1396                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
     1397       
     1398        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
     1399                syslog.syslog( msg )
    13371400
    13381401def printTime( ):
Note: See TracChangeset for help on using the changeset viewer.