Changeset 40


Ignore:
Timestamp:
04/05/05 12:01:16 (17 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

  • Changed thread locking scheme
  • Added function to remember last stored timestamp so that we will not try to update a rrd more than once with same timestamp
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r39 r40  
    1313import mutex
    1414import random
     15from types import *
    1516
    1617# Specify debugging level here;
     
    1920# 10 = XML: host, cluster, grid, ganglia
    2021# 9  = RRD activity, gmetad config parsing
    21 # 8  = daemon threading
     22# 8  = RRD file activity
     23# 7  = daemon threading
    2224#
    23 DEBUG_LEVEL = 8
     25DEBUG_LEVEL = 9
    2426
    2527# Where is the gmetad.conf located
     
    6870
    6971        def create( self, filename, args ):
    70                 return self.perform( 'create' + ' "' + filename + '"', args )
     72                return self.perform( 'create', '"' + filename + '"', args )
    7173
    7274        def update( self, filename, args ):
    73                 return self.perform( 'update' + ' "' + filename + '"', args )
    74 
    75         def perform( self, action, args ):
     75                return self.perform( 'update', '"' + filename + '"', args )
     76
     77        def perform( self, action, filename, args ):
    7678
    7779                arg_string = None
     80
     81                if type( args ) is not ListType:
     82                        debug_msg( 8, 'Arguments needs to be of type List' )
     83                        return 1
    7884
    7985                for arg in args:
     
    8591                                arg_string = arg_string + ' ' + arg
    8692
    87                 debug_msg( 9, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + arg_string  )
    88 
    89                 for line in os.popen( self.binary + ' ' + action + ' ' + arg_string ).readlines():
     93                debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
     94
     95                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
     96
     97                        print line
    9098
    9199                        if line.find( 'ERROR' ) != -1:
    92100
    93101                                error_msg = string.join( line.split( ' ' )[1:] )
    94                                 debug_msg( 9, error_msg )
     102                                debug_msg( 8, error_msg )
    95103                                return 1
    96104
     
    323331                "Store metrics retained in memory to disk"
    324332
    325                 debug_msg( 8, self.printTime() + ' - storethread(): started.' )
     333                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
    326334
    327335                # Store metrics somewhere between every 60 and 180 seconds
    328336                #
    329                 STORE_INTERVAL = random.randint( 60, 180 )
     337                #STORE_INTERVAL = random.randint( 60, 180 )
     338                STORE_INTERVAL = 40
    330339
    331340                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
    332341                storethread.start()
    333342
    334                 debug_msg( 8, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
     343                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
    335344                time.sleep( STORE_INTERVAL )
    336                 debug_msg( 8, self.printTime() + ' - storethread(): Done sleeping.' )
     345                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
    337346
    338347                if storethread.isAlive():
    339348
    340                         debug_msg( 8, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
    341                         parsethread.join( 180 ) # Maximum time is for storing thread to finish
    342                         debug_msg( 8, self.printTime() + ' - storethread(): Done waiting.' )
    343 
    344                 debug_msg( 8, self.printTime() + ' - storethread(): finished.' )
     349                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
     350                        storethread.join( 180 ) # Maximum time is for storing thread to finish
     351                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
     352
     353                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
    345354
    346355                return 0
     
    348357        def storeThread( self ):
    349358
    350                 debug_msg( 8, self.printTime() + ' - storemetricthread(): started.' )
    351                 debug_msg( 8, self.printTime() + ' - storemetricthread(): Storing data..' )
     359                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
     360                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
    352361                ret = self.myHandler.storeMetrics()
    353                 debug_msg( 8, self.printTime() + ' - storemetricthread(): Done storing.' )
    354                 debug_msg( 8, self.printTime() + ' - storemetricthread(): finished.' )
     362                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
     363                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
    355364               
    356365                return ret
     
    359368                "Process XML"
    360369
    361                 debug_msg( 8, self.printTime() + ' - xmlthread(): started.' )
     370                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
    362371
    363372                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
    364373                parsethread.start()
    365374
    366                 debug_msg( 8, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
     375                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
    367376                time.sleep( float( self.config.getLowestInterval() ) ) 
    368                 debug_msg( 8, self.printTime() + ' - xmlthread(): Done sleeping.' )
     377                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
    369378
    370379                if parsethread.isAlive():
    371380
    372                         debug_msg( 8, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
     381                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
    373382                        parsethread.join( 60 ) # Maximum time for XML thread to finish
    374                         debug_msg( 8, self.printTime() + ' - xmlthread(): Done waiting.' )
    375 
    376                 debug_msg( 8, self.printTime() + ' - xmlthread(): finished.' )
     383                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
     384
     385                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
    377386
    378387                return 0
     
    380389        def parseThread( self ):
    381390
    382                 debug_msg( 8, self.printTime() + ' - parsethread(): started.' )
    383                 debug_msg( 8, self.printTime() + ' - parsethread(): Parsing XML..' )
     391                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
     392                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
    384393                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
    385                 debug_msg( 8, self.printTime() + ' - parsethread(): Done parsing.' )
    386                 debug_msg( 8, self.printTime() + ' - parsethread(): finished.' )
     394                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
     395                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
    387396
    388397                return ret
     
    463472
    464473        myMetrics = { }
     474        lastStored = { }
    465475        slot = None
    466476
     
    468478                self.cluster = cluster
    469479                self.config = config
    470                 self.slot = mutex.mutex()
     480                self.slot = threading.Lock()
    471481                self.rrdm = RRDMutator()
    472482
     
    492502                        self.myMetrics[ host ][ metric['name'] ] = [ ]
    493503
    494                 self.slot.testandset()
     504                # Ah, psst, push it
     505                #
     506                # <atomic>
     507                self.slot.acquire()
     508
    495509                self.myMetrics[ host ][ metric['name'] ].append( metric )
    496                 self.slot.unlock()
     510
     511                self.slot.release()
     512                # </atomic>
    497513
    498514        def makeUpdateList( self, host, metricname ):
     
    500516                update_list = [ ]
    501517
    502                 for metric in self.myMetrics[ host ][ metricname ]:
    503 
    504                         update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
     518                while len( self.myMetrics[ host ][ metricname ] ) > 0:
     519
     520                        # Kabouter pop
     521                        #
     522                        # <atomic>     
     523                        self.slot.acquire()
     524
     525                        metric = self.myMetrics[ host ][ metricname ].pop()
     526
     527                        self.slot.release()
     528                        # </atomic>
     529
     530                        if self.checkStoreMetric( host, metricname, metric ):
     531                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
     532                        else:
     533                                print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )
    505534
    506535                return update_list
    507536
     537        def checkStoreMetric( self, host, metricname, metric ):
     538
     539                if self.lastStored.has_key( host ):
     540
     541                        if self.lastStored[ host ].has_key( metricname ):
     542
     543                                if self.lastStored[ host ][ metricname ] <= metric['time']:
     544
     545                                        # Allready wrote a value with this timestamp, skip tnx
     546                                        return 0
     547
     548                else:
     549                        self.lastStored[ host ] = { }
     550
     551                self.lastStored[ host ][ metricname ] = metric['time']
     552
     553                return 1
     554
    508555        def storeMetrics( self ):
    509556
     
    513560
    514561                                mytime = self.makeTimeSerial()
    515                                 self.createCheck( hostname, metricname, mytime )       
    516                                 update_ret = self.update( hostname, metricname, mytime )
     562                                correct_serial = self.checkNewRrdPeriod( hostname, mytime )
     563                                self.createCheck( hostname, metricname, correct_serial )       
     564                                update_ret = self.update( hostname, metricname, correct_serial )
    517565
    518566                                if update_ret == 0:
    519567
    520                                         self.slot.testandset()
    521                                         del self.myMetrics[ hostname ][ metricname ]
    522                                         self.slot.unlock()
    523568                                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
    524569                                else:
    525570                                        debug_msg( 9, 'metric update failed' )
    526571                                        return 1
     572
     573                                return 1
    527574
    528575        def makeTimeSerial( self ):
Note: See TracChangeset for help on using the changeset viewer.