Changeset 47


Ignore:
Timestamp:
04/07/05 11:53:52 (17 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

  • Timeout's for thread can now be set in a variable
  • While grabbing all last update times on metric rrds now also make a list of all available timeserial periods
  • Try to determine the correct timeserial period for each value of a metric, and call createcheck and update for corresponding values
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r46 r47  
    5555UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
    5656
     57# Maximum time (in seconds) a parsethread may run
     58#
     59PARSE_TIMEOUT = 60
     60
     61# Maximum time (in seconds) a storethread may run
     62#
     63STORE_TIMEOUT = 360
     64
    5765"""
    5866This is TOrque-GAnglia's data Daemon
     
    382390
    383391                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
    384                         storethread.join( 180 ) # Maximum time is for storing thread to finish
     392                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
    385393                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
    386394
     
    414422
    415423                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
    416                         parsethread.join( 180 ) # Maximum time for XML thread to finish
     424                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
    417425                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
    418426
     
    507515        myMetrics = { }
    508516        lastStored = { }
     517        timeserials = { }
    509518        slot = None
    510519
     
    540549                for host in hosts:
    541550
     551                        host_dir = cluster_dir + '/' + host
     552                        dirlist = os.listdir( host_dir )
     553
     554                        for dir in dirlist:
     555
     556                                if not self.timeserials.has_key( host ):
     557
     558                                        self.timeserials[ host ] = [ ]
     559
     560                                self.timeserials[ host ].append( dir )
     561
    542562                        last_serial = self.getLastRrdTimeSerial( host )
    543563                        if last_serial:
     
    589609                # </atomic>
    590610
    591         def makeUpdateList( self, host, metricname ):
     611        def makeUpdateList( self, host, metriclist ):
    592612
    593613                update_list = [ ]
    594614                metric = None
    595615
    596                 while len( self.myMetrics[ host ][ metricname ] ) > 0:
     616                while len( metriclist ) > 0:
    597617
    598618                        # Kabouter pop
    599619                        #
    600620                        # <atomic>     
    601                         self.slot.acquire()
     621                        #self.slot.acquire()
    602622
    603623                        # len might have changed since loop start
    604624                        #
    605                         if len( self.myMetrics[ host ][ metricname ] ) > 0:
    606                                 metric = self.myMetrics[ host ][ metricname ].pop( 0 )
    607 
    608                         self.slot.release()
     625                        if len( metriclist ) > 0:
     626                                metric = metriclist.pop( 0 )
     627
     628                        #self.slot.release()
    609629                        # </atomic>
    610630
    611631                        if metric:
    612                                 if self.checkStoreMetric( host, metricname, metric ):
     632                                if self.checkStoreMetric( host, metric ):
    613633                                        update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
    614                                 #else:
    615                                         #print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )
    616634
    617635                return update_list
     
    621639                if self.lastStored.has_key( host ):
    622640
    623                         if self.lastStored[ host ].has_key( metricname ):
    624 
    625                                 if metric['time'] <= self.lastStored[ host ][ metricname ]:
     641                        if self.lastStored[ host ].has_key( metric['name'] ):
     642
     643                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
    626644
    627645                                        # Allready wrote a value with this timestamp, skip tnx
     
    631649                        self.lastStored[ host ] = { }
    632650
    633                 self.lastStored[ host ][ metricname ] = metric['time']
     651                self.lastStored[ host ][ metric['name'] ] = metric['time']
    634652
    635653                return 1
     
    641659                        for metricname, mymetric in mymetrics.items():
    642660
    643                                 mytime = self.makeTimeSerial()
    644                                 correct_serial = self.checkNewRrdPeriod( hostname, mytime )
    645                                 self.createCheck( hostname, metricname, correct_serial )       
    646                                 update_ret = self.update( hostname, metricname, correct_serial )
    647 
    648                                 if update_ret == 0:
    649 
    650                                         debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
    651                                 else:
    652                                         debug_msg( 9, 'metric update failed' )
    653 
    654                                 #sys.exit(1)
     661                                #mytime = self.makeTimeSerial()
     662                                #serial = mymetric['time']
     663                                #correct_serial = self.checkNewRrdPeriod( hostname, mytime )
     664
     665                                self.slot.acquire()
     666
     667                                # Create a mapping table, each metric to the period where it should be stored
     668                                #
     669                                metric_serial_table = self.determineSerials( hostname, metricname, mymetric )
     670                                self.myMetrics[ hostname ][ metricname ] = [ ]
     671
     672                                self.slot.release()
     673
     674                                for period, pmetric in metric_serial_table.items():
     675
     676                                        self.createCheck( hostname, metricname, period )       
     677
     678                                        update_ret = self.update( hostname, metricname, period, pmetric )
     679
     680                                        if update_ret == 0:
     681
     682                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
     683                                        else:
     684                                                debug_msg( 9, 'metric update failed' )
     685
     686                                sys.exit(1)
    655687
    656688        def makeTimeSerial( self ):
     
    686718                """
    687719
    688                 rrd_dir, rrd_file = self.makeRrdPath( host )
    689 
    690720                newest_timeserial = 0
    691721
    692                 if os.path.exists( rrd_dir ):
    693 
    694                         dirlist = os.listdir( rrd_dir )
    695 
    696                         for dir in dirlist:
    697 
    698                                 valid_dir = 1
    699 
    700                                 for letter in dir:
    701                                         if letter not in string.digits:
    702                                                 valid_dir = 0
    703 
    704                                 if valid_dir:
    705                                         timeserial = dir
    706                                         if timeserial > newest_timeserial:
    707                                                 newest_timeserial = timeserial
     722                for dir in self.timeserials[ host ]:
     723
     724                        valid_dir = 1
     725
     726                        for letter in dir:
     727                                if letter not in string.digits:
     728                                        valid_dir = 0
     729
     730                        if valid_dir:
     731                                timeserial = dir
     732                                if timeserial > newest_timeserial:
     733                                        newest_timeserial = timeserial
    708734
    709735                if newest_timeserial:
     
    712738                        return 0
    713739
     740        def determinePeriod( self, host, check_serial ):
     741
     742                period_serial = 0
     743
     744                for serial in self.timeserials[ host ]:
     745
     746                        if check_serial >= serial and period_serial < serial:
     747
     748                                period_serial = serial
     749
     750                return period_serial
     751
     752        def determineSerials( self, host, metricname, metriclist ):
     753                """
     754                Determine the correct serial and corresponding rrd to store
     755                for a list of metrics
     756                """
     757
     758                metric_serial_table = { }
     759
     760                for metric in metriclist:
     761
     762                        if metric['name'] == metricname:
     763
     764                                period = self.determinePeriod( host, metric['time'] )   
     765
     766                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
     767
     768                                if (metric['time'] - period) > archive_secs:
     769
     770                                        # This one should get it's own new period
     771                                        period = metric['time']
     772
     773                                if not metric_serial_table.has_key( period ):
     774
     775                                        metric_serial_table = [ ]
     776
     777                                metric_serial_table[ period ].append( metric )
     778
     779                print metric_serial_table
     780
     781                return metric_serial_table
     782
    714783        def checkNewRrdPeriod( self, host, current_timeserial ):
    715784                """
     
    729798                        archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
    730799
    731                         if (current_timeserial - last_timeserial) >= archive_secs:
     800                        if (current_timeserial - last_timeserial) > archive_secs:
    732801                                serial = current_timeserial
    733802                        else:
     
    753822
    754823                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
    755 
     824               
    756825                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
    757826
     
    763832
    764833                        interval = self.config.getInterval( self.cluster )
    765                         heartbeat = 8 * int(interval)
     834                        heartbeat = 8 * int( interval )
    766835
    767836                        params = [ ]
     
    771840
    772841                        params.append( '--start' )
    773                         params.append( str( int( self.getFirstTime( host, metricname ) ) - 1 ) )
     842                        params.append( str( int( timeserial ) - 1 ) )
    774843
    775844                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
     
    780849                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
    781850
    782         def update( self, host, metricname, timeserial ):
     851        def update( self, host, metricname, timeserial, metriclist ):
    783852
    784853                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
     
    786855                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
    787856
    788                 update_list = self.makeUpdateList( host, metricname )
     857                update_list = self.makeUpdateList( host, metriclist )
    789858
    790859                if len( update_list ) > 0:
Note: See TracChangeset for help on using the changeset viewer.