source: trunk/daemon/togad.py @ 46

Last change on this file since 46 was 46, checked in by bastiaans, 19 years ago

daemon/togad.py:

Added extra debug msg

File size: 19.1 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import string
[12]8import os
9import os.path
[17]10import time
11import re
[36]12import threading
13import mutex
[38]14import random
[40]15from types import *
[3]16
[8]17# Specify debugging level here;
18#
[38]19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
[40]22# 8  = RRD file activity
23# 7  = daemon threading
[8]24#
[46]25DEBUG_LEVEL = 7
[6]26
[9]27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
[13]39# Amount of hours to store in one single archived .rrd
[9]40#
[43]41ARCHIVE_HOURS_PER_RRD = 12
[13]42
[22]43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
[17]47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
[13]52
[17]53# What XML data types not to store
[13]54#
[17]55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]56
[8]57"""
58This is TOrque-GAnglia's data Daemon
59"""
60
[37]61class RRDMutator:
[38]62        "A class for handling .rrd mutations"
[37]63
64        binary = '/usr/bin/rrdtool'
65
66        def __init__( self, binary=None ):
67
68                if binary:
69                        self.binary = binary
70
71        def create( self, filename, args ):
[40]72                return self.perform( 'create', '"' + filename + '"', args )
[37]73
74        def update( self, filename, args ):
[40]75                return self.perform( 'update', '"' + filename + '"', args )
[37]76
[42]77        def grabLastUpdate( self, filename ):
78
79                last_update = 0
80
81                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
82
83                        if line.find( 'last_update') != -1:
84
85                                last_update = line.split( ' = ' )[1]
86
87                if last_update:
88                        return last_update
89                else:
90                        return 0
91
[40]92        def perform( self, action, filename, args ):
[37]93
94                arg_string = None
95
[40]96                if type( args ) is not ListType:
97                        debug_msg( 8, 'Arguments needs to be of type List' )
98                        return 1
99
[37]100                for arg in args:
101
102                        if not arg_string:
103
104                                arg_string = arg
105                        else:
106                                arg_string = arg_string + ' ' + arg
107
[40]108                debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]109
[40]110                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
[37]111
112                        if line.find( 'ERROR' ) != -1:
113
114                                error_msg = string.join( line.split( ' ' )[1:] )
[40]115                                debug_msg( 8, error_msg )
[37]116                                return 1
117
118                return 0
119
[6]120class GangliaXMLHandler( ContentHandler ):
[8]121        "Parse Ganglia's XML"
[3]122
[33]123        def __init__( self, config ):
124                self.config = config
[35]125                self.clusters = { }
[46]126                debug_msg( 0, printTime() + ' Gathering cluster timeserials and RRD last updates..' )
[44]127                self.gatherClusters()
[46]128                debug_msg( 0, printTime() + ' Done gathering timeserials and last updates.' )
[33]129
[44]130        def gatherClusters( self ):
131
[45]132                archive_dir = check_dir(ARCHIVE_PATH)
[44]133
134                hosts = [ ]
135
136                if os.path.exists( archive_dir ):
137
138                        dirlist = os.listdir( archive_dir )
139
140                        for item in dirlist:
141
142                                clustername = item
143
144                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_SOURCES:
145
146                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
147
[6]148        def startElement( self, name, attrs ):
[8]149                "Store appropriate data from xml start tags"
[3]150
[7]151                if name == 'GANGLIA_XML':
[32]152
153                        self.XMLSource = attrs.get( 'SOURCE', "" )
154                        self.gangliaVersion = attrs.get( 'VERSION', "" )
155
[12]156                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]157
[7]158                elif name == 'GRID':
[32]159
160                        self.gridName = attrs.get( 'NAME', "" )
161                        self.time = attrs.get( 'LOCALTIME', "" )
162
[12]163                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]164
[7]165                elif name == 'CLUSTER':
[32]166
167                        self.clusterName = attrs.get( 'NAME', "" )
168                        self.time = attrs.get( 'LOCALTIME', "" )
169
[35]170                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
[32]171
[34]172                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]173
[35]174                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]175
[14]176                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
[32]177
178                        self.hostName = attrs.get( 'NAME', "" )
179                        self.hostIp = attrs.get( 'IP', "" )
180                        self.hostReported = attrs.get( 'REPORTED', "" )
181
[12]182                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]183
[14]184                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
[6]185
[32]186                        type = attrs.get( 'TYPE', "" )
[6]187
[32]188                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
[3]189
[32]190                                myMetric = { }
191                                myMetric['name'] = attrs.get( 'NAME', "" )
192                                myMetric['val'] = attrs.get( 'VAL', "" )
193                                myMetric['time'] = self.hostReported
[3]194
[34]195                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]196
[34]197                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]198
[34]199        def storeMetrics( self ):
[9]200
[34]201                for clustername, rrdh in self.clusters.items():
[16]202
[38]203                        ret = rrdh.storeMetrics()
[9]204
[38]205                        if ret:
206                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
207                                return 1
208
209                return 0
210
[5]211class GangliaXMLGatherer:
[8]212        "Setup a connection and file object to Ganglia's XML"
[3]213
[8]214        s = None
215
216        def __init__( self, host, port ):
217                "Store host and port for connection"
218
[5]219                self.host = host
220                self.port = port
[33]221                self.connect()
[3]222
[33]223        def connect( self ):
224                "Setup connection to XML source"
[8]225
226                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]227
[5]228                        af, socktype, proto, canonname, sa = res
[32]229
[5]230                        try:
[32]231
[8]232                                self.s = socket.socket( af, socktype, proto )
[32]233
[5]234                        except socket.error, msg:
[32]235
[8]236                                self.s = None
[5]237                                continue
[32]238
[5]239                        try:
[32]240
[8]241                                self.s.connect( sa )
[32]242
[5]243                        except socket.error, msg:
[32]244
[8]245                                self.s.close()
246                                self.s = None
[5]247                                continue
[32]248
[5]249                        break
[3]250
[8]251                if self.s is None:
[32]252
[33]253                        debug_msg( 0, 'Could not open socket' )
254                        sys.exit( 1 )
[5]255
[33]256        def disconnect( self ):
257                "Close socket"
258
259                if self.s:
260                        self.s.close()
261                        self.s = None
262
263        def __del__( self ):
264                "Kill the socket before we leave"
265
266                self.disconnect()
267
268        def getFileObject( self ):
269                "Connect, and return a file object"
270
[38]271                if self.s:
272                        # Apearantly, only data is received when a connection is made
273                        # therefor, disconnect and connect
274                        #
275                        self.disconnect()
276                        self.connect()
[33]277
[8]278                return self.s.makefile( 'r' )
[5]279
[8]280class GangliaXMLProcessor:
[5]281
[33]282        def __init__( self ):
283                "Setup initial XML connection and handlers"
284
285                self.config = GangliaConfigParser( GMETAD_CONF )
286
287                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
288                self.myParser = make_parser()   
289                self.myHandler = GangliaXMLHandler( self.config )
290                self.myParser.setContentHandler( self.myHandler )
291
[9]292        def daemon( self ):
[8]293                "Run as daemon forever"
[5]294
[8]295                self.DAEMON = 1
[5]296
[8]297                # Fork the first child
298                #
299                pid = os.fork()
[32]300
[8]301                if pid > 0:
[7]302
[32]303                        sys.exit(0)  # end parent
304
[8]305                # creates a session and sets the process group ID
306                #
307                os.setsid()
[7]308
[8]309                # Fork the second child
310                #
311                pid = os.fork()
[32]312
[8]313                if pid > 0:
[5]314
[32]315                        sys.exit(0)  # end parent
316
[8]317                # Go to the root directory and set the umask
318                #
319                os.chdir('/')
320                os.umask(0)
321
322                sys.stdin.close()
323                sys.stdout.close()
[36]324                #sys.stderr.close()
[8]325
326                os.open('/dev/null', 0)
327                os.dup(0)
328                os.dup(0)
329
330                self.run()
331
[22]332        def printTime( self ):
[33]333                "Print current time in human readable format"
[22]334
[33]335                return time.strftime("%a %d %b %Y %H:%M:%S")
[22]336
[9]337        def run( self ):
[8]338                "Main thread"
339
[36]340                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
341                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]342
[36]343                while( 1 ):
344
345                        if not xmlthread.isAlive():
346                                # Gather XML at the same interval as gmetad
347
348                                # threaded call to: self.processXML()
349                                #
350                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
351                                xmlthread.start()
352
353                        if not storethread.isAlive():
354                                # Store metrics every .. sec
355
356                                # threaded call to: self.storeMetrics()
357                                #
358                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
359                                storethread.start()
360               
361                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
362                        time.sleep( 1 ) 
363
[33]364        def storeMetrics( self ):
365                "Store metrics retained in memory to disk"
[22]366
[40]367                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
[39]368
[38]369                # Store metrics somewhere between every 60 and 180 seconds
370                #
[44]371                #STORE_INTERVAL = random.randint( 360, 640 )
372                STORE_INTERVAL = 16
[22]373
[39]374                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
[36]375                storethread.start()
376
[40]377                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]378                time.sleep( STORE_INTERVAL )
[40]379                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
[36]380
381                if storethread.isAlive():
382
[40]383                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
384                        storethread.join( 180 ) # Maximum time is for storing thread to finish
385                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
[36]386
[40]387                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
[36]388
389                return 0
390
[39]391        def storeThread( self ):
392
[40]393                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
394                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
[39]395                ret = self.myHandler.storeMetrics()
[40]396                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
397                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
[39]398               
399                return ret
400
[8]401        def processXML( self ):
402                "Process XML"
403
[40]404                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
[8]405
[39]406                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
[36]407                parsethread.start()
408
[40]409                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]410                time.sleep( float( self.config.getLowestInterval() ) ) 
[40]411                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
[36]412
413                if parsethread.isAlive():
414
[40]415                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
[43]416                        parsethread.join( 180 ) # Maximum time for XML thread to finish
[40]417                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
[36]418
[40]419                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
[36]420
421                return 0
422
[39]423        def parseThread( self ):
424
[40]425                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
426                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
[39]427                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
[40]428                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
429                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
[39]430
431                return ret
432
[9]433class GangliaConfigParser:
434
[34]435        sources = [ ]
[9]436
437        def __init__( self, config ):
[32]438
[9]439                self.config = config
440                self.parseValues()
441
[32]442        def parseValues( self ):
[9]443                "Parse certain values from gmetad.conf"
444
445                readcfg = open( self.config, 'r' )
446
447                for line in readcfg.readlines():
448
449                        if line.count( '"' ) > 1:
450
[10]451                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]452
[11]453                                        source = { }
454                                        source['name'] = line.split( '"' )[1]
[9]455                                        source_words = line.split( '"' )[2].split( ' ' )
456
457                                        for word in source_words:
458
459                                                valid_interval = 1
460
461                                                for letter in word:
[32]462
[9]463                                                        if letter not in string.digits:
[32]464
[9]465                                                                valid_interval = 0
466
[10]467                                                if valid_interval and len(word) > 0:
[32]468
[9]469                                                        source['interval'] = word
[12]470                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]471       
472                                        # No interval found, use Ganglia's default     
473                                        if not source.has_key( 'interval' ):
474                                                source['interval'] = 15
475                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]476
[33]477                                        self.sources.append( source )
[9]478
479        def getInterval( self, source_name ):
[32]480
[9]481                for source in self.sources:
[32]482
[12]483                        if source['name'] == source_name:
[32]484
[9]485                                return source['interval']
[32]486
[9]487                return None
488
[34]489        def getLowestInterval( self ):
490
491                lowest_interval = 0
492
493                for source in self.sources:
494
495                        if not lowest_interval or source['interval'] <= lowest_interval:
496
497                                lowest_interval = source['interval']
498
499                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
500                if lowest_interval:
501                        return lowest_interval
502                else:
503                        return 15
504
[9]505class RRDHandler:
506
[32]507        myMetrics = { }
[40]508        lastStored = { }
[36]509        slot = None
[32]510
[33]511        def __init__( self, config, cluster ):
[44]512                self.block = 0
[9]513                self.cluster = cluster
[33]514                self.config = config
[40]515                self.slot = threading.Lock()
[37]516                self.rrdm = RRDMutator()
[42]517                self.gatherLastUpdates()
[9]518
[44]519        def isBlocking( self ):
520
521                return self.block
522
[42]523        def gatherLastUpdates( self ):
524                "Populate the lastStored list, containing timestamps of all last updates"
525
[44]526                self.block = 1
527
[42]528                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
529
530                hosts = [ ]
531
532                if os.path.exists( cluster_dir ):
533
[44]534                        dirlist = os.listdir( cluster_dir )
[42]535
[44]536                        for dir in dirlist:
[42]537
[44]538                                hosts.append( dir )
[42]539
540                for host in hosts:
541
542                        last_serial = self.getLastRrdTimeSerial( host )
543                        if last_serial:
544
545                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
546                                if os.path.exists( metric_dir ):
547
[44]548                                        dirlist = os.listdir( metric_dir )
[42]549
[44]550                                        for file in dirlist:
[42]551
[44]552                                                metricname = file.split( '.rrd' )[0]
[42]553
[44]554                                                if not self.lastStored.has_key( host ):
[42]555
[44]556                                                        self.lastStored[ host ] = { }
[42]557
[44]558                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]559
[32]560        def getClusterName( self ):
561                return self.cluster
562
563        def memMetric( self, host, metric ):
564
[34]565                if self.myMetrics.has_key( host ):
[32]566
[34]567                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]568
[34]569                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]570
[34]571                                        if mymetric['time'] == metric['time']:
[32]572
[34]573                                                # Allready have this metric, abort
574                                                return 1
575                        else:
576                                self.myMetrics[ host ][ metric['name'] ] = [ ]
577                else:
[32]578                        self.myMetrics[ host ] = { }
[34]579                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]580
[40]581                # Ah, psst, push it
582                #
583                # <atomic>
584                self.slot.acquire()
585
[32]586                self.myMetrics[ host ][ metric['name'] ].append( metric )
587
[40]588                self.slot.release()
589                # </atomic>
590
[37]591        def makeUpdateList( self, host, metricname ):
592
593                update_list = [ ]
[41]594                metric = None
[37]595
[40]596                while len( self.myMetrics[ host ][ metricname ] ) > 0:
[37]597
[40]598                        # Kabouter pop
599                        #
600                        # <atomic>     
601                        self.slot.acquire()
[37]602
[41]603                        # len might have changed since loop start
604                        #
605                        if len( self.myMetrics[ host ][ metricname ] ) > 0:
[42]606                                metric = self.myMetrics[ host ][ metricname ].pop( 0 )
[40]607
608                        self.slot.release()
609                        # </atomic>
610
[41]611                        if metric:
612                                if self.checkStoreMetric( host, metricname, metric ):
613                                        update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[43]614                                #else:
615                                        #print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )
[40]616
[37]617                return update_list
618
[40]619        def checkStoreMetric( self, host, metricname, metric ):
620
621                if self.lastStored.has_key( host ):
622
623                        if self.lastStored[ host ].has_key( metricname ):
624
[43]625                                if metric['time'] <= self.lastStored[ host ][ metricname ]:
[40]626
627                                        # Allready wrote a value with this timestamp, skip tnx
628                                        return 0
629
630                else:
631                        self.lastStored[ host ] = { }
632
633                self.lastStored[ host ][ metricname ] = metric['time']
634
635                return 1
636
[33]637        def storeMetrics( self ):
638
639                for hostname, mymetrics in self.myMetrics.items():     
640
641                        for metricname, mymetric in mymetrics.items():
642
[35]643                                mytime = self.makeTimeSerial()
[40]644                                correct_serial = self.checkNewRrdPeriod( hostname, mytime )
645                                self.createCheck( hostname, metricname, correct_serial )       
646                                update_ret = self.update( hostname, metricname, correct_serial )
[33]647
[36]648                                if update_ret == 0:
[33]649
650                                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
651                                else:
652                                        debug_msg( 9, 'metric update failed' )
653
[43]654                                #sys.exit(1)
655
[17]656        def makeTimeSerial( self ):
[32]657                "Generate a time serial. Seconds since epoch"
[17]658
659                # Seconds since epoch
660                mytime = int( time.time() )
661
662                return mytime
663
[33]664        def makeRrdPath( self, host, metricname=None, timeserial=None ):
[32]665                """
666                Make a RRD location/path and filename
667                If a metric or timeserial are supplied the complete locations
668                will be made, else just the host directory
669                """
[17]670
[20]671                if not timeserial:     
672                        rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
673                else:
674                        rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
[35]675                if metricname:
[33]676                        rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[20]677                else:
678                        rrd_file = None
[17]679
680                return rrd_dir, rrd_file
681
[20]682        def getLastRrdTimeSerial( self, host ):
[32]683                """
684                Find the last timeserial (directory) for this host
685                This is determined once every host
686                """
[17]687
[20]688                rrd_dir, rrd_file = self.makeRrdPath( host )
[17]689
[19]690                newest_timeserial = 0
691
[17]692                if os.path.exists( rrd_dir ):
[32]693
[44]694                        dirlist = os.listdir( rrd_dir )
[17]695
[44]696                        for dir in dirlist:
[17]697
[44]698                                valid_dir = 1
[17]699
[44]700                                for letter in dir:
701                                        if letter not in string.digits:
702                                                valid_dir = 0
[17]703
[44]704                                if valid_dir:
705                                        timeserial = dir
706                                        if timeserial > newest_timeserial:
707                                                newest_timeserial = timeserial
[17]708
709                if newest_timeserial:
[18]710                        return newest_timeserial
[17]711                else:
712                        return 0
713
[20]714        def checkNewRrdPeriod( self, host, current_timeserial ):
[32]715                """
716                Check if current timeserial belongs to recent time period
717                or should become a new period (and file).
[17]718
[32]719                Returns the serial of the correct time period
720                """
721
[20]722                last_timeserial = int( self.getLastRrdTimeSerial( host ) )
[30]723                debug_msg( 9, 'last timeserial of %s is %s' %( host, last_timeserial ) )
[17]724
725                if not last_timeserial:
[18]726                        serial = current_timeserial
727                else:
[17]728
[18]729                        archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[17]730
[18]731                        if (current_timeserial - last_timeserial) >= archive_secs:
732                                serial = current_timeserial
733                        else:
734                                serial = last_timeserial
[17]735
[18]736                return serial
737
[33]738        def getFirstTime( self, host, metricname ):
739                "Get the first time of a metric we know of"
740
741                first_time = 0
742
743                for metric in self.myMetrics[ host ][ metricname ]:
744
745                        if not first_time or metric['time'] <= first_time:
746
747                                first_time = metric['time']
748
[35]749                return first_time
750
[33]751        def createCheck( self, host, metricname, timeserial ):
[9]752                "Check if an .rrd allready exists for this metric, create if not"
753
[35]754                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]755
[33]756                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]757
[9]758                if not os.path.exists( rrd_dir ):
759                        os.makedirs( rrd_dir )
[14]760                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]761
[14]762                if not os.path.exists( rrd_file ):
[9]763
[33]764                        interval = self.config.getInterval( self.cluster )
[14]765                        heartbeat = 8 * int(interval)
[9]766
[37]767                        params = [ ]
[12]768
[37]769                        params.append( '--step' )
770                        params.append( str( interval ) )
[12]771
[37]772                        params.append( '--start' )
773                        params.append( str( int( self.getFirstTime( host, metricname ) ) - 1 ) )
[12]774
[37]775                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
776                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]777
[37]778                        self.rrdm.create( str(rrd_file), params )
779
[14]780                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
781
[33]782        def update( self, host, metricname, timeserial ):
[9]783
[35]784                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]785
[33]786                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]787
[37]788                update_list = self.makeUpdateList( host, metricname )
[15]789
[41]790                if len( update_list ) > 0:
791                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]792
[41]793                        if ret:
794                                return 1
[27]795               
[41]796                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]797
[36]798                return 0
799
[8]800def main():
801        "Program startup"
802
803        myProcessor = GangliaXMLProcessor()
804
[22]805        if DAEMONIZE:
806                myProcessor.daemon()
807        else:
808                myProcessor.run()
809
[9]810def check_dir( directory ):
811        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
812
813        if directory[-1] == '/':
814                directory = directory[:-1]
815
816        return directory
817
[12]818def debug_msg( level, msg ):
819
820        if (DEBUG_LEVEL >= level):
821                sys.stderr.write( msg + '\n' )
822
[46]823def printTime( ):
824        "Print current time in human readable format"
825
826        return time.strftime("%a %d %b %Y %H:%M:%S")
827
[5]828# Let's go
[9]829if __name__ == '__main__':
830        main()
Note: See TracBrowser for help on using the repository browser.