source: trunk/daemon/togad.py @ 54

Last change on this file since 54 was 54, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Several bugfixes
  • Now more than 1 storethread is possible
File size: 20.0 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import string
[12]8import os
9import os.path
[17]10import time
11import re
[36]12import threading
13import mutex
[38]14import random
[40]15from types import *
[3]16
[8]17# Specify debugging level here;
18#
[38]19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
[40]22# 8  = RRD file activity
23# 7  = daemon threading
[8]24#
[54]25DEBUG_LEVEL = 8
[6]26
[9]27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
[13]39# Amount of hours to store in one single archived .rrd
[9]40#
[43]41ARCHIVE_HOURS_PER_RRD = 12
[13]42
[22]43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
[17]47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
[13]52
[17]53# What XML data types not to store
[13]54#
[17]55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]56
[47]57# Maximum time (in seconds) a parsethread may run
58#
59PARSE_TIMEOUT = 60
60
61# Maximum time (in seconds) a storethread may run
62#
63STORE_TIMEOUT = 360
64
[54]65# Number of storing threads
66#
67STORE_THREADS = 1
[8]68"""
69This is TOrque-GAnglia's data Daemon
70"""
71
[37]72class RRDMutator:
[38]73        "A class for handling .rrd mutations"
[37]74
75        binary = '/usr/bin/rrdtool'
76
77        def __init__( self, binary=None ):
78
79                if binary:
80                        self.binary = binary
81
82        def create( self, filename, args ):
[40]83                return self.perform( 'create', '"' + filename + '"', args )
[37]84
85        def update( self, filename, args ):
[40]86                return self.perform( 'update', '"' + filename + '"', args )
[37]87
[42]88        def grabLastUpdate( self, filename ):
89
90                last_update = 0
91
[53]92                debug_msg( 8, self.binary + ' info "' + filename + '"' )
93
[42]94                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
95
96                        if line.find( 'last_update') != -1:
97
98                                last_update = line.split( ' = ' )[1]
99
100                if last_update:
101                        return last_update
102                else:
103                        return 0
104
[40]105        def perform( self, action, filename, args ):
[37]106
107                arg_string = None
108
[40]109                if type( args ) is not ListType:
110                        debug_msg( 8, 'Arguments needs to be of type List' )
111                        return 1
112
[37]113                for arg in args:
114
115                        if not arg_string:
116
117                                arg_string = arg
118                        else:
119                                arg_string = arg_string + ' ' + arg
120
[54]121                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]122
[40]123                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
[37]124
125                        if line.find( 'ERROR' ) != -1:
126
127                                error_msg = string.join( line.split( ' ' )[1:] )
[40]128                                debug_msg( 8, error_msg )
[37]129                                return 1
130
131                return 0
132
[6]133class GangliaXMLHandler( ContentHandler ):
[8]134        "Parse Ganglia's XML"
[3]135
[33]136        def __init__( self, config ):
137                self.config = config
[35]138                self.clusters = { }
[54]139                debug_msg( 0, printTime() + ' - Checking existing toga rrd archive..' )
[44]140                self.gatherClusters()
[54]141                debug_msg( 0, printTime() + ' - Check done.' )
[33]142
[44]143        def gatherClusters( self ):
144
[45]145                archive_dir = check_dir(ARCHIVE_PATH)
[44]146
147                hosts = [ ]
148
149                if os.path.exists( archive_dir ):
150
151                        dirlist = os.listdir( archive_dir )
152
153                        for item in dirlist:
154
155                                clustername = item
156
157                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_SOURCES:
158
159                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
160
[6]161        def startElement( self, name, attrs ):
[8]162                "Store appropriate data from xml start tags"
[3]163
[7]164                if name == 'GANGLIA_XML':
[32]165
166                        self.XMLSource = attrs.get( 'SOURCE', "" )
167                        self.gangliaVersion = attrs.get( 'VERSION', "" )
168
[12]169                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]170
[7]171                elif name == 'GRID':
[32]172
173                        self.gridName = attrs.get( 'NAME', "" )
174                        self.time = attrs.get( 'LOCALTIME', "" )
175
[12]176                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]177
[7]178                elif name == 'CLUSTER':
[32]179
180                        self.clusterName = attrs.get( 'NAME', "" )
181                        self.time = attrs.get( 'LOCALTIME', "" )
182
[35]183                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
[32]184
[34]185                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]186
[35]187                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]188
[14]189                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
[32]190
191                        self.hostName = attrs.get( 'NAME', "" )
192                        self.hostIp = attrs.get( 'IP', "" )
193                        self.hostReported = attrs.get( 'REPORTED', "" )
194
[12]195                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]196
[14]197                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
[6]198
[32]199                        type = attrs.get( 'TYPE', "" )
[6]200
[32]201                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
[3]202
[32]203                                myMetric = { }
204                                myMetric['name'] = attrs.get( 'NAME', "" )
205                                myMetric['val'] = attrs.get( 'VAL', "" )
206                                myMetric['time'] = self.hostReported
[3]207
[34]208                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]209
[34]210                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]211
[34]212        def storeMetrics( self ):
[9]213
[34]214                for clustername, rrdh in self.clusters.items():
[16]215
[38]216                        ret = rrdh.storeMetrics()
[9]217
[38]218                        if ret:
219                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
220                                return 1
221
222                return 0
223
[5]224class GangliaXMLGatherer:
[8]225        "Setup a connection and file object to Ganglia's XML"
[3]226
[8]227        s = None
228
229        def __init__( self, host, port ):
230                "Store host and port for connection"
231
[5]232                self.host = host
233                self.port = port
[33]234                self.connect()
[3]235
[33]236        def connect( self ):
237                "Setup connection to XML source"
[8]238
239                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]240
[5]241                        af, socktype, proto, canonname, sa = res
[32]242
[5]243                        try:
[32]244
[8]245                                self.s = socket.socket( af, socktype, proto )
[32]246
[5]247                        except socket.error, msg:
[32]248
[8]249                                self.s = None
[5]250                                continue
[32]251
[5]252                        try:
[32]253
[8]254                                self.s.connect( sa )
[32]255
[5]256                        except socket.error, msg:
[32]257
[8]258                                self.s.close()
259                                self.s = None
[5]260                                continue
[32]261
[5]262                        break
[3]263
[8]264                if self.s is None:
[32]265
[33]266                        debug_msg( 0, 'Could not open socket' )
267                        sys.exit( 1 )
[5]268
[33]269        def disconnect( self ):
270                "Close socket"
271
272                if self.s:
273                        self.s.close()
274                        self.s = None
275
276        def __del__( self ):
277                "Kill the socket before we leave"
278
279                self.disconnect()
280
281        def getFileObject( self ):
282                "Connect, and return a file object"
283
[38]284                if self.s:
285                        # Apearantly, only data is received when a connection is made
286                        # therefor, disconnect and connect
287                        #
288                        self.disconnect()
289                        self.connect()
[33]290
[8]291                return self.s.makefile( 'r' )
[5]292
[8]293class GangliaXMLProcessor:
[5]294
[33]295        def __init__( self ):
296                "Setup initial XML connection and handlers"
297
298                self.config = GangliaConfigParser( GMETAD_CONF )
299
300                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
301                self.myParser = make_parser()   
302                self.myHandler = GangliaXMLHandler( self.config )
303                self.myParser.setContentHandler( self.myHandler )
304
[9]305        def daemon( self ):
[8]306                "Run as daemon forever"
[5]307
[8]308                self.DAEMON = 1
[5]309
[8]310                # Fork the first child
311                #
312                pid = os.fork()
[32]313
[8]314                if pid > 0:
[7]315
[32]316                        sys.exit(0)  # end parent
317
[8]318                # creates a session and sets the process group ID
319                #
320                os.setsid()
[7]321
[8]322                # Fork the second child
323                #
324                pid = os.fork()
[32]325
[8]326                if pid > 0:
[5]327
[32]328                        sys.exit(0)  # end parent
329
[8]330                # Go to the root directory and set the umask
331                #
332                os.chdir('/')
333                os.umask(0)
334
335                sys.stdin.close()
336                sys.stdout.close()
[36]337                #sys.stderr.close()
[8]338
339                os.open('/dev/null', 0)
340                os.dup(0)
341                os.dup(0)
342
343                self.run()
344
[22]345        def printTime( self ):
[33]346                "Print current time in human readable format"
[22]347
[33]348                return time.strftime("%a %d %b %Y %H:%M:%S")
[22]349
[9]350        def run( self ):
[8]351                "Main thread"
352
[36]353                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
[22]354
[54]355                storethreads = [ ]
356
357                for num in range( int( STORE_THREADS ) ):
358
359                        storethreads.append( threading.Thread( None, self.storeMetrics, 'storethread' ) )
360
[36]361                while( 1 ):
362
363                        if not xmlthread.isAlive():
364                                # Gather XML at the same interval as gmetad
365
366                                # threaded call to: self.processXML()
367                                #
368                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
369                                xmlthread.start()
370
[54]371                        for storethread in storethreads:
[36]372
[54]373                                mythread = storethreads.index( storethread )
374
375                                if not storethread.isAlive():
376
377                                        # Store metrics every .. sec
378
379                                        # threaded call to: self.storeMetrics()
380                                        #
381                                        storethreads[ mythread ] = threading.Thread( None, self.storeMetrics, 'storethread' )
382                                        storethreads[ mythread ].start()
[36]383               
384                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
385                        time.sleep( 1 ) 
386
[33]387        def storeMetrics( self ):
388                "Store metrics retained in memory to disk"
[22]389
[40]390                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
[39]391
[38]392                # Store metrics somewhere between every 60 and 180 seconds
393                #
[54]394                #STORE_INTERVAL = random.randint( 360, 640 )
395                STORE_INTERVAL = 60
[22]396
[39]397                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
[36]398                storethread.start()
399
[40]400                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]401                time.sleep( STORE_INTERVAL )
[40]402                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
[36]403
404                if storethread.isAlive():
405
[40]406                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
[47]407                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[40]408                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
[36]409
[40]410                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
[36]411
412                return 0
413
[39]414        def storeThread( self ):
415
[40]416                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
417                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
[39]418                ret = self.myHandler.storeMetrics()
[40]419                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
420                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
[39]421               
422                return ret
423
[8]424        def processXML( self ):
425                "Process XML"
426
[40]427                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
[8]428
[39]429                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
[36]430                parsethread.start()
431
[40]432                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]433                time.sleep( float( self.config.getLowestInterval() ) ) 
[40]434                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
[36]435
436                if parsethread.isAlive():
437
[40]438                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
[47]439                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[40]440                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
[36]441
[40]442                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
[36]443
444                return 0
445
[39]446        def parseThread( self ):
447
[40]448                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
449                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
[39]450                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
[40]451                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
452                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
[39]453
454                return ret
455
[9]456class GangliaConfigParser:
457
[34]458        sources = [ ]
[9]459
460        def __init__( self, config ):
[32]461
[9]462                self.config = config
463                self.parseValues()
464
[32]465        def parseValues( self ):
[9]466                "Parse certain values from gmetad.conf"
467
468                readcfg = open( self.config, 'r' )
469
470                for line in readcfg.readlines():
471
472                        if line.count( '"' ) > 1:
473
[10]474                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]475
[11]476                                        source = { }
477                                        source['name'] = line.split( '"' )[1]
[9]478                                        source_words = line.split( '"' )[2].split( ' ' )
479
480                                        for word in source_words:
481
482                                                valid_interval = 1
483
484                                                for letter in word:
[32]485
[9]486                                                        if letter not in string.digits:
[32]487
[9]488                                                                valid_interval = 0
489
[10]490                                                if valid_interval and len(word) > 0:
[32]491
[9]492                                                        source['interval'] = word
[12]493                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]494       
495                                        # No interval found, use Ganglia's default     
496                                        if not source.has_key( 'interval' ):
497                                                source['interval'] = 15
498                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]499
[33]500                                        self.sources.append( source )
[9]501
502        def getInterval( self, source_name ):
[32]503
[9]504                for source in self.sources:
[32]505
[12]506                        if source['name'] == source_name:
[32]507
[9]508                                return source['interval']
[32]509
[9]510                return None
511
[34]512        def getLowestInterval( self ):
513
514                lowest_interval = 0
515
516                for source in self.sources:
517
518                        if not lowest_interval or source['interval'] <= lowest_interval:
519
520                                lowest_interval = source['interval']
521
522                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
523                if lowest_interval:
524                        return lowest_interval
525                else:
526                        return 15
527
[9]528class RRDHandler:
529
[32]530        myMetrics = { }
[40]531        lastStored = { }
[47]532        timeserials = { }
[36]533        slot = None
[32]534
[33]535        def __init__( self, config, cluster ):
[44]536                self.block = 0
[9]537                self.cluster = cluster
[33]538                self.config = config
[40]539                self.slot = threading.Lock()
[37]540                self.rrdm = RRDMutator()
[42]541                self.gatherLastUpdates()
[9]542
[42]543        def gatherLastUpdates( self ):
544                "Populate the lastStored list, containing timestamps of all last updates"
545
546                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
547
548                hosts = [ ]
549
550                if os.path.exists( cluster_dir ):
551
[44]552                        dirlist = os.listdir( cluster_dir )
[42]553
[44]554                        for dir in dirlist:
[42]555
[44]556                                hosts.append( dir )
[42]557
558                for host in hosts:
559
[47]560                        host_dir = cluster_dir + '/' + host
561                        dirlist = os.listdir( host_dir )
562
563                        for dir in dirlist:
564
565                                if not self.timeserials.has_key( host ):
566
567                                        self.timeserials[ host ] = [ ]
568
569                                self.timeserials[ host ].append( dir )
570
[42]571                        last_serial = self.getLastRrdTimeSerial( host )
572                        if last_serial:
573
574                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
575                                if os.path.exists( metric_dir ):
576
[44]577                                        dirlist = os.listdir( metric_dir )
[42]578
[44]579                                        for file in dirlist:
[42]580
[44]581                                                metricname = file.split( '.rrd' )[0]
[42]582
[44]583                                                if not self.lastStored.has_key( host ):
[42]584
[44]585                                                        self.lastStored[ host ] = { }
[42]586
[44]587                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]588
[32]589        def getClusterName( self ):
590                return self.cluster
591
592        def memMetric( self, host, metric ):
593
[34]594                if self.myMetrics.has_key( host ):
[32]595
[34]596                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]597
[34]598                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]599
[34]600                                        if mymetric['time'] == metric['time']:
[32]601
[34]602                                                # Allready have this metric, abort
603                                                return 1
604                        else:
605                                self.myMetrics[ host ][ metric['name'] ] = [ ]
606                else:
[32]607                        self.myMetrics[ host ] = { }
[34]608                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]609
[53]610                # <ATOMIC>
[40]611                #
612                self.slot.acquire()
613
[32]614                self.myMetrics[ host ][ metric['name'] ].append( metric )
615
[40]616                self.slot.release()
[53]617                #
618                # </ATOMIC>
[40]619
[47]620        def makeUpdateList( self, host, metriclist ):
[37]621
622                update_list = [ ]
[41]623                metric = None
[37]624
[47]625                while len( metriclist ) > 0:
[37]626
[53]627                        metric = metriclist.pop( 0 )
[37]628
[53]629                        if self.checkStoreMetric( host, metric ):
630                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[40]631
[37]632                return update_list
633
[49]634        def checkStoreMetric( self, host, metric ):
[40]635
636                if self.lastStored.has_key( host ):
637
[47]638                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]639
[47]640                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]641
[50]642                                        # This is old
[40]643                                        return 0
644
[50]645                return 1
646
[54]647        def memLastUpdate( self, host, metricname, metriclist ):
[50]648
[54]649                if not self.lastStored.has_key( host ):
650                        self.lastStored[ host ] = { }
651
[50]652                last_update_time = 0
653
654                for metric in metriclist:
655
[54]656                        if metric['name'] == metricname:
[50]657
[54]658                                if metric['time'] > last_update_time:
[50]659
[54]660                                        last_update_time = metric['time']
[40]661
[54]662                if self.lastStored[ host ].has_key( metricname ):
[52]663                       
[54]664                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]665                                return 1
[40]666
[54]667                self.lastStored[ host ][ metricname ] = last_update_time
[52]668
[33]669        def storeMetrics( self ):
670
671                for hostname, mymetrics in self.myMetrics.items():     
672
673                        for metricname, mymetric in mymetrics.items():
674
[53]675                                metrics_to_store = [ ]
676
677                                # <ATOMIC>
[50]678                                #
[47]679                                self.slot.acquire() 
[33]680
[54]681                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]682
[54]683                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
684                                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
[53]685
686                                self.slot.release()
687                                #
688                                # </ATOMIC>
689
[47]690                                # Create a mapping table, each metric to the period where it should be stored
691                                #
[53]692                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]693
[50]694                                update_rets = [ ]
695
[47]696                                for period, pmetric in metric_serial_table.items():
697
698                                        self.createCheck( hostname, metricname, period )       
699
700                                        update_ret = self.update( hostname, metricname, period, pmetric )
701
702                                        if update_ret == 0:
703
704                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
705                                        else:
706                                                debug_msg( 9, 'metric update failed' )
707
[50]708                                        update_rets.append( update_ret )
[47]709
[50]710                                if not (1) in update_rets:
711
[54]712                                        self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]713
[17]714        def makeTimeSerial( self ):
[32]715                "Generate a time serial. Seconds since epoch"
[17]716
717                # Seconds since epoch
718                mytime = int( time.time() )
719
720                return mytime
721
[50]722        def makeRrdPath( self, host, metricname, timeserial ):
[32]723                """
724                Make a RRD location/path and filename
725                If a metric or timeserial are supplied the complete locations
726                will be made, else just the host directory
727                """
[17]728
[50]729                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
730                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[17]731
732                return rrd_dir, rrd_file
733
[20]734        def getLastRrdTimeSerial( self, host ):
[32]735                """
736                Find the last timeserial (directory) for this host
737                This is determined once every host
738                """
[17]739
[19]740                newest_timeserial = 0
741
[47]742                for dir in self.timeserials[ host ]:
[32]743
[47]744                        valid_dir = 1
[17]745
[47]746                        for letter in dir:
747                                if letter not in string.digits:
748                                        valid_dir = 0
[17]749
[47]750                        if valid_dir:
751                                timeserial = dir
752                                if timeserial > newest_timeserial:
753                                        newest_timeserial = timeserial
[17]754
755                if newest_timeserial:
[18]756                        return newest_timeserial
[17]757                else:
758                        return 0
759
[47]760        def determinePeriod( self, host, check_serial ):
761
762                period_serial = 0
763
764                for serial in self.timeserials[ host ]:
765
766                        if check_serial >= serial and period_serial < serial:
767
768                                period_serial = serial
769
770                return period_serial
771
772        def determineSerials( self, host, metricname, metriclist ):
773                """
774                Determine the correct serial and corresponding rrd to store
775                for a list of metrics
776                """
777
778                metric_serial_table = { }
779
780                for metric in metriclist:
781
782                        if metric['name'] == metricname:
783
784                                period = self.determinePeriod( host, metric['time'] )   
785
786                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
787
[49]788                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]789
790                                        # This one should get it's own new period
791                                        period = metric['time']
[50]792                                        self.timeserials[ host ].append( period )
[47]793
794                                if not metric_serial_table.has_key( period ):
795
[49]796                                        metric_serial_table[ period ] = [ ]
[47]797
798                                metric_serial_table[ period ].append( metric )
799
800                return metric_serial_table
801
[33]802        def createCheck( self, host, metricname, timeserial ):
[9]803                "Check if an .rrd allready exists for this metric, create if not"
804
[35]805                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]806               
[33]807                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]808
[9]809                if not os.path.exists( rrd_dir ):
810                        os.makedirs( rrd_dir )
[14]811                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]812
[14]813                if not os.path.exists( rrd_file ):
[9]814
[33]815                        interval = self.config.getInterval( self.cluster )
[47]816                        heartbeat = 8 * int( interval )
[9]817
[37]818                        params = [ ]
[12]819
[37]820                        params.append( '--step' )
821                        params.append( str( interval ) )
[12]822
[37]823                        params.append( '--start' )
[47]824                        params.append( str( int( timeserial ) - 1 ) )
[12]825
[37]826                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
827                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]828
[37]829                        self.rrdm.create( str(rrd_file), params )
830
[14]831                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
832
[47]833        def update( self, host, metricname, timeserial, metriclist ):
[9]834
[35]835                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]836
[33]837                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]838
[47]839                update_list = self.makeUpdateList( host, metriclist )
[15]840
[41]841                if len( update_list ) > 0:
842                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]843
[41]844                        if ret:
845                                return 1
[27]846               
[41]847                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]848
[36]849                return 0
850
[8]851def main():
852        "Program startup"
853
854        myProcessor = GangliaXMLProcessor()
855
[22]856        if DAEMONIZE:
857                myProcessor.daemon()
858        else:
859                myProcessor.run()
860
[9]861def check_dir( directory ):
862        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
863
864        if directory[-1] == '/':
865                directory = directory[:-1]
866
867        return directory
868
[12]869def debug_msg( level, msg ):
870
871        if (DEBUG_LEVEL >= level):
872                sys.stderr.write( msg + '\n' )
873
[46]874def printTime( ):
875        "Print current time in human readable format"
876
877        return time.strftime("%a %d %b %Y %H:%M:%S")
878
[5]879# Let's go
[9]880if __name__ == '__main__':
881        main()
Note: See TracBrowser for help on using the repository browser.