source: trunk/daemon/togad.py @ 47

Last change on this file since 47 was 47, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Timeout's for thread can now be set in a variable
  • While grabbing all last update times on metric rrds now also make a list of all available timeserial periods
  • Try to determine the correct timeserial period for each value of a metric, and call createcheck and update for corresponding values
File size: 20.5 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import string
[12]8import os
9import os.path
[17]10import time
11import re
[36]12import threading
13import mutex
[38]14import random
[40]15from types import *
[3]16
[8]17# Specify debugging level here;
18#
[38]19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
[40]22# 8  = RRD file activity
23# 7  = daemon threading
[8]24#
[46]25DEBUG_LEVEL = 7
[6]26
[9]27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
[13]39# Amount of hours to store in one single archived .rrd
[9]40#
[43]41ARCHIVE_HOURS_PER_RRD = 12
[13]42
[22]43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
[17]47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
[13]52
[17]53# What XML data types not to store
[13]54#
[17]55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]56
[47]57# Maximum time (in seconds) a parsethread may run
58#
59PARSE_TIMEOUT = 60
60
61# Maximum time (in seconds) a storethread may run
62#
63STORE_TIMEOUT = 360
64
[8]65"""
66This is TOrque-GAnglia's data Daemon
67"""
68
[37]69class RRDMutator:
[38]70        "A class for handling .rrd mutations"
[37]71
72        binary = '/usr/bin/rrdtool'
73
74        def __init__( self, binary=None ):
75
76                if binary:
77                        self.binary = binary
78
79        def create( self, filename, args ):
[40]80                return self.perform( 'create', '"' + filename + '"', args )
[37]81
82        def update( self, filename, args ):
[40]83                return self.perform( 'update', '"' + filename + '"', args )
[37]84
[42]85        def grabLastUpdate( self, filename ):
86
87                last_update = 0
88
89                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
90
91                        if line.find( 'last_update') != -1:
92
93                                last_update = line.split( ' = ' )[1]
94
95                if last_update:
96                        return last_update
97                else:
98                        return 0
99
[40]100        def perform( self, action, filename, args ):
[37]101
102                arg_string = None
103
[40]104                if type( args ) is not ListType:
105                        debug_msg( 8, 'Arguments needs to be of type List' )
106                        return 1
107
[37]108                for arg in args:
109
110                        if not arg_string:
111
112                                arg_string = arg
113                        else:
114                                arg_string = arg_string + ' ' + arg
115
[40]116                debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]117
[40]118                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
[37]119
120                        if line.find( 'ERROR' ) != -1:
121
122                                error_msg = string.join( line.split( ' ' )[1:] )
[40]123                                debug_msg( 8, error_msg )
[37]124                                return 1
125
126                return 0
127
[6]128class GangliaXMLHandler( ContentHandler ):
[8]129        "Parse Ganglia's XML"
[3]130
[33]131        def __init__( self, config ):
132                self.config = config
[35]133                self.clusters = { }
[46]134                debug_msg( 0, printTime() + ' Gathering cluster timeserials and RRD last updates..' )
[44]135                self.gatherClusters()
[46]136                debug_msg( 0, printTime() + ' Done gathering timeserials and last updates.' )
[33]137
[44]138        def gatherClusters( self ):
139
[45]140                archive_dir = check_dir(ARCHIVE_PATH)
[44]141
142                hosts = [ ]
143
144                if os.path.exists( archive_dir ):
145
146                        dirlist = os.listdir( archive_dir )
147
148                        for item in dirlist:
149
150                                clustername = item
151
152                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_SOURCES:
153
154                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
155
[6]156        def startElement( self, name, attrs ):
[8]157                "Store appropriate data from xml start tags"
[3]158
[7]159                if name == 'GANGLIA_XML':
[32]160
161                        self.XMLSource = attrs.get( 'SOURCE', "" )
162                        self.gangliaVersion = attrs.get( 'VERSION', "" )
163
[12]164                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]165
[7]166                elif name == 'GRID':
[32]167
168                        self.gridName = attrs.get( 'NAME', "" )
169                        self.time = attrs.get( 'LOCALTIME', "" )
170
[12]171                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]172
[7]173                elif name == 'CLUSTER':
[32]174
175                        self.clusterName = attrs.get( 'NAME', "" )
176                        self.time = attrs.get( 'LOCALTIME', "" )
177
[35]178                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
[32]179
[34]180                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]181
[35]182                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]183
[14]184                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
[32]185
186                        self.hostName = attrs.get( 'NAME', "" )
187                        self.hostIp = attrs.get( 'IP', "" )
188                        self.hostReported = attrs.get( 'REPORTED', "" )
189
[12]190                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]191
[14]192                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
[6]193
[32]194                        type = attrs.get( 'TYPE', "" )
[6]195
[32]196                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
[3]197
[32]198                                myMetric = { }
199                                myMetric['name'] = attrs.get( 'NAME', "" )
200                                myMetric['val'] = attrs.get( 'VAL', "" )
201                                myMetric['time'] = self.hostReported
[3]202
[34]203                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]204
[34]205                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]206
[34]207        def storeMetrics( self ):
[9]208
[34]209                for clustername, rrdh in self.clusters.items():
[16]210
[38]211                        ret = rrdh.storeMetrics()
[9]212
[38]213                        if ret:
214                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
215                                return 1
216
217                return 0
218
[5]219class GangliaXMLGatherer:
[8]220        "Setup a connection and file object to Ganglia's XML"
[3]221
[8]222        s = None
223
224        def __init__( self, host, port ):
225                "Store host and port for connection"
226
[5]227                self.host = host
228                self.port = port
[33]229                self.connect()
[3]230
[33]231        def connect( self ):
232                "Setup connection to XML source"
[8]233
234                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]235
[5]236                        af, socktype, proto, canonname, sa = res
[32]237
[5]238                        try:
[32]239
[8]240                                self.s = socket.socket( af, socktype, proto )
[32]241
[5]242                        except socket.error, msg:
[32]243
[8]244                                self.s = None
[5]245                                continue
[32]246
[5]247                        try:
[32]248
[8]249                                self.s.connect( sa )
[32]250
[5]251                        except socket.error, msg:
[32]252
[8]253                                self.s.close()
254                                self.s = None
[5]255                                continue
[32]256
[5]257                        break
[3]258
[8]259                if self.s is None:
[32]260
[33]261                        debug_msg( 0, 'Could not open socket' )
262                        sys.exit( 1 )
[5]263
[33]264        def disconnect( self ):
265                "Close socket"
266
267                if self.s:
268                        self.s.close()
269                        self.s = None
270
271        def __del__( self ):
272                "Kill the socket before we leave"
273
274                self.disconnect()
275
276        def getFileObject( self ):
277                "Connect, and return a file object"
278
[38]279                if self.s:
280                        # Apearantly, only data is received when a connection is made
281                        # therefor, disconnect and connect
282                        #
283                        self.disconnect()
284                        self.connect()
[33]285
[8]286                return self.s.makefile( 'r' )
[5]287
[8]288class GangliaXMLProcessor:
[5]289
[33]290        def __init__( self ):
291                "Setup initial XML connection and handlers"
292
293                self.config = GangliaConfigParser( GMETAD_CONF )
294
295                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
296                self.myParser = make_parser()   
297                self.myHandler = GangliaXMLHandler( self.config )
298                self.myParser.setContentHandler( self.myHandler )
299
[9]300        def daemon( self ):
[8]301                "Run as daemon forever"
[5]302
[8]303                self.DAEMON = 1
[5]304
[8]305                # Fork the first child
306                #
307                pid = os.fork()
[32]308
[8]309                if pid > 0:
[7]310
[32]311                        sys.exit(0)  # end parent
312
[8]313                # creates a session and sets the process group ID
314                #
315                os.setsid()
[7]316
[8]317                # Fork the second child
318                #
319                pid = os.fork()
[32]320
[8]321                if pid > 0:
[5]322
[32]323                        sys.exit(0)  # end parent
324
[8]325                # Go to the root directory and set the umask
326                #
327                os.chdir('/')
328                os.umask(0)
329
330                sys.stdin.close()
331                sys.stdout.close()
[36]332                #sys.stderr.close()
[8]333
334                os.open('/dev/null', 0)
335                os.dup(0)
336                os.dup(0)
337
338                self.run()
339
[22]340        def printTime( self ):
[33]341                "Print current time in human readable format"
[22]342
[33]343                return time.strftime("%a %d %b %Y %H:%M:%S")
[22]344
[9]345        def run( self ):
[8]346                "Main thread"
347
[36]348                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
349                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]350
[36]351                while( 1 ):
352
353                        if not xmlthread.isAlive():
354                                # Gather XML at the same interval as gmetad
355
356                                # threaded call to: self.processXML()
357                                #
358                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
359                                xmlthread.start()
360
361                        if not storethread.isAlive():
362                                # Store metrics every .. sec
363
364                                # threaded call to: self.storeMetrics()
365                                #
366                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
367                                storethread.start()
368               
369                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
370                        time.sleep( 1 ) 
371
[33]372        def storeMetrics( self ):
373                "Store metrics retained in memory to disk"
[22]374
[40]375                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
[39]376
[38]377                # Store metrics somewhere between every 60 and 180 seconds
378                #
[44]379                #STORE_INTERVAL = random.randint( 360, 640 )
380                STORE_INTERVAL = 16
[22]381
[39]382                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
[36]383                storethread.start()
384
[40]385                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]386                time.sleep( STORE_INTERVAL )
[40]387                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
[36]388
389                if storethread.isAlive():
390
[40]391                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
[47]392                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[40]393                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
[36]394
[40]395                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
[36]396
397                return 0
398
[39]399        def storeThread( self ):
400
[40]401                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
402                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
[39]403                ret = self.myHandler.storeMetrics()
[40]404                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
405                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
[39]406               
407                return ret
408
[8]409        def processXML( self ):
410                "Process XML"
411
[40]412                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
[8]413
[39]414                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
[36]415                parsethread.start()
416
[40]417                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]418                time.sleep( float( self.config.getLowestInterval() ) ) 
[40]419                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
[36]420
421                if parsethread.isAlive():
422
[40]423                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
[47]424                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[40]425                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
[36]426
[40]427                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
[36]428
429                return 0
430
[39]431        def parseThread( self ):
432
[40]433                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
434                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
[39]435                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
[40]436                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
437                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
[39]438
439                return ret
440
[9]441class GangliaConfigParser:
442
[34]443        sources = [ ]
[9]444
445        def __init__( self, config ):
[32]446
[9]447                self.config = config
448                self.parseValues()
449
[32]450        def parseValues( self ):
[9]451                "Parse certain values from gmetad.conf"
452
453                readcfg = open( self.config, 'r' )
454
455                for line in readcfg.readlines():
456
457                        if line.count( '"' ) > 1:
458
[10]459                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]460
[11]461                                        source = { }
462                                        source['name'] = line.split( '"' )[1]
[9]463                                        source_words = line.split( '"' )[2].split( ' ' )
464
465                                        for word in source_words:
466
467                                                valid_interval = 1
468
469                                                for letter in word:
[32]470
[9]471                                                        if letter not in string.digits:
[32]472
[9]473                                                                valid_interval = 0
474
[10]475                                                if valid_interval and len(word) > 0:
[32]476
[9]477                                                        source['interval'] = word
[12]478                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]479       
480                                        # No interval found, use Ganglia's default     
481                                        if not source.has_key( 'interval' ):
482                                                source['interval'] = 15
483                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]484
[33]485                                        self.sources.append( source )
[9]486
487        def getInterval( self, source_name ):
[32]488
[9]489                for source in self.sources:
[32]490
[12]491                        if source['name'] == source_name:
[32]492
[9]493                                return source['interval']
[32]494
[9]495                return None
496
[34]497        def getLowestInterval( self ):
498
499                lowest_interval = 0
500
501                for source in self.sources:
502
503                        if not lowest_interval or source['interval'] <= lowest_interval:
504
505                                lowest_interval = source['interval']
506
507                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
508                if lowest_interval:
509                        return lowest_interval
510                else:
511                        return 15
512
[9]513class RRDHandler:
514
[32]515        myMetrics = { }
[40]516        lastStored = { }
[47]517        timeserials = { }
[36]518        slot = None
[32]519
[33]520        def __init__( self, config, cluster ):
[44]521                self.block = 0
[9]522                self.cluster = cluster
[33]523                self.config = config
[40]524                self.slot = threading.Lock()
[37]525                self.rrdm = RRDMutator()
[42]526                self.gatherLastUpdates()
[9]527
[44]528        def isBlocking( self ):
529
530                return self.block
531
[42]532        def gatherLastUpdates( self ):
533                "Populate the lastStored list, containing timestamps of all last updates"
534
[44]535                self.block = 1
536
[42]537                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
538
539                hosts = [ ]
540
541                if os.path.exists( cluster_dir ):
542
[44]543                        dirlist = os.listdir( cluster_dir )
[42]544
[44]545                        for dir in dirlist:
[42]546
[44]547                                hosts.append( dir )
[42]548
549                for host in hosts:
550
[47]551                        host_dir = cluster_dir + '/' + host
552                        dirlist = os.listdir( host_dir )
553
554                        for dir in dirlist:
555
556                                if not self.timeserials.has_key( host ):
557
558                                        self.timeserials[ host ] = [ ]
559
560                                self.timeserials[ host ].append( dir )
561
[42]562                        last_serial = self.getLastRrdTimeSerial( host )
563                        if last_serial:
564
565                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
566                                if os.path.exists( metric_dir ):
567
[44]568                                        dirlist = os.listdir( metric_dir )
[42]569
[44]570                                        for file in dirlist:
[42]571
[44]572                                                metricname = file.split( '.rrd' )[0]
[42]573
[44]574                                                if not self.lastStored.has_key( host ):
[42]575
[44]576                                                        self.lastStored[ host ] = { }
[42]577
[44]578                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]579
[32]580        def getClusterName( self ):
581                return self.cluster
582
583        def memMetric( self, host, metric ):
584
[34]585                if self.myMetrics.has_key( host ):
[32]586
[34]587                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]588
[34]589                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]590
[34]591                                        if mymetric['time'] == metric['time']:
[32]592
[34]593                                                # Allready have this metric, abort
594                                                return 1
595                        else:
596                                self.myMetrics[ host ][ metric['name'] ] = [ ]
597                else:
[32]598                        self.myMetrics[ host ] = { }
[34]599                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]600
[40]601                # Ah, psst, push it
602                #
603                # <atomic>
604                self.slot.acquire()
605
[32]606                self.myMetrics[ host ][ metric['name'] ].append( metric )
607
[40]608                self.slot.release()
609                # </atomic>
610
[47]611        def makeUpdateList( self, host, metriclist ):
[37]612
613                update_list = [ ]
[41]614                metric = None
[37]615
[47]616                while len( metriclist ) > 0:
[37]617
[40]618                        # Kabouter pop
619                        #
620                        # <atomic>     
[47]621                        #self.slot.acquire()
[37]622
[41]623                        # len might have changed since loop start
624                        #
[47]625                        if len( metriclist ) > 0:
626                                metric = metriclist.pop( 0 )
[40]627
[47]628                        #self.slot.release()
[40]629                        # </atomic>
630
[41]631                        if metric:
[47]632                                if self.checkStoreMetric( host, metric ):
[41]633                                        update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[40]634
[37]635                return update_list
636
[40]637        def checkStoreMetric( self, host, metricname, metric ):
638
639                if self.lastStored.has_key( host ):
640
[47]641                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]642
[47]643                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]644
645                                        # Allready wrote a value with this timestamp, skip tnx
646                                        return 0
647
648                else:
649                        self.lastStored[ host ] = { }
650
[47]651                self.lastStored[ host ][ metric['name'] ] = metric['time']
[40]652
653                return 1
654
[33]655        def storeMetrics( self ):
656
657                for hostname, mymetrics in self.myMetrics.items():     
658
659                        for metricname, mymetric in mymetrics.items():
660
[47]661                                #mytime = self.makeTimeSerial()
662                                #serial = mymetric['time']
663                                #correct_serial = self.checkNewRrdPeriod( hostname, mytime )
[33]664
[47]665                                self.slot.acquire() 
[33]666
[47]667                                # Create a mapping table, each metric to the period where it should be stored
668                                #
669                                metric_serial_table = self.determineSerials( hostname, metricname, mymetric )
670                                self.myMetrics[ hostname ][ metricname ] = [ ]
[33]671
[47]672                                self.slot.release()
[43]673
[47]674                                for period, pmetric in metric_serial_table.items():
675
676                                        self.createCheck( hostname, metricname, period )       
677
678                                        update_ret = self.update( hostname, metricname, period, pmetric )
679
680                                        if update_ret == 0:
681
682                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
683                                        else:
684                                                debug_msg( 9, 'metric update failed' )
685
686                                sys.exit(1)
687
[17]688        def makeTimeSerial( self ):
[32]689                "Generate a time serial. Seconds since epoch"
[17]690
691                # Seconds since epoch
692                mytime = int( time.time() )
693
694                return mytime
695
[33]696        def makeRrdPath( self, host, metricname=None, timeserial=None ):
[32]697                """
698                Make a RRD location/path and filename
699                If a metric or timeserial are supplied the complete locations
700                will be made, else just the host directory
701                """
[17]702
[20]703                if not timeserial:     
704                        rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
705                else:
706                        rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
[35]707                if metricname:
[33]708                        rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[20]709                else:
710                        rrd_file = None
[17]711
712                return rrd_dir, rrd_file
713
[20]714        def getLastRrdTimeSerial( self, host ):
[32]715                """
716                Find the last timeserial (directory) for this host
717                This is determined once every host
718                """
[17]719
[19]720                newest_timeserial = 0
721
[47]722                for dir in self.timeserials[ host ]:
[32]723
[47]724                        valid_dir = 1
[17]725
[47]726                        for letter in dir:
727                                if letter not in string.digits:
728                                        valid_dir = 0
[17]729
[47]730                        if valid_dir:
731                                timeserial = dir
732                                if timeserial > newest_timeserial:
733                                        newest_timeserial = timeserial
[17]734
735                if newest_timeserial:
[18]736                        return newest_timeserial
[17]737                else:
738                        return 0
739
[47]740        def determinePeriod( self, host, check_serial ):
741
742                period_serial = 0
743
744                for serial in self.timeserials[ host ]:
745
746                        if check_serial >= serial and period_serial < serial:
747
748                                period_serial = serial
749
750                return period_serial
751
752        def determineSerials( self, host, metricname, metriclist ):
753                """
754                Determine the correct serial and corresponding rrd to store
755                for a list of metrics
756                """
757
758                metric_serial_table = { }
759
760                for metric in metriclist:
761
762                        if metric['name'] == metricname:
763
764                                period = self.determinePeriod( host, metric['time'] )   
765
766                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
767
768                                if (metric['time'] - period) > archive_secs:
769
770                                        # This one should get it's own new period
771                                        period = metric['time']
772
773                                if not metric_serial_table.has_key( period ):
774
775                                        metric_serial_table = [ ]
776
777                                metric_serial_table[ period ].append( metric )
778
779                print metric_serial_table
780
781                return metric_serial_table
782
[20]783        def checkNewRrdPeriod( self, host, current_timeserial ):
[32]784                """
785                Check if current timeserial belongs to recent time period
786                or should become a new period (and file).
[17]787
[32]788                Returns the serial of the correct time period
789                """
790
[20]791                last_timeserial = int( self.getLastRrdTimeSerial( host ) )
[30]792                debug_msg( 9, 'last timeserial of %s is %s' %( host, last_timeserial ) )
[17]793
794                if not last_timeserial:
[18]795                        serial = current_timeserial
796                else:
[17]797
[18]798                        archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[17]799
[47]800                        if (current_timeserial - last_timeserial) > archive_secs:
[18]801                                serial = current_timeserial
802                        else:
803                                serial = last_timeserial
[17]804
[18]805                return serial
806
[33]807        def getFirstTime( self, host, metricname ):
808                "Get the first time of a metric we know of"
809
810                first_time = 0
811
812                for metric in self.myMetrics[ host ][ metricname ]:
813
814                        if not first_time or metric['time'] <= first_time:
815
816                                first_time = metric['time']
817
[35]818                return first_time
819
[33]820        def createCheck( self, host, metricname, timeserial ):
[9]821                "Check if an .rrd allready exists for this metric, create if not"
822
[35]823                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]824               
[33]825                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]826
[9]827                if not os.path.exists( rrd_dir ):
828                        os.makedirs( rrd_dir )
[14]829                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]830
[14]831                if not os.path.exists( rrd_file ):
[9]832
[33]833                        interval = self.config.getInterval( self.cluster )
[47]834                        heartbeat = 8 * int( interval )
[9]835
[37]836                        params = [ ]
[12]837
[37]838                        params.append( '--step' )
839                        params.append( str( interval ) )
[12]840
[37]841                        params.append( '--start' )
[47]842                        params.append( str( int( timeserial ) - 1 ) )
[12]843
[37]844                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
845                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]846
[37]847                        self.rrdm.create( str(rrd_file), params )
848
[14]849                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
850
[47]851        def update( self, host, metricname, timeserial, metriclist ):
[9]852
[35]853                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]854
[33]855                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]856
[47]857                update_list = self.makeUpdateList( host, metriclist )
[15]858
[41]859                if len( update_list ) > 0:
860                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]861
[41]862                        if ret:
863                                return 1
[27]864               
[41]865                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]866
[36]867                return 0
868
[8]869def main():
870        "Program startup"
871
872        myProcessor = GangliaXMLProcessor()
873
[22]874        if DAEMONIZE:
875                myProcessor.daemon()
876        else:
877                myProcessor.run()
878
[9]879def check_dir( directory ):
880        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
881
882        if directory[-1] == '/':
883                directory = directory[:-1]
884
885        return directory
886
[12]887def debug_msg( level, msg ):
888
889        if (DEBUG_LEVEL >= level):
890                sys.stderr.write( msg + '\n' )
891
[46]892def printTime( ):
893        "Print current time in human readable format"
894
895        return time.strftime("%a %d %b %Y %H:%M:%S")
896
[5]897# Let's go
[9]898if __name__ == '__main__':
899        main()
Note: See TracBrowser for help on using the repository browser.