Changeset 36


Ignore:
Timestamp:
04/04/05 16:05:26 (19 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

  • Rewrote multithreaded
  • Error occurs when trying to update a RRD with multiple values
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r35 r36  
    1111import time
    1212import re
     13import threading
     14import mutex
    1315
    1416# Specify debugging level here;
     
    181183                "Connect, and return a file object"
    182184
    183                 if not self.s:
    184                         self.connect()
     185                self.disconnect()
     186                self.connect()
    185187
    186188                return self.s.makefile( 'r' )
     
    230232                sys.stdin.close()
    231233                sys.stdout.close()
    232                 sys.stderr.close()
     234                #sys.stderr.close()
    233235
    234236                os.open('/dev/null', 0)
     
    279281                "Main thread"
    280282
    281                 # Daemonized not working yet
    282                 if DAEMONIZE:
    283                         pid = os.fork()
    284 
    285                         # Handle XML grabbing in Child
    286                         if pid == 0:
    287 
    288                                 while( 1 ):
    289                                         self.grabXML()
    290 
    291                         # Do scheduled RRD storing in Parent
    292                         #elif pid > ):
    293 
    294                 else:
    295                         #self.grabXML()
    296                         self.processXML()
    297                         self.storeMetrics()
     283                #self.processXML()
     284                #self.storeMetrics()
     285
     286                #time.sleep( 20 )
     287
     288                #self.processXML()
     289                #self.storeMetrics()
     290
     291                #sys.exit(1)
     292
     293                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
     294                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
     295
     296                while( 1 ):
     297
     298                        if not xmlthread.isAlive():
     299                                # Gather XML at the same interval as gmetad
     300
     301                                # threaded call to: self.processXML()
     302                                #
     303                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
     304                                debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' )
     305                                xmlthread.start()
     306
     307                        if not storethread.isAlive():
     308                                # Store metrics every .. sec
     309
     310                                # threaded call to: self.storeMetrics()
     311                                #
     312                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
     313                                debug_msg( 7, self.printTime() + ' - mainthread() - storethread() started' )
     314                                storethread.start()
     315               
     316                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
     317                        time.sleep( 1 )
    298318
    299319        def storeMetrics( self ):
    300320                "Store metrics retained in memory to disk"
    301321
    302                 self.myHandler.storeMetrics()
     322                STORE_INTERVAL = 30
     323
     324                debug_msg( 7, self.printTime() + ' - storethread() - Storing data..' )
     325
     326                # threaded call to: self.myHandler.storeMetrics()
     327                #
     328                storethread = threading.Thread( None, self.myHandler.storeMetrics(), 'storemetricthread' )
     329                storethread.start()
     330
     331                debug_msg( 7, self.printTime() + ' - storethread() - Sleeping.. (%ss)' %STORE_INTERVAL )
     332                time.sleep( STORE_INTERVAL )
     333                debug_msg( 7, self.printTime() + ' - storethread() - Done sleeping.' )
     334
     335                if storethread.isAlive():
     336
     337                        debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() still running, waiting to finish..' )
     338                        parsethread.join( 180 ) # Maximum time is 3 minutes for storing thread to finish - more then enough
     339                        debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() finished.' )
     340
     341                debug_msg( 7, self.printTime() + ' - storethread() finished' )
     342
     343                return 0
    303344
    304345        def processXML( self ):
    305346                "Process XML"
    306347
    307                 self.myParser.parse( self.myXMLGatherer.getFileObject() )
     348                debug_msg( 7, self.printTime() + ' - xmlthread() - Parsing..' )
     349
     350                # threaded call to: self.myParser.parse( self.myXMLGatherer.getFileObject() )
     351                #
     352                parsethread = threading.Thread( None, self.myParser.parse, 'parsethread', [ self.myXMLGatherer.getFileObject() ] )
     353                parsethread.start()
     354
     355                debug_msg( 7, self.printTime() + ' - xmlthread() - Sleeping.. (%ss)' %self.config.getLowestInterval() )
     356                time.sleep( float( self.config.getLowestInterval() ) ) 
     357                debug_msg( 7, self.printTime() + ' - xmlthread() - Done sleeping.' )
     358
     359                if parsethread.isAlive():
     360
     361                        debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() still running, waiting to finish..' )
     362                        parsethread.join( 180 ) # Maximum time is 3 minutes for XML thread to finish - more then enough
     363                        debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() finished.' )
     364
     365                debug_msg( 7, self.printTime() + ' - xmlthread() finished.' )
     366
     367                return 0
    308368
    309369class GangliaConfigParser:
     
    382442
    383443        myMetrics = { }
     444        slot = None
    384445
    385446        def __init__( self, config, cluster ):
    386447                self.cluster = cluster
    387448                self.config = config
     449                self.slot = mutex.mutex()
    388450
    389451        def getClusterName( self ):
     
    408470                        self.myMetrics[ host ][ metric['name'] ] = [ ]
    409471
    410 
     472                self.slot.testandset()
    411473                self.myMetrics[ host ][ metric['name'] ].append( metric )
     474                self.slot.unlock()
    412475
    413476        def makeUpdateString( self, host, metricname ):
     
    429492                                mytime = self.makeTimeSerial()
    430493                                self.createCheck( hostname, metricname, mytime )       
    431                                 update_okay = self.update( hostname, metricname, mytime )
    432 
    433                                 if not update_okay:
    434 
     494                                update_ret = self.update( hostname, metricname, mytime )
     495
     496                                if update_ret == 0:
     497
     498                                        self.slot.testandset()
    435499                                        del self.myMetrics[ hostname ][ metricname ]
     500                                        self.slot.unlock()
    436501                                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
    437502                                else:
    438503                                        debug_msg( 9, 'metric update failed' )
    439504
    440                                 sys.exit(1)
     505                                return 1
    441506
    442507        def makeTimeSerial( self ):
     
    590655                debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), update_string ) )
    591656
     657                return 0
     658
    592659def main():
    593660        "Program startup"
Note: See TracChangeset for help on using the changeset viewer.