Changeset 38 for trunk


Ignore:
Timestamp:
04/05/05 09:42:57 (19 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

Code cleanup and config for testing

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r37 r38  
    1212import threading
    1313import mutex
     14import random
    1415
    1516# Specify debugging level here;
    1617#
    17 # <=11 = metric XML
    18 # <=10 = host,cluster,grid,ganglia XML
    19 # <=9  = RRD activity,gmetad config parsing
    20 # <=8  = host processing
    21 # <=7  = daemon threading - NOTE: Daemon will 'halt on all errors' from this level
    22 #
    23 DEBUG_LEVEL = 10
     18# 11 = XML: metrics
     19# 10 = XML: host, cluster, grid, ganglia
     20# 9  = RRD activity, gmetad config parsing
     21# 8  = daemon threading
     22#
     23DEBUG_LEVEL = 8
    2424
    2525# Where is the gmetad.conf located
     
    3737# Amount of hours to store in one single archived .rrd
    3838#
    39 ARCHIVE_HOURS_PER_RRD = 12
     39ARCHIVE_HOURS_PER_RRD = 1
    4040
    4141# Wether or not to run as a daemon in background
     
    5858
    5959class RRDMutator:
     60        "A class for handling .rrd mutations"
    6061
    6162        binary = '/usr/bin/rrdtool'
     
    8485                                arg_string = arg_string + ' ' + arg
    8586
    86                 debug_msg( 7, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + arg_string  )
     87                debug_msg( 9, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + arg_string  )
    8788
    8889                for line in os.popen( self.binary + ' ' + action + ' ' + arg_string ).readlines():
     
    9192
    9293                                error_msg = string.join( line.split( ' ' )[1:] )
    93                                 debu_msg( 7, error_msg )
     94                                debug_msg( 9, error_msg )
    9495                                return 1
    9596
     
    158159                for clustername, rrdh in self.clusters.items():
    159160
    160                         print 'storing metrics of cluster ' + clustername
    161                         rrdh.storeMetrics()
     161                        ret = rrdh.storeMetrics()
     162
     163                        if ret:
     164                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
     165                                return 1
     166
     167                return 0
    162168
    163169class GangliaXMLGatherer:
     
    221227                "Connect, and return a file object"
    222228
    223                 self.disconnect()
    224                 self.connect()
     229                if self.s:
     230                        # Apearantly, only data is received when a connection is made
     231                        # therefor, disconnect and connect
     232                        #
     233                        self.disconnect()
     234                        self.connect()
    225235
    226236                return self.s.makefile( 'r' )
     
    283293                return time.strftime("%a %d %b %Y %H:%M:%S")
    284294
    285         def grabXML( self ):
    286 
    287                 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' )
    288                 pid = os.fork()
    289 
    290                 if pid == 0:
    291                         # Child - XML Thread
    292                         #
    293                         # Process XML and exit
    294 
    295                         debug_msg( 7, self.printTime() + ' - xmlthread()  - Start XML processing..' )
    296                         self.processXML()
    297                         debug_msg( 7, self.printTime() + ' - xmlthread()  - Done processing; exiting.' )
    298                         sys.exit( 0 )
    299 
    300                 elif pid > 0:
    301                         # Parent - Time/sleep Thread
    302                         #
    303                         # Make sure XML is processed on time and at regular intervals
    304 
    305                         debug_msg( 7, self.printTime() + ' - mainthread() - Sleep '+ str( self.config.getLowestInterval() ) +'s: zzzzz..' )
    306                         time.sleep( float( self.config.getLowestInterval() ) )
    307                         debug_msg( 7, self.printTime() + ' - mainthread() - Awoken: waiting for XML thread..' )
    308 
    309                         r = os.wait()
    310                         ret = r[1]
    311                         if ret != 0:
    312                                 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: ERROR! xmlthread() exited with status %d' %(ret) )
    313                                 if DEBUG_LEVEL>=7: sys.exit( 1 )
    314                         else:
    315 
    316                                 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: xmlthread() finished succesfully' )
    317 
    318295        def run( self ):
    319296                "Main thread"
    320 
    321                 #self.processXML()
    322                 #self.storeMetrics()
    323 
    324                 #time.sleep( 20 )
    325 
    326                 #self.processXML()
    327                 #self.storeMetrics()
    328 
    329                 #sys.exit(1)
    330297
    331298                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
     
    340307                                #
    341308                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
    342                                 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' )
     309                                debug_msg( 8, self.printTime() + ' - mainthread() - xmlthread() started' )
    343310                                xmlthread.start()
    344311
     
    349316                                #
    350317                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
    351                                 debug_msg( 7, self.printTime() + ' - mainthread() - storethread() started' )
     318                                debug_msg( 8, self.printTime() + ' - mainthread() - storethread() started' )
    352319                                storethread.start()
    353320               
     
    358325                "Store metrics retained in memory to disk"
    359326
    360                 STORE_INTERVAL = 5
    361 
    362                 debug_msg( 7, self.printTime() + ' - storethread() - Storing data..' )
     327                # Store metrics somewhere between every 60 and 180 seconds
     328                #
     329                STORE_INTERVAL = random.randint( 60, 180 )
     330
     331                debug_msg( 8, self.printTime() + ' - storethread() - storemetricthread() started: Storing data..' )
    363332
    364333                # threaded call to: self.myHandler.storeMetrics()
     
    367336                storethread.start()
    368337
    369                 debug_msg( 7, self.printTime() + ' - storethread() - Sleeping.. (%ss)' %STORE_INTERVAL )
     338                debug_msg( 8, self.printTime() + ' - storethread() - Sleeping.. (%ss)' %STORE_INTERVAL )
    370339                time.sleep( STORE_INTERVAL )
    371                 debug_msg( 7, self.printTime() + ' - storethread() - Done sleeping.' )
     340                debug_msg( 8, self.printTime() + ' - storethread() - Done sleeping.' )
    372341
    373342                if storethread.isAlive():
    374343
    375                         debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() still running, waiting to finish..' )
     344                        debug_msg( 8, self.printTime() + ' - storethread() - storemetricthread() still running, waiting to finish..' )
    376345                        parsethread.join( 180 ) # Maximum time is 3 minutes for storing thread to finish - more then enough
    377                         debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() finished.' )
    378 
    379                 debug_msg( 7, self.printTime() + ' - storethread() finished' )
     346
     347                debug_msg( 8, self.printTime() + ' - storethread() - storemetricthread() finished.' )
     348
     349                debug_msg( 8, self.printTime() + ' - storethread() finished' )
    380350
    381351                return 0
     
    384354                "Process XML"
    385355
    386                 debug_msg( 7, self.printTime() + ' - xmlthread() - Parsing..' )
     356                debug_msg( 8, self.printTime() + ' - xmlthread() - parsethread() started: Parsing..' )
    387357
    388358                # threaded call to: self.myParser.parse( self.myXMLGatherer.getFileObject() )
     
    391361                parsethread.start()
    392362
    393                 debug_msg( 7, self.printTime() + ' - xmlthread() - Sleeping.. (%ss)' %self.config.getLowestInterval() )
     363                debug_msg( 8, self.printTime() + ' - xmlthread() - Sleeping.. (%ss)' %self.config.getLowestInterval() )
    394364                time.sleep( float( self.config.getLowestInterval() ) ) 
    395                 debug_msg( 7, self.printTime() + ' - xmlthread() - Done sleeping.' )
     365                debug_msg( 8, self.printTime() + ' - xmlthread() - Done sleeping.' )
    396366
    397367                if parsethread.isAlive():
    398368
    399                         debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() still running, waiting to finish..' )
     369                        debug_msg( 8, self.printTime() + ' - xmlthread() - parsethread() still running, waiting to finish..' )
    400370                        parsethread.join( 180 ) # Maximum time is 3 minutes for XML thread to finish - more then enough
    401                         debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() finished.' )
    402 
    403                 debug_msg( 7, self.printTime() + ' - xmlthread() finished.' )
     371
     372                debug_msg( 8, self.printTime() + ' - xmlthread() - parsethread() finished.' )
     373
     374                debug_msg( 8, self.printTime() + ' - xmlthread() finished.' )
    404375
    405376                return 0
     
    513484                self.slot.unlock()
    514485
    515         def makeUpdateString( self, host, metricname ):
    516 
    517                 update_string = None
    518 
    519                 print self.myMetrics[ host ][ metricname ]
    520 
    521                 for m in self.myMetrics[ host ][ metricname ]:
    522 
    523                         print m
    524 
    525                         if not update_string:
    526                                 update_string = '%s:%s' %( m['time'], m['val'] )
    527                         else:
    528                                 update_string = update_string + ' %s:%s' %( m['time'], m['val'] )
    529 
    530                 return update_string
    531 
    532486        def makeUpdateList( self, host, metricname ):
    533487
     
    558512                                else:
    559513                                        debug_msg( 9, 'metric update failed' )
    560 
    561                                 return 1
     514                                        return 1
    562515
    563516        def makeTimeSerial( self ):
     
    693646                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
    694647
    695                 #timestamp = metric['time']
    696                 #val = metric['val']
    697 
    698                 #update_string = '%s:%s' %(timestamp, val)
    699648                update_list = self.makeUpdateList( host, metricname )
    700649
    701                 #try:
    702 
    703                 self.rrdm.update( str(rrd_file), update_list )
    704 
    705                 #except rrdtool.error, detail:
    706 
    707                 #       debug_msg( 0, 'EXCEPTION! While trying to update rrd:' )
    708                 #       debug_msg( 0, '\trrd %s with %s' %( str(rrd_file), update_string ) )
    709                 #       debug_msg( 0, str(detail) )
    710 
    711                 #       return 1
     650                ret = self.rrdm.update( str(rrd_file), update_list )
     651
     652                if ret:
     653                        return 1
    712654               
    713655                debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
Note: See TracChangeset for help on using the changeset viewer.