source: trunk/daemon/togad.py @ 44

Last change on this file since 44 was 44, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Grabbing of last serials and updates of existing .rrds will now be done on application boot instead of from the parsing thread. Calling from parsing thread caused XML parsing errors because of the time needed.
  • Also replaced os.walk with os.listdir, we don't need a recursive dirlist, just the root list
File size: 18.9 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import string
[12]8import os
9import os.path
[17]10import time
11import re
[36]12import threading
13import mutex
[38]14import random
[40]15from types import *
[3]16
[8]17# Specify debugging level here;
18#
[38]19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
[40]22# 8  = RRD file activity
23# 7  = daemon threading
[8]24#
[44]25DEBUG_LEVEL = 8
[6]26
[9]27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
[13]39# Amount of hours to store in one single archived .rrd
[9]40#
[43]41ARCHIVE_HOURS_PER_RRD = 12
[13]42
[22]43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
[17]47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
[13]52
[17]53# What XML data types not to store
[13]54#
[17]55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]56
[8]57"""
58This is TOrque-GAnglia's data Daemon
59"""
60
[37]61class RRDMutator:
[38]62        "A class for handling .rrd mutations"
[37]63
64        binary = '/usr/bin/rrdtool'
65
66        def __init__( self, binary=None ):
67
68                if binary:
69                        self.binary = binary
70
71        def create( self, filename, args ):
[40]72                return self.perform( 'create', '"' + filename + '"', args )
[37]73
74        def update( self, filename, args ):
[40]75                return self.perform( 'update', '"' + filename + '"', args )
[37]76
[42]77        def grabLastUpdate( self, filename ):
78
79                last_update = 0
80
81                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
82
83                        if line.find( 'last_update') != -1:
84
85                                last_update = line.split( ' = ' )[1]
86
87                if last_update:
88                        return last_update
89                else:
90                        return 0
91
[40]92        def perform( self, action, filename, args ):
[37]93
94                arg_string = None
95
[40]96                if type( args ) is not ListType:
97                        debug_msg( 8, 'Arguments needs to be of type List' )
98                        return 1
99
[37]100                for arg in args:
101
102                        if not arg_string:
103
104                                arg_string = arg
105                        else:
106                                arg_string = arg_string + ' ' + arg
107
[40]108                debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]109
[40]110                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
[37]111
112                        if line.find( 'ERROR' ) != -1:
113
114                                error_msg = string.join( line.split( ' ' )[1:] )
[40]115                                debug_msg( 8, error_msg )
[37]116                                return 1
117
118                return 0
119
[6]120class GangliaXMLHandler( ContentHandler ):
[8]121        "Parse Ganglia's XML"
[3]122
[33]123        def __init__( self, config ):
124                self.config = config
[35]125                self.clusters = { }
[44]126                self.gatherClusters()
[33]127
[44]128        def gatherClusters( self ):
129
130                archive_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
131
132                hosts = [ ]
133
134                if os.path.exists( archive_dir ):
135
136                        dirlist = os.listdir( archive_dir )
137
138                        for item in dirlist:
139
140                                clustername = item
141
142                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_SOURCES:
143
144                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
145
[6]146        def startElement( self, name, attrs ):
[8]147                "Store appropriate data from xml start tags"
[3]148
[7]149                if name == 'GANGLIA_XML':
[32]150
151                        self.XMLSource = attrs.get( 'SOURCE', "" )
152                        self.gangliaVersion = attrs.get( 'VERSION', "" )
153
[12]154                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]155
[7]156                elif name == 'GRID':
[32]157
158                        self.gridName = attrs.get( 'NAME', "" )
159                        self.time = attrs.get( 'LOCALTIME', "" )
160
[12]161                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]162
[7]163                elif name == 'CLUSTER':
[32]164
165                        self.clusterName = attrs.get( 'NAME', "" )
166                        self.time = attrs.get( 'LOCALTIME', "" )
167
[35]168                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
[32]169
[34]170                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]171
[35]172                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]173
[14]174                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
[32]175
176                        self.hostName = attrs.get( 'NAME', "" )
177                        self.hostIp = attrs.get( 'IP', "" )
178                        self.hostReported = attrs.get( 'REPORTED', "" )
179
[12]180                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]181
[14]182                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
[6]183
[32]184                        type = attrs.get( 'TYPE', "" )
[6]185
[32]186                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
[3]187
[32]188                                myMetric = { }
189                                myMetric['name'] = attrs.get( 'NAME', "" )
190                                myMetric['val'] = attrs.get( 'VAL', "" )
191                                myMetric['time'] = self.hostReported
[3]192
[34]193                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]194
[34]195                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]196
[34]197        def storeMetrics( self ):
[9]198
[34]199                for clustername, rrdh in self.clusters.items():
[16]200
[38]201                        ret = rrdh.storeMetrics()
[9]202
[38]203                        if ret:
204                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
205                                return 1
206
207                return 0
208
[5]209class GangliaXMLGatherer:
[8]210        "Setup a connection and file object to Ganglia's XML"
[3]211
[8]212        s = None
213
214        def __init__( self, host, port ):
215                "Store host and port for connection"
216
[5]217                self.host = host
218                self.port = port
[33]219                self.connect()
[3]220
[33]221        def connect( self ):
222                "Setup connection to XML source"
[8]223
224                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]225
[5]226                        af, socktype, proto, canonname, sa = res
[32]227
[5]228                        try:
[32]229
[8]230                                self.s = socket.socket( af, socktype, proto )
[32]231
[5]232                        except socket.error, msg:
[32]233
[8]234                                self.s = None
[5]235                                continue
[32]236
[5]237                        try:
[32]238
[8]239                                self.s.connect( sa )
[32]240
[5]241                        except socket.error, msg:
[32]242
[8]243                                self.s.close()
244                                self.s = None
[5]245                                continue
[32]246
[5]247                        break
[3]248
[8]249                if self.s is None:
[32]250
[33]251                        debug_msg( 0, 'Could not open socket' )
252                        sys.exit( 1 )
[5]253
[33]254        def disconnect( self ):
255                "Close socket"
256
257                if self.s:
258                        self.s.close()
259                        self.s = None
260
261        def __del__( self ):
262                "Kill the socket before we leave"
263
264                self.disconnect()
265
266        def getFileObject( self ):
267                "Connect, and return a file object"
268
[38]269                if self.s:
270                        # Apearantly, only data is received when a connection is made
271                        # therefor, disconnect and connect
272                        #
273                        self.disconnect()
274                        self.connect()
[33]275
[8]276                return self.s.makefile( 'r' )
[5]277
[8]278class GangliaXMLProcessor:
[5]279
[33]280        def __init__( self ):
281                "Setup initial XML connection and handlers"
282
283                self.config = GangliaConfigParser( GMETAD_CONF )
284
285                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
286                self.myParser = make_parser()   
287                self.myHandler = GangliaXMLHandler( self.config )
288                self.myParser.setContentHandler( self.myHandler )
289
[9]290        def daemon( self ):
[8]291                "Run as daemon forever"
[5]292
[8]293                self.DAEMON = 1
[5]294
[8]295                # Fork the first child
296                #
297                pid = os.fork()
[32]298
[8]299                if pid > 0:
[7]300
[32]301                        sys.exit(0)  # end parent
302
[8]303                # creates a session and sets the process group ID
304                #
305                os.setsid()
[7]306
[8]307                # Fork the second child
308                #
309                pid = os.fork()
[32]310
[8]311                if pid > 0:
[5]312
[32]313                        sys.exit(0)  # end parent
314
[8]315                # Go to the root directory and set the umask
316                #
317                os.chdir('/')
318                os.umask(0)
319
320                sys.stdin.close()
321                sys.stdout.close()
[36]322                #sys.stderr.close()
[8]323
324                os.open('/dev/null', 0)
325                os.dup(0)
326                os.dup(0)
327
328                self.run()
329
[22]330        def printTime( self ):
[33]331                "Print current time in human readable format"
[22]332
[33]333                return time.strftime("%a %d %b %Y %H:%M:%S")
[22]334
[9]335        def run( self ):
[8]336                "Main thread"
337
[36]338                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
339                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]340
[36]341                while( 1 ):
342
343                        if not xmlthread.isAlive():
344                                # Gather XML at the same interval as gmetad
345
346                                # threaded call to: self.processXML()
347                                #
348                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
349                                xmlthread.start()
350
351                        if not storethread.isAlive():
352                                # Store metrics every .. sec
353
354                                # threaded call to: self.storeMetrics()
355                                #
356                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
357                                storethread.start()
358               
359                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
360                        time.sleep( 1 ) 
361
[33]362        def storeMetrics( self ):
363                "Store metrics retained in memory to disk"
[22]364
[40]365                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
[39]366
[38]367                # Store metrics somewhere between every 60 and 180 seconds
368                #
[44]369                #STORE_INTERVAL = random.randint( 360, 640 )
370                STORE_INTERVAL = 16
[22]371
[39]372                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
[36]373                storethread.start()
374
[40]375                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]376                time.sleep( STORE_INTERVAL )
[40]377                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
[36]378
379                if storethread.isAlive():
380
[40]381                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
382                        storethread.join( 180 ) # Maximum time is for storing thread to finish
383                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
[36]384
[40]385                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
[36]386
387                return 0
388
[39]389        def storeThread( self ):
390
[40]391                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
392                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
[39]393                ret = self.myHandler.storeMetrics()
[40]394                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
395                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
[39]396               
397                return ret
398
[8]399        def processXML( self ):
400                "Process XML"
401
[40]402                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
[8]403
[39]404                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
[36]405                parsethread.start()
406
[40]407                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]408                time.sleep( float( self.config.getLowestInterval() ) ) 
[40]409                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
[36]410
411                if parsethread.isAlive():
412
[40]413                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
[43]414                        parsethread.join( 180 ) # Maximum time for XML thread to finish
[40]415                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
[36]416
[40]417                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
[36]418
419                return 0
420
[39]421        def parseThread( self ):
422
[40]423                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
424                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
[39]425                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
[40]426                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
427                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
[39]428
429                return ret
430
[9]431class GangliaConfigParser:
432
[34]433        sources = [ ]
[9]434
435        def __init__( self, config ):
[32]436
[9]437                self.config = config
438                self.parseValues()
439
[32]440        def parseValues( self ):
[9]441                "Parse certain values from gmetad.conf"
442
443                readcfg = open( self.config, 'r' )
444
445                for line in readcfg.readlines():
446
447                        if line.count( '"' ) > 1:
448
[10]449                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]450
[11]451                                        source = { }
452                                        source['name'] = line.split( '"' )[1]
[9]453                                        source_words = line.split( '"' )[2].split( ' ' )
454
455                                        for word in source_words:
456
457                                                valid_interval = 1
458
459                                                for letter in word:
[32]460
[9]461                                                        if letter not in string.digits:
[32]462
[9]463                                                                valid_interval = 0
464
[10]465                                                if valid_interval and len(word) > 0:
[32]466
[9]467                                                        source['interval'] = word
[12]468                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]469       
470                                        # No interval found, use Ganglia's default     
471                                        if not source.has_key( 'interval' ):
472                                                source['interval'] = 15
473                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]474
[33]475                                        self.sources.append( source )
[9]476
477        def getInterval( self, source_name ):
[32]478
[9]479                for source in self.sources:
[32]480
[12]481                        if source['name'] == source_name:
[32]482
[9]483                                return source['interval']
[32]484
[9]485                return None
486
[34]487        def getLowestInterval( self ):
488
489                lowest_interval = 0
490
491                for source in self.sources:
492
493                        if not lowest_interval or source['interval'] <= lowest_interval:
494
495                                lowest_interval = source['interval']
496
497                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
498                if lowest_interval:
499                        return lowest_interval
500                else:
501                        return 15
502
[9]503class RRDHandler:
504
[32]505        myMetrics = { }
[40]506        lastStored = { }
[36]507        slot = None
[32]508
[33]509        def __init__( self, config, cluster ):
[44]510                self.block = 0
[9]511                self.cluster = cluster
[33]512                self.config = config
[40]513                self.slot = threading.Lock()
[37]514                self.rrdm = RRDMutator()
[42]515                self.gatherLastUpdates()
[9]516
[44]517        def isBlocking( self ):
518
519                return self.block
520
[42]521        def gatherLastUpdates( self ):
522                "Populate the lastStored list, containing timestamps of all last updates"
523
[44]524                self.block = 1
525
[42]526                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
527
528                hosts = [ ]
529
530                if os.path.exists( cluster_dir ):
531
[44]532                        dirlist = os.listdir( cluster_dir )
[42]533
[44]534                        for dir in dirlist:
[42]535
[44]536                                hosts.append( dir )
[42]537
538                for host in hosts:
539
540                        last_serial = self.getLastRrdTimeSerial( host )
541                        if last_serial:
542
543                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
544                                if os.path.exists( metric_dir ):
545
[44]546                                        dirlist = os.listdir( metric_dir )
[42]547
[44]548                                        for file in dirlist:
[42]549
[44]550                                                metricname = file.split( '.rrd' )[0]
[42]551
[44]552                                                if not self.lastStored.has_key( host ):
[42]553
[44]554                                                        self.lastStored[ host ] = { }
[42]555
[44]556                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]557
[32]558        def getClusterName( self ):
559                return self.cluster
560
561        def memMetric( self, host, metric ):
562
[34]563                if self.myMetrics.has_key( host ):
[32]564
[34]565                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]566
[34]567                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]568
[34]569                                        if mymetric['time'] == metric['time']:
[32]570
[34]571                                                # Allready have this metric, abort
572                                                return 1
573                        else:
574                                self.myMetrics[ host ][ metric['name'] ] = [ ]
575                else:
[32]576                        self.myMetrics[ host ] = { }
[34]577                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]578
[40]579                # Ah, psst, push it
580                #
581                # <atomic>
582                self.slot.acquire()
583
[32]584                self.myMetrics[ host ][ metric['name'] ].append( metric )
585
[40]586                self.slot.release()
587                # </atomic>
588
[37]589        def makeUpdateList( self, host, metricname ):
590
591                update_list = [ ]
[41]592                metric = None
[37]593
[40]594                while len( self.myMetrics[ host ][ metricname ] ) > 0:
[37]595
[40]596                        # Kabouter pop
597                        #
598                        # <atomic>     
599                        self.slot.acquire()
[37]600
[41]601                        # len might have changed since loop start
602                        #
603                        if len( self.myMetrics[ host ][ metricname ] ) > 0:
[42]604                                metric = self.myMetrics[ host ][ metricname ].pop( 0 )
[40]605
606                        self.slot.release()
607                        # </atomic>
608
[41]609                        if metric:
610                                if self.checkStoreMetric( host, metricname, metric ):
611                                        update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[43]612                                #else:
613                                        #print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )
[40]614
[37]615                return update_list
616
[40]617        def checkStoreMetric( self, host, metricname, metric ):
618
619                if self.lastStored.has_key( host ):
620
621                        if self.lastStored[ host ].has_key( metricname ):
622
[43]623                                if metric['time'] <= self.lastStored[ host ][ metricname ]:
[40]624
625                                        # Allready wrote a value with this timestamp, skip tnx
626                                        return 0
627
628                else:
629                        self.lastStored[ host ] = { }
630
631                self.lastStored[ host ][ metricname ] = metric['time']
632
633                return 1
634
[33]635        def storeMetrics( self ):
636
637                for hostname, mymetrics in self.myMetrics.items():     
638
639                        for metricname, mymetric in mymetrics.items():
640
[35]641                                mytime = self.makeTimeSerial()
[40]642                                correct_serial = self.checkNewRrdPeriod( hostname, mytime )
643                                self.createCheck( hostname, metricname, correct_serial )       
644                                update_ret = self.update( hostname, metricname, correct_serial )
[33]645
[36]646                                if update_ret == 0:
[33]647
648                                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
649                                else:
650                                        debug_msg( 9, 'metric update failed' )
651
[43]652                                #sys.exit(1)
653
[17]654        def makeTimeSerial( self ):
[32]655                "Generate a time serial. Seconds since epoch"
[17]656
657                # Seconds since epoch
658                mytime = int( time.time() )
659
660                return mytime
661
[33]662        def makeRrdPath( self, host, metricname=None, timeserial=None ):
[32]663                """
664                Make a RRD location/path and filename
665                If a metric or timeserial are supplied the complete locations
666                will be made, else just the host directory
667                """
[17]668
[20]669                if not timeserial:     
670                        rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
671                else:
672                        rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
[35]673                if metricname:
[33]674                        rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[20]675                else:
676                        rrd_file = None
[17]677
678                return rrd_dir, rrd_file
679
[20]680        def getLastRrdTimeSerial( self, host ):
[32]681                """
682                Find the last timeserial (directory) for this host
683                This is determined once every host
684                """
[17]685
[20]686                rrd_dir, rrd_file = self.makeRrdPath( host )
[17]687
[19]688                newest_timeserial = 0
689
[17]690                if os.path.exists( rrd_dir ):
[32]691
[44]692                        dirlist = os.listdir( rrd_dir )
[17]693
[44]694                        for dir in dirlist:
[17]695
[44]696                                valid_dir = 1
[17]697
[44]698                                for letter in dir:
699                                        if letter not in string.digits:
700                                                valid_dir = 0
[17]701
[44]702                                if valid_dir:
703                                        timeserial = dir
704                                        if timeserial > newest_timeserial:
705                                                newest_timeserial = timeserial
[17]706
707                if newest_timeserial:
[18]708                        return newest_timeserial
[17]709                else:
710                        return 0
711
[20]712        def checkNewRrdPeriod( self, host, current_timeserial ):
[32]713                """
714                Check if current timeserial belongs to recent time period
715                or should become a new period (and file).
[17]716
[32]717                Returns the serial of the correct time period
718                """
719
[20]720                last_timeserial = int( self.getLastRrdTimeSerial( host ) )
[30]721                debug_msg( 9, 'last timeserial of %s is %s' %( host, last_timeserial ) )
[17]722
723                if not last_timeserial:
[18]724                        serial = current_timeserial
725                else:
[17]726
[18]727                        archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[17]728
[18]729                        if (current_timeserial - last_timeserial) >= archive_secs:
730                                serial = current_timeserial
731                        else:
732                                serial = last_timeserial
[17]733
[18]734                return serial
735
[33]736        def getFirstTime( self, host, metricname ):
737                "Get the first time of a metric we know of"
738
739                first_time = 0
740
741                for metric in self.myMetrics[ host ][ metricname ]:
742
743                        if not first_time or metric['time'] <= first_time:
744
745                                first_time = metric['time']
746
[35]747                return first_time
748
[33]749        def createCheck( self, host, metricname, timeserial ):
[9]750                "Check if an .rrd allready exists for this metric, create if not"
751
[35]752                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]753
[33]754                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]755
[9]756                if not os.path.exists( rrd_dir ):
757                        os.makedirs( rrd_dir )
[14]758                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]759
[14]760                if not os.path.exists( rrd_file ):
[9]761
[33]762                        interval = self.config.getInterval( self.cluster )
[14]763                        heartbeat = 8 * int(interval)
[9]764
[37]765                        params = [ ]
[12]766
[37]767                        params.append( '--step' )
768                        params.append( str( interval ) )
[12]769
[37]770                        params.append( '--start' )
771                        params.append( str( int( self.getFirstTime( host, metricname ) ) - 1 ) )
[12]772
[37]773                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
774                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]775
[37]776                        self.rrdm.create( str(rrd_file), params )
777
[14]778                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
779
[33]780        def update( self, host, metricname, timeserial ):
[9]781
[35]782                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]783
[33]784                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]785
[37]786                update_list = self.makeUpdateList( host, metricname )
[15]787
[41]788                if len( update_list ) > 0:
789                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]790
[41]791                        if ret:
792                                return 1
[27]793               
[41]794                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]795
[36]796                return 0
797
[8]798def main():
799        "Program startup"
800
801        myProcessor = GangliaXMLProcessor()
802
[22]803        if DAEMONIZE:
804                myProcessor.daemon()
805        else:
806                myProcessor.run()
807
[9]808def check_dir( directory ):
809        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
810
811        if directory[-1] == '/':
812                directory = directory[:-1]
813
814        return directory
815
[12]816def debug_msg( level, msg ):
817
818        if (DEBUG_LEVEL >= level):
819                sys.stderr.write( msg + '\n' )
820
[5]821# Let's go
[9]822if __name__ == '__main__':
823        main()
Note: See TracBrowser for help on using the repository browser.