source: trunk/daemon/togad.py @ 58

Last change on this file since 58 was 58, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Added catch of exception that happens in a rare case when trying mkdirs while targetdir allready exists. I noticed this happening in the weekend testing run.

(This shouldn't be happening because of the if-statement
not sure what is causing it, weird..)

  • Will now ignore a 'File exists' Exception of mkdirs, all other exceptions still cause abort
File size: 20.0 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import string
[12]8import os
9import os.path
[17]10import time
11import re
[36]12import threading
13import mutex
[38]14import random
[40]15from types import *
[3]16
[8]17# Specify debugging level here;
18#
[38]19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
[40]22# 8  = RRD file activity
23# 7  = daemon threading
[8]24#
[55]25DEBUG_LEVEL = 7
[6]26
[9]27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
[13]39# Amount of hours to store in one single archived .rrd
[9]40#
[43]41ARCHIVE_HOURS_PER_RRD = 12
[13]42
[22]43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
[17]47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
[13]52
[17]53# What XML data types not to store
[13]54#
[17]55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]56
[47]57# Maximum time (in seconds) a parsethread may run
58#
59PARSE_TIMEOUT = 60
60
61# Maximum time (in seconds) a storethread may run
62#
63STORE_TIMEOUT = 360
64
[8]65"""
66This is TOrque-GAnglia's data Daemon
67"""
68
[37]69class RRDMutator:
[38]70        "A class for handling .rrd mutations"
[37]71
72        binary = '/usr/bin/rrdtool'
73
74        def __init__( self, binary=None ):
75
76                if binary:
77                        self.binary = binary
78
79        def create( self, filename, args ):
[40]80                return self.perform( 'create', '"' + filename + '"', args )
[37]81
82        def update( self, filename, args ):
[40]83                return self.perform( 'update', '"' + filename + '"', args )
[37]84
[42]85        def grabLastUpdate( self, filename ):
86
87                last_update = 0
88
[53]89                debug_msg( 8, self.binary + ' info "' + filename + '"' )
90
[42]91                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
92
93                        if line.find( 'last_update') != -1:
94
95                                last_update = line.split( ' = ' )[1]
96
97                if last_update:
98                        return last_update
99                else:
100                        return 0
101
[40]102        def perform( self, action, filename, args ):
[37]103
104                arg_string = None
105
[40]106                if type( args ) is not ListType:
107                        debug_msg( 8, 'Arguments needs to be of type List' )
108                        return 1
109
[37]110                for arg in args:
111
112                        if not arg_string:
113
114                                arg_string = arg
115                        else:
116                                arg_string = arg_string + ' ' + arg
117
[54]118                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]119
[40]120                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
[37]121
122                        if line.find( 'ERROR' ) != -1:
123
124                                error_msg = string.join( line.split( ' ' )[1:] )
[40]125                                debug_msg( 8, error_msg )
[37]126                                return 1
127
128                return 0
129
[6]130class GangliaXMLHandler( ContentHandler ):
[8]131        "Parse Ganglia's XML"
[3]132
[33]133        def __init__( self, config ):
134                self.config = config
[35]135                self.clusters = { }
[54]136                debug_msg( 0, printTime() + ' - Checking existing toga rrd archive..' )
[44]137                self.gatherClusters()
[54]138                debug_msg( 0, printTime() + ' - Check done.' )
[33]139
[44]140        def gatherClusters( self ):
141
[45]142                archive_dir = check_dir(ARCHIVE_PATH)
[44]143
144                hosts = [ ]
145
146                if os.path.exists( archive_dir ):
147
148                        dirlist = os.listdir( archive_dir )
149
150                        for item in dirlist:
151
152                                clustername = item
153
154                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_SOURCES:
155
156                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
157
[6]158        def startElement( self, name, attrs ):
[8]159                "Store appropriate data from xml start tags"
[3]160
[7]161                if name == 'GANGLIA_XML':
[32]162
163                        self.XMLSource = attrs.get( 'SOURCE', "" )
164                        self.gangliaVersion = attrs.get( 'VERSION', "" )
165
[12]166                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]167
[7]168                elif name == 'GRID':
[32]169
170                        self.gridName = attrs.get( 'NAME', "" )
171                        self.time = attrs.get( 'LOCALTIME', "" )
172
[12]173                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]174
[7]175                elif name == 'CLUSTER':
[32]176
177                        self.clusterName = attrs.get( 'NAME', "" )
178                        self.time = attrs.get( 'LOCALTIME', "" )
179
[35]180                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
[32]181
[34]182                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]183
[35]184                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]185
[14]186                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
[32]187
188                        self.hostName = attrs.get( 'NAME', "" )
189                        self.hostIp = attrs.get( 'IP', "" )
190                        self.hostReported = attrs.get( 'REPORTED', "" )
191
[12]192                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]193
[14]194                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
[6]195
[32]196                        type = attrs.get( 'TYPE', "" )
[6]197
[32]198                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
[3]199
[32]200                                myMetric = { }
201                                myMetric['name'] = attrs.get( 'NAME', "" )
202                                myMetric['val'] = attrs.get( 'VAL', "" )
203                                myMetric['time'] = self.hostReported
[3]204
[34]205                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]206
[34]207                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]208
[34]209        def storeMetrics( self ):
[9]210
[34]211                for clustername, rrdh in self.clusters.items():
[16]212
[38]213                        ret = rrdh.storeMetrics()
[9]214
[38]215                        if ret:
216                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
217                                return 1
218
219                return 0
220
[5]221class GangliaXMLGatherer:
[8]222        "Setup a connection and file object to Ganglia's XML"
[3]223
[8]224        s = None
225
226        def __init__( self, host, port ):
227                "Store host and port for connection"
228
[5]229                self.host = host
230                self.port = port
[33]231                self.connect()
[3]232
[33]233        def connect( self ):
234                "Setup connection to XML source"
[8]235
236                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]237
[5]238                        af, socktype, proto, canonname, sa = res
[32]239
[5]240                        try:
[32]241
[8]242                                self.s = socket.socket( af, socktype, proto )
[32]243
[5]244                        except socket.error, msg:
[32]245
[8]246                                self.s = None
[5]247                                continue
[32]248
[5]249                        try:
[32]250
[8]251                                self.s.connect( sa )
[32]252
[5]253                        except socket.error, msg:
[32]254
[8]255                                self.s.close()
256                                self.s = None
[5]257                                continue
[32]258
[5]259                        break
[3]260
[8]261                if self.s is None:
[32]262
[33]263                        debug_msg( 0, 'Could not open socket' )
264                        sys.exit( 1 )
[5]265
[33]266        def disconnect( self ):
267                "Close socket"
268
269                if self.s:
270                        self.s.close()
271                        self.s = None
272
273        def __del__( self ):
274                "Kill the socket before we leave"
275
276                self.disconnect()
277
278        def getFileObject( self ):
279                "Connect, and return a file object"
280
[38]281                if self.s:
282                        # Apearantly, only data is received when a connection is made
283                        # therefor, disconnect and connect
284                        #
285                        self.disconnect()
286                        self.connect()
[33]287
[8]288                return self.s.makefile( 'r' )
[5]289
[8]290class GangliaXMLProcessor:
[5]291
[33]292        def __init__( self ):
293                "Setup initial XML connection and handlers"
294
295                self.config = GangliaConfigParser( GMETAD_CONF )
296
297                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
298                self.myParser = make_parser()   
299                self.myHandler = GangliaXMLHandler( self.config )
300                self.myParser.setContentHandler( self.myHandler )
301
[9]302        def daemon( self ):
[8]303                "Run as daemon forever"
[5]304
[8]305                self.DAEMON = 1
[5]306
[8]307                # Fork the first child
308                #
309                pid = os.fork()
[32]310
[8]311                if pid > 0:
[7]312
[32]313                        sys.exit(0)  # end parent
314
[8]315                # creates a session and sets the process group ID
316                #
317                os.setsid()
[7]318
[8]319                # Fork the second child
320                #
321                pid = os.fork()
[32]322
[8]323                if pid > 0:
[5]324
[32]325                        sys.exit(0)  # end parent
326
[8]327                # Go to the root directory and set the umask
328                #
329                os.chdir('/')
330                os.umask(0)
331
332                sys.stdin.close()
333                sys.stdout.close()
[36]334                #sys.stderr.close()
[8]335
336                os.open('/dev/null', 0)
337                os.dup(0)
338                os.dup(0)
339
340                self.run()
341
[22]342        def printTime( self ):
[33]343                "Print current time in human readable format"
[22]344
[33]345                return time.strftime("%a %d %b %Y %H:%M:%S")
[22]346
[9]347        def run( self ):
[8]348                "Main thread"
349
[36]350                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
[55]351                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]352
[36]353                while( 1 ):
354
355                        if not xmlthread.isAlive():
356                                # Gather XML at the same interval as gmetad
357
358                                # threaded call to: self.processXML()
359                                #
360                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
361                                xmlthread.start()
362
[55]363                        if not storethread.isAlive():
364                                # Store metrics every .. sec
[36]365
[55]366                                # threaded call to: self.storeMetrics()
367                                #
368                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
369                                storethread.start()
[36]370               
371                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
372                        time.sleep( 1 ) 
373
[33]374        def storeMetrics( self ):
375                "Store metrics retained in memory to disk"
[22]376
[40]377                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
[39]378
[38]379                # Store metrics somewhere between every 60 and 180 seconds
380                #
[55]381                STORE_INTERVAL = random.randint( 360, 640 )
[22]382
[39]383                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
[36]384                storethread.start()
385
[40]386                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]387                time.sleep( STORE_INTERVAL )
[40]388                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
[36]389
390                if storethread.isAlive():
391
[40]392                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
[47]393                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[40]394                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
[36]395
[40]396                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
[36]397
398                return 0
399
[39]400        def storeThread( self ):
401
[40]402                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
403                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
[39]404                ret = self.myHandler.storeMetrics()
[40]405                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
406                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
[39]407               
408                return ret
409
[8]410        def processXML( self ):
411                "Process XML"
412
[40]413                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
[8]414
[39]415                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
[36]416                parsethread.start()
417
[40]418                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]419                time.sleep( float( self.config.getLowestInterval() ) ) 
[40]420                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
[36]421
422                if parsethread.isAlive():
423
[40]424                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
[47]425                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[40]426                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
[36]427
[40]428                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
[36]429
430                return 0
431
[39]432        def parseThread( self ):
433
[40]434                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
435                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
[39]436                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
[40]437                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
438                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
[39]439
440                return ret
441
[9]442class GangliaConfigParser:
443
[34]444        sources = [ ]
[9]445
446        def __init__( self, config ):
[32]447
[9]448                self.config = config
449                self.parseValues()
450
[32]451        def parseValues( self ):
[9]452                "Parse certain values from gmetad.conf"
453
454                readcfg = open( self.config, 'r' )
455
456                for line in readcfg.readlines():
457
458                        if line.count( '"' ) > 1:
459
[10]460                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]461
[11]462                                        source = { }
463                                        source['name'] = line.split( '"' )[1]
[9]464                                        source_words = line.split( '"' )[2].split( ' ' )
465
466                                        for word in source_words:
467
468                                                valid_interval = 1
469
470                                                for letter in word:
[32]471
[9]472                                                        if letter not in string.digits:
[32]473
[9]474                                                                valid_interval = 0
475
[10]476                                                if valid_interval and len(word) > 0:
[32]477
[9]478                                                        source['interval'] = word
[12]479                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]480       
481                                        # No interval found, use Ganglia's default     
482                                        if not source.has_key( 'interval' ):
483                                                source['interval'] = 15
484                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]485
[33]486                                        self.sources.append( source )
[9]487
488        def getInterval( self, source_name ):
[32]489
[9]490                for source in self.sources:
[32]491
[12]492                        if source['name'] == source_name:
[32]493
[9]494                                return source['interval']
[32]495
[9]496                return None
497
[34]498        def getLowestInterval( self ):
499
500                lowest_interval = 0
501
502                for source in self.sources:
503
504                        if not lowest_interval or source['interval'] <= lowest_interval:
505
506                                lowest_interval = source['interval']
507
508                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
509                if lowest_interval:
510                        return lowest_interval
511                else:
512                        return 15
513
[9]514class RRDHandler:
515
[32]516        myMetrics = { }
[40]517        lastStored = { }
[47]518        timeserials = { }
[36]519        slot = None
[32]520
[33]521        def __init__( self, config, cluster ):
[44]522                self.block = 0
[9]523                self.cluster = cluster
[33]524                self.config = config
[40]525                self.slot = threading.Lock()
[37]526                self.rrdm = RRDMutator()
[42]527                self.gatherLastUpdates()
[9]528
[42]529        def gatherLastUpdates( self ):
530                "Populate the lastStored list, containing timestamps of all last updates"
531
532                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
533
534                hosts = [ ]
535
536                if os.path.exists( cluster_dir ):
537
[44]538                        dirlist = os.listdir( cluster_dir )
[42]539
[44]540                        for dir in dirlist:
[42]541
[44]542                                hosts.append( dir )
[42]543
544                for host in hosts:
545
[47]546                        host_dir = cluster_dir + '/' + host
547                        dirlist = os.listdir( host_dir )
548
549                        for dir in dirlist:
550
551                                if not self.timeserials.has_key( host ):
552
553                                        self.timeserials[ host ] = [ ]
554
555                                self.timeserials[ host ].append( dir )
556
[42]557                        last_serial = self.getLastRrdTimeSerial( host )
558                        if last_serial:
559
560                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
561                                if os.path.exists( metric_dir ):
562
[44]563                                        dirlist = os.listdir( metric_dir )
[42]564
[44]565                                        for file in dirlist:
[42]566
[44]567                                                metricname = file.split( '.rrd' )[0]
[42]568
[44]569                                                if not self.lastStored.has_key( host ):
[42]570
[44]571                                                        self.lastStored[ host ] = { }
[42]572
[44]573                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]574
[32]575        def getClusterName( self ):
576                return self.cluster
577
578        def memMetric( self, host, metric ):
579
[34]580                if self.myMetrics.has_key( host ):
[32]581
[34]582                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]583
[34]584                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]585
[34]586                                        if mymetric['time'] == metric['time']:
[32]587
[34]588                                                # Allready have this metric, abort
589                                                return 1
590                        else:
591                                self.myMetrics[ host ][ metric['name'] ] = [ ]
592                else:
[32]593                        self.myMetrics[ host ] = { }
[34]594                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]595
[53]596                # <ATOMIC>
[40]597                #
598                self.slot.acquire()
599
[32]600                self.myMetrics[ host ][ metric['name'] ].append( metric )
601
[40]602                self.slot.release()
[53]603                #
604                # </ATOMIC>
[40]605
[47]606        def makeUpdateList( self, host, metriclist ):
[37]607
608                update_list = [ ]
[41]609                metric = None
[37]610
[47]611                while len( metriclist ) > 0:
[37]612
[53]613                        metric = metriclist.pop( 0 )
[37]614
[53]615                        if self.checkStoreMetric( host, metric ):
616                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[40]617
[37]618                return update_list
619
[49]620        def checkStoreMetric( self, host, metric ):
[40]621
622                if self.lastStored.has_key( host ):
623
[47]624                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]625
[47]626                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]627
[50]628                                        # This is old
[40]629                                        return 0
630
[50]631                return 1
632
[54]633        def memLastUpdate( self, host, metricname, metriclist ):
[50]634
[54]635                if not self.lastStored.has_key( host ):
636                        self.lastStored[ host ] = { }
637
[50]638                last_update_time = 0
639
640                for metric in metriclist:
641
[54]642                        if metric['name'] == metricname:
[50]643
[54]644                                if metric['time'] > last_update_time:
[50]645
[54]646                                        last_update_time = metric['time']
[40]647
[54]648                if self.lastStored[ host ].has_key( metricname ):
[52]649                       
[54]650                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]651                                return 1
[40]652
[54]653                self.lastStored[ host ][ metricname ] = last_update_time
[52]654
[33]655        def storeMetrics( self ):
656
657                for hostname, mymetrics in self.myMetrics.items():     
658
659                        for metricname, mymetric in mymetrics.items():
660
[53]661                                metrics_to_store = [ ]
662
663                                # <ATOMIC>
[50]664                                #
[47]665                                self.slot.acquire() 
[33]666
[54]667                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]668
[54]669                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
670                                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
[53]671
672                                self.slot.release()
673                                #
674                                # </ATOMIC>
675
[47]676                                # Create a mapping table, each metric to the period where it should be stored
677                                #
[53]678                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]679
[50]680                                update_rets = [ ]
681
[47]682                                for period, pmetric in metric_serial_table.items():
683
684                                        self.createCheck( hostname, metricname, period )       
685
686                                        update_ret = self.update( hostname, metricname, period, pmetric )
687
688                                        if update_ret == 0:
689
690                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
691                                        else:
692                                                debug_msg( 9, 'metric update failed' )
693
[50]694                                        update_rets.append( update_ret )
[47]695
[50]696                                if not (1) in update_rets:
697
[54]698                                        self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]699
[17]700        def makeTimeSerial( self ):
[32]701                "Generate a time serial. Seconds since epoch"
[17]702
703                # Seconds since epoch
704                mytime = int( time.time() )
705
706                return mytime
707
[50]708        def makeRrdPath( self, host, metricname, timeserial ):
[32]709                """
710                Make a RRD location/path and filename
711                If a metric or timeserial are supplied the complete locations
712                will be made, else just the host directory
713                """
[17]714
[50]715                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
716                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[17]717
718                return rrd_dir, rrd_file
719
[20]720        def getLastRrdTimeSerial( self, host ):
[32]721                """
722                Find the last timeserial (directory) for this host
723                This is determined once every host
724                """
[17]725
[19]726                newest_timeserial = 0
727
[47]728                for dir in self.timeserials[ host ]:
[32]729
[47]730                        valid_dir = 1
[17]731
[47]732                        for letter in dir:
733                                if letter not in string.digits:
734                                        valid_dir = 0
[17]735
[47]736                        if valid_dir:
737                                timeserial = dir
738                                if timeserial > newest_timeserial:
739                                        newest_timeserial = timeserial
[17]740
741                if newest_timeserial:
[18]742                        return newest_timeserial
[17]743                else:
744                        return 0
745
[47]746        def determinePeriod( self, host, check_serial ):
747
748                period_serial = 0
749
[56]750                if self.timeserials.has_key( host ):
[47]751
[56]752                        for serial in self.timeserials[ host ]:
[47]753
[56]754                                if check_serial >= serial and period_serial < serial:
[47]755
[56]756                                        period_serial = serial
757
[47]758                return period_serial
759
760        def determineSerials( self, host, metricname, metriclist ):
761                """
762                Determine the correct serial and corresponding rrd to store
763                for a list of metrics
764                """
765
766                metric_serial_table = { }
767
768                for metric in metriclist:
769
770                        if metric['name'] == metricname:
771
772                                period = self.determinePeriod( host, metric['time'] )   
773
774                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
775
[49]776                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]777
778                                        # This one should get it's own new period
779                                        period = metric['time']
[57]780
781                                        if not self.timeserials.has_key( host ):
782                                                self.timeserials[ host ] = [ ]
783
[50]784                                        self.timeserials[ host ].append( period )
[47]785
786                                if not metric_serial_table.has_key( period ):
787
[49]788                                        metric_serial_table[ period ] = [ ]
[47]789
790                                metric_serial_table[ period ].append( metric )
791
792                return metric_serial_table
793
[33]794        def createCheck( self, host, metricname, timeserial ):
[9]795                "Check if an .rrd allready exists for this metric, create if not"
796
[35]797                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]798               
[33]799                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]800
[9]801                if not os.path.exists( rrd_dir ):
[58]802
803                        try:
804                                os.makedirs( rrd_dir )
805
806                        except OSError, msg:
807
808                                if msg.find( 'File exists' ) != -1:
809
810                                        # Ignore exists errors
811                                        pass
812
813                                else:
814
815                                        print msg
816                                        return
817
[14]818                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]819
[14]820                if not os.path.exists( rrd_file ):
[9]821
[33]822                        interval = self.config.getInterval( self.cluster )
[47]823                        heartbeat = 8 * int( interval )
[9]824
[37]825                        params = [ ]
[12]826
[37]827                        params.append( '--step' )
828                        params.append( str( interval ) )
[12]829
[37]830                        params.append( '--start' )
[47]831                        params.append( str( int( timeserial ) - 1 ) )
[12]832
[37]833                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
834                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]835
[37]836                        self.rrdm.create( str(rrd_file), params )
837
[14]838                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
839
[47]840        def update( self, host, metricname, timeserial, metriclist ):
[9]841
[35]842                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]843
[33]844                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]845
[47]846                update_list = self.makeUpdateList( host, metriclist )
[15]847
[41]848                if len( update_list ) > 0:
849                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]850
[41]851                        if ret:
852                                return 1
[27]853               
[41]854                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]855
[36]856                return 0
857
[8]858def main():
859        "Program startup"
860
861        myProcessor = GangliaXMLProcessor()
862
[22]863        if DAEMONIZE:
864                myProcessor.daemon()
865        else:
866                myProcessor.run()
867
[9]868def check_dir( directory ):
869        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
870
871        if directory[-1] == '/':
872                directory = directory[:-1]
873
874        return directory
875
[12]876def debug_msg( level, msg ):
877
878        if (DEBUG_LEVEL >= level):
879                sys.stderr.write( msg + '\n' )
880
[46]881def printTime( ):
882        "Print current time in human readable format"
883
884        return time.strftime("%a %d %b %Y %H:%M:%S")
885
[5]886# Let's go
[9]887if __name__ == '__main__':
888        main()
Note: See TracBrowser for help on using the repository browser.