Changeset 54


Ignore:
Timestamp:
04/08/05 12:11:16 (19 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

  • Several bugfixes
  • Now more than 1 storethread is possible
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r53 r54  
    2323# 7  = daemon threading
    2424#
    25 DEBUG_LEVEL = 7
     25DEBUG_LEVEL = 8
    2626
    2727# Where is the gmetad.conf located
     
    6363STORE_TIMEOUT = 360
    6464
     65# Number of storing threads
     66#
     67STORE_THREADS = 1
    6568"""
    6669This is TOrque-GAnglia's data Daemon
     
    116119                                arg_string = arg_string + ' ' + arg
    117120
    118                 debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
     121                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
    119122
    120123                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
     
    134137                self.config = config
    135138                self.clusters = { }
    136                 debug_msg( 0, printTime() + ' Checking existing toga rrd archive..' )
     139                debug_msg( 0, printTime() + ' - Checking existing toga rrd archive..' )
    137140                self.gatherClusters()
    138                 debug_msg( 0, printTime() + ' Check done.' )
     141                debug_msg( 0, printTime() + ' - Check done.' )
    139142
    140143        def gatherClusters( self ):
     
    349352
    350353                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
    351                 storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
     354
     355                storethreads = [ ]
     356
     357                for num in range( int( STORE_THREADS ) ):
     358
     359                        storethreads.append( threading.Thread( None, self.storeMetrics, 'storethread' ) )
    352360
    353361                while( 1 ):
     
    361369                                xmlthread.start()
    362370
    363                         if not storethread.isAlive():
    364                                 # Store metrics every .. sec
    365 
    366                                 # threaded call to: self.storeMetrics()
    367                                 #
    368                                 storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
    369                                 storethread.start()
     371                        for storethread in storethreads:
     372
     373                                mythread = storethreads.index( storethread )
     374
     375                                if not storethread.isAlive():
     376
     377                                        # Store metrics every .. sec
     378
     379                                        # threaded call to: self.storeMetrics()
     380                                        #
     381                                        storethreads[ mythread ] = threading.Thread( None, self.storeMetrics, 'storethread' )
     382                                        storethreads[ mythread ].start()
    370383               
    371384                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
     
    379392                # Store metrics somewhere between every 60 and 180 seconds
    380393                #
    381                 STORE_INTERVAL = random.randint( 360, 640 )
     394                #STORE_INTERVAL = random.randint( 360, 640 )
     395                STORE_INTERVAL = 60
    382396
    383397                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
     
    631645                return 1
    632646
    633         def memLastUpdate( self, host, metriclist ):
    634 
    635                 last_update_time = 0
    636 
    637                 for metric in metriclist:
    638 
    639                         if metric['time'] > last_update_time:
    640 
    641                                 last_update_time = metric['time']
     647        def memLastUpdate( self, host, metricname, metriclist ):
    642648
    643649                if not self.lastStored.has_key( host ):
    644650                        self.lastStored[ host ] = { }
    645651
    646                 if self.lastStored[ host ].has_key( metric['name'] ):
     652                last_update_time = 0
     653
     654                for metric in metriclist:
     655
     656                        if metric['name'] == metricname:
     657
     658                                if metric['time'] > last_update_time:
     659
     660                                        last_update_time = metric['time']
     661
     662                if self.lastStored[ host ].has_key( metricname ):
    647663                       
    648                         if last_update_time <= self.lastStored[ host ][ metric['name'] ]:
     664                        if last_update_time <= self.lastStored[ host ][ metricname ]:
    649665                                return 1
    650666
    651                 self.lastStored[ host ][ metric['name'] ] = last_update_time
     667                self.lastStored[ host ][ metricname ] = last_update_time
    652668
    653669        def storeMetrics( self ):
     
    663679                                self.slot.acquire()
    664680
    665                                 while len( mymetric ) > 0:
    666 
    667                                         metrics_to_store.append( mymetric.pop( 0 ) )
     681                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     682
     683                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     684                                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
    668685
    669686                                self.slot.release()
     
    674691                                #
    675692                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
    676                                 self.myMetrics[ hostname ][ metricname ] = [ ]
    677693
    678694                                update_rets = [ ]
     
    694710                                if not (1) in update_rets:
    695711
    696                                         self.memLastUpdate( hostname, mymetric )
     712                                        self.memLastUpdate( hostname, metricname, metrics_to_store )
    697713
    698714        def makeTimeSerial( self ):
Note: See TracChangeset for help on using the changeset viewer.