Changeset 32 for trunk/daemon


Ignore:
Timestamp:
04/01/05 16:50:54 (19 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

Begin of RRD metric handling rewrite to in-memory rrd's

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r31 r32  
    6363        "Parse Ganglia's XML"
    6464
    65         metrics = [ ]
     65        def __init__( self ):
     66                self.metrics = [ ]
     67                self.clusters = { }
    6668
    6769        def startElement( self, name, attrs ):
     
    6971
    7072                if name == 'GANGLIA_XML':
    71                         self.XMLSource = attrs.get('SOURCE',"")
    72                         self.gangliaVersion = attrs.get('VERSION',"")
     73
     74                        self.XMLSource = attrs.get( 'SOURCE', "" )
     75                        self.gangliaVersion = attrs.get( 'VERSION', "" )
     76
    7377                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
    7478
    7579                elif name == 'GRID':
    76                         self.gridName = attrs.get('NAME',"")
    77                         self.time = attrs.get('LOCALTIME',"")
     80
     81                        self.gridName = attrs.get( 'NAME', "" )
     82                        self.time = attrs.get( 'LOCALTIME', "" )
     83
    7884                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
    7985
    8086                elif name == 'CLUSTER':
    81                         self.clusterName = attrs.get('NAME',"")
    82                         self.time = attrs.get('LOCALTIME',"")
    83                         self.rrd = RRDHandler( self.clusterName )
     87
     88                        self.clusterName = attrs.get( 'NAME', "" )
     89                        self.time = attrs.get( 'LOCALTIME', "" )
     90
     91                        self.clusters[ self.clusterName ] = RRDHandler( self.clusterName )
     92
    8493                        debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
    8594
    8695                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
    87                         self.hostName = attrs.get('NAME',"")
    88                         self.hostIp = attrs.get('IP',"")
    89                         self.hostReported = attrs.get('REPORTED',"")
    90                         # Reset the metrics list for each host
    91                         self.metrics = [ ]
     96
     97                        self.hostName = attrs.get( 'NAME', "" )
     98                        self.hostIp = attrs.get( 'IP', "" )
     99                        self.hostReported = attrs.get( 'REPORTED', "" )
     100
    92101                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
    93102
    94103                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
    95                         myMetric = { }
    96                         myMetric['name'] = attrs.get('NAME',"")
    97                         myMetric['val'] = attrs.get('VAL',"")
    98                         myMetric['time'] = self.hostReported
    99                         myMetric['type'] = attrs.get('TYPE',"")
    100 
    101                         self.metrics.append( myMetric )
     104
     105                        type = attrs.get( 'TYPE', "" )
     106
     107                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
     108
     109                                myMetric = { }
     110                                myMetric['name'] = attrs.get( 'NAME', "" )
     111                                myMetric['val'] = attrs.get( 'VAL', "" )
     112                                myMetric['time'] = self.hostReported
     113
     114                                self.clusters[ self.clusterName ].memMetric( self.hostname, myMetric )
     115
    102116                        debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
    103 
    104                 return
    105 
    106         def endElement( self, name ):
    107                 #if name == 'GANGLIA_XML':
    108 
    109                 #if name == 'GRID':
    110 
    111                 #if name == 'CLUSTER':
    112 
    113                 if name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
    114 
    115                         # Determine time here, so all use same time in this run
    116                         mytime = self.rrd.makeTimeSerial()
    117                         correct_serial = self.rrd.checkNewRrdPeriod( self.hostName, mytime )
    118 
    119                         #debug_msg( 8, 'time %s: Storing metrics for %s' %(mytime, self.hostName) )
    120                         print 'Storing metrics for %s:' %(self.hostName),
    121                         self.storeMetrics( self.hostName, correct_serial )
    122 
    123                 #if name == 'METRIC':
    124117
    125118        def storeMetrics( self, hostname, timeserial ):
     
    129122
    130123                                self.rrd.createCheck( hostname, metric, timeserial )   
    131                                 print ' [%s.%s]' %(metric['name'], metric['time']),
    132124                                self.rrd.update( hostname, metric, timeserial )
    133125                                debug_msg( 9, 'stored metric %s for %s: %s' %( hostname, metric['name'], metric['val'] ) )
    134126                                #sys.exit(1)
    135                 print
    136        
    137127
    138128class GangliaXMLGatherer:
     
    156146
    157147                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
     148
    158149                        af, socktype, proto, canonname, sa = res
     150
    159151                        try:
     152
    160153                                self.s = socket.socket( af, socktype, proto )
     154
    161155                        except socket.error, msg:
     156
    162157                                self.s = None
    163158                                continue
     159
    164160                        try:
     161
    165162                                self.s.connect( sa )
     163
    166164                        except socket.error, msg:
     165
    167166                                self.s.close()
    168167                                self.s = None
    169168                                continue
     169
    170170                        break
    171171
    172172                if self.s is None:
     173
    173174                        print 'Could not open socket'
    174175                        sys.exit(1)
     
    186187                #
    187188                pid = os.fork()
     189
    188190                if pid > 0:
    189                         sys.exit(0)  # end parrent
     191
     192                        sys.exit(0)  # end parent
    190193
    191194                # creates a session and sets the process group ID
     
    196199                #
    197200                pid = os.fork()
     201
    198202                if pid > 0:
    199                         sys.exit(0)  # end parrent
     203
     204                        sys.exit(0)  # end parent
    200205
    201206                # Go to the root directory and set the umask
     
    221226                "Main thread"
    222227
    223                 while ( 1 ):
     228                while( 1 ):
    224229
    225230                        debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' )
     
    268273
    269274        def __init__( self, config ):
     275
    270276                self.config = config
    271277                self.parseValues()
    272278
    273         def parseValues(self):
     279        def parseValues( self ):
    274280                "Parse certain values from gmetad.conf"
    275281
     
    291297
    292298                                                for letter in word:
     299
    293300                                                        if letter not in string.digits:
     301
    294302                                                                valid_interval = 0
    295303
    296304                                                if valid_interval and len(word) > 0:
     305
    297306                                                        source['interval'] = word
    298307                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
     
    300309                # No interval found, use Ganglia's default     
    301310                if not source.has_key( 'interval' ):
     311
    302312                        source['interval'] = 15
    303313                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
     
    306316
    307317        def getInterval( self, source_name ):
     318
    308319                for source in self.sources:
     320
    309321                        if source['name'] == source_name:
     322
    310323                                return source['interval']
     324
    311325                return None
    312326
    313327class RRDHandler:
     328
     329        myMetrics = { }
    314330
    315331        def __init__( self, cluster ):
     
    317333                self.gmetad_conf = GangliaConfigParser( GMETAD_CONF )
    318334
     335        def getClusterName( self ):
     336                return self.cluster
     337
     338        def memMetric( self, host, metric ):
     339
     340                for m in self.myMetrics[ host ]:
     341
     342                        if m['time'] == metric['time']:
     343
     344                                # Allready have this metric, abort
     345                                return 1
     346
     347                if not self.myMetrics.has_key( host ):
     348
     349                        self.myMetrics[ host ] = { }
     350
     351                if not self.myMetrics[ host ].has_key( metric['name'] ):
     352
     353                        self.myMetrics[ host ][ metric['name'] ] = [ ]
     354
     355                self.myMetrics[ host ][ metric['name'] ].append( metric )
     356
     357        def makeUpdateString( self, host, metric ):
     358
     359                update_string = ''
     360
     361                for m in self.myMetrics[ host ][ metric['name'] ]:
     362
     363                        update_string = update_string + ' %s:%s' %( metric['time'], metric['val'] )
     364
     365                return update_string
     366
    319367        def makeTimeSerial( self ):
     368                "Generate a time serial. Seconds since epoch"
    320369
    321370                # YYYYMMDDhhmmss: 20050321143411
     
    328377
    329378        def makeRrdPath( self, host, metric=None, timeserial=None ):
     379                """
     380                Make a RRD location/path and filename
     381                If a metric or timeserial are supplied the complete locations
     382                will be made, else just the host directory
     383                """
    330384
    331385                if not timeserial:     
     
    341395
    342396        def getLastRrdTimeSerial( self, host ):
     397                """
     398                Find the last timeserial (directory) for this host
     399                This is determined once every host
     400                """
    343401
    344402                rrd_dir, rrd_file = self.makeRrdPath( host )
     
    347405
    348406                if os.path.exists( rrd_dir ):
     407
    349408                        for root, dirs, files in os.walk( rrd_dir ):
    350409
     
    368427
    369428        def checkNewRrdPeriod( self, host, current_timeserial ):
     429                """
     430                Check if current timeserial belongs to recent time period
     431                or should become a new period (and file).
     432
     433                Returns the serial of the correct time period
     434                """
    370435
    371436                last_timeserial = int( self.getLastRrdTimeSerial( host ) )
     
    426491
    427492                try:
     493
    428494                        rrdtool.update( str(rrd_file), str(update_string) )
     495
    429496                except rrdtool.error, detail:
     497
    430498                        debug_msg( 0, 'EXCEPTION! While trying to update rrd:' )
    431499                        debug_msg( 0, '\trrd %s with %s' %( str(rrd_file), update_string ) )
    432500                        debug_msg( 0, str(detail) )
     501
    433502                        sys.exit( 1 )
    434503               
Note: See TracChangeset for help on using the changeset viewer.