source: trunk/daemon/togad.py @ 42

Last change on this file since 42 was 42, checked in by bastiaans, 19 years ago

daemon/togad.py:

Added functions for gathering all 'last_update' values of existing .rrds

  • This is done 1 time, at startup, for each cluster
File size: 18.5 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import string
[12]8import os
9import os.path
[17]10import time
11import re
[36]12import threading
13import mutex
[38]14import random
[40]15from types import *
[3]16
[8]17# Specify debugging level here;
18#
[38]19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
[40]22# 8  = RRD file activity
23# 7  = daemon threading
[8]24#
[40]25DEBUG_LEVEL = 9
[6]26
[9]27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
[13]39# Amount of hours to store in one single archived .rrd
[9]40#
[38]41ARCHIVE_HOURS_PER_RRD = 1
[13]42
[22]43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
[17]47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
[13]52
[17]53# What XML data types not to store
[13]54#
[17]55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]56
[8]57"""
58This is TOrque-GAnglia's data Daemon
59"""
60
[37]61class RRDMutator:
[38]62        "A class for handling .rrd mutations"
[37]63
64        binary = '/usr/bin/rrdtool'
65
66        def __init__( self, binary=None ):
67
68                if binary:
69                        self.binary = binary
70
71        def create( self, filename, args ):
[40]72                return self.perform( 'create', '"' + filename + '"', args )
[37]73
74        def update( self, filename, args ):
[40]75                return self.perform( 'update', '"' + filename + '"', args )
[37]76
[42]77        def grabLastUpdate( self, filename ):
78
79                last_update = 0
80
81                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
82
83                        if line.find( 'last_update') != -1:
84
85                                last_update = line.split( ' = ' )[1]
86
87                if last_update:
88                        return last_update
89                else:
90                        return 0
91
[40]92        def perform( self, action, filename, args ):
[37]93
94                arg_string = None
95
[40]96                if type( args ) is not ListType:
97                        debug_msg( 8, 'Arguments needs to be of type List' )
98                        return 1
99
[37]100                for arg in args:
101
102                        if not arg_string:
103
104                                arg_string = arg
105                        else:
106                                arg_string = arg_string + ' ' + arg
107
[40]108                debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]109
[40]110                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
[37]111
112                        if line.find( 'ERROR' ) != -1:
113
114                                error_msg = string.join( line.split( ' ' )[1:] )
[40]115                                debug_msg( 8, error_msg )
[37]116                                return 1
117
118                return 0
119
[6]120class GangliaXMLHandler( ContentHandler ):
[8]121        "Parse Ganglia's XML"
[3]122
[33]123        def __init__( self, config ):
124                self.config = config
[35]125                self.clusters = { }
[33]126
[6]127        def startElement( self, name, attrs ):
[8]128                "Store appropriate data from xml start tags"
[3]129
[7]130                if name == 'GANGLIA_XML':
[32]131
132                        self.XMLSource = attrs.get( 'SOURCE', "" )
133                        self.gangliaVersion = attrs.get( 'VERSION', "" )
134
[12]135                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]136
[7]137                elif name == 'GRID':
[32]138
139                        self.gridName = attrs.get( 'NAME', "" )
140                        self.time = attrs.get( 'LOCALTIME', "" )
141
[12]142                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]143
[7]144                elif name == 'CLUSTER':
[32]145
146                        self.clusterName = attrs.get( 'NAME', "" )
147                        self.time = attrs.get( 'LOCALTIME', "" )
148
[35]149                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
[32]150
[34]151                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]152
[35]153                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]154
[14]155                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
[32]156
157                        self.hostName = attrs.get( 'NAME', "" )
158                        self.hostIp = attrs.get( 'IP', "" )
159                        self.hostReported = attrs.get( 'REPORTED', "" )
160
[12]161                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]162
[14]163                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
[6]164
[32]165                        type = attrs.get( 'TYPE', "" )
[6]166
[32]167                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
[3]168
[32]169                                myMetric = { }
170                                myMetric['name'] = attrs.get( 'NAME', "" )
171                                myMetric['val'] = attrs.get( 'VAL', "" )
172                                myMetric['time'] = self.hostReported
[3]173
[34]174                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]175
[34]176                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]177
[34]178        def storeMetrics( self ):
[9]179
[34]180                for clustername, rrdh in self.clusters.items():
[16]181
[38]182                        ret = rrdh.storeMetrics()
[9]183
[38]184                        if ret:
185                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
186                                return 1
187
188                return 0
189
[5]190class GangliaXMLGatherer:
[8]191        "Setup a connection and file object to Ganglia's XML"
[3]192
[8]193        s = None
194
195        def __init__( self, host, port ):
196                "Store host and port for connection"
197
[5]198                self.host = host
199                self.port = port
[33]200                self.connect()
[3]201
[33]202        def connect( self ):
203                "Setup connection to XML source"
[8]204
205                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]206
[5]207                        af, socktype, proto, canonname, sa = res
[32]208
[5]209                        try:
[32]210
[8]211                                self.s = socket.socket( af, socktype, proto )
[32]212
[5]213                        except socket.error, msg:
[32]214
[8]215                                self.s = None
[5]216                                continue
[32]217
[5]218                        try:
[32]219
[8]220                                self.s.connect( sa )
[32]221
[5]222                        except socket.error, msg:
[32]223
[8]224                                self.s.close()
225                                self.s = None
[5]226                                continue
[32]227
[5]228                        break
[3]229
[8]230                if self.s is None:
[32]231
[33]232                        debug_msg( 0, 'Could not open socket' )
233                        sys.exit( 1 )
[5]234
[33]235        def disconnect( self ):
236                "Close socket"
237
238                if self.s:
239                        self.s.close()
240                        self.s = None
241
242        def __del__( self ):
243                "Kill the socket before we leave"
244
245                self.disconnect()
246
247        def getFileObject( self ):
248                "Connect, and return a file object"
249
[38]250                if self.s:
251                        # Apearantly, only data is received when a connection is made
252                        # therefor, disconnect and connect
253                        #
254                        self.disconnect()
255                        self.connect()
[33]256
[8]257                return self.s.makefile( 'r' )
[5]258
[8]259class GangliaXMLProcessor:
[5]260
[33]261        def __init__( self ):
262                "Setup initial XML connection and handlers"
263
264                self.config = GangliaConfigParser( GMETAD_CONF )
265
266                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
267                self.myParser = make_parser()   
268                self.myHandler = GangliaXMLHandler( self.config )
269                self.myParser.setContentHandler( self.myHandler )
270
[9]271        def daemon( self ):
[8]272                "Run as daemon forever"
[5]273
[8]274                self.DAEMON = 1
[5]275
[8]276                # Fork the first child
277                #
278                pid = os.fork()
[32]279
[8]280                if pid > 0:
[7]281
[32]282                        sys.exit(0)  # end parent
283
[8]284                # creates a session and sets the process group ID
285                #
286                os.setsid()
[7]287
[8]288                # Fork the second child
289                #
290                pid = os.fork()
[32]291
[8]292                if pid > 0:
[5]293
[32]294                        sys.exit(0)  # end parent
295
[8]296                # Go to the root directory and set the umask
297                #
298                os.chdir('/')
299                os.umask(0)
300
301                sys.stdin.close()
302                sys.stdout.close()
[36]303                #sys.stderr.close()
[8]304
305                os.open('/dev/null', 0)
306                os.dup(0)
307                os.dup(0)
308
309                self.run()
310
[22]311        def printTime( self ):
[33]312                "Print current time in human readable format"
[22]313
[33]314                return time.strftime("%a %d %b %Y %H:%M:%S")
[22]315
[9]316        def run( self ):
[8]317                "Main thread"
318
[36]319                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
320                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]321
[36]322                while( 1 ):
323
324                        if not xmlthread.isAlive():
325                                # Gather XML at the same interval as gmetad
326
327                                # threaded call to: self.processXML()
328                                #
329                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
330                                xmlthread.start()
331
332                        if not storethread.isAlive():
333                                # Store metrics every .. sec
334
335                                # threaded call to: self.storeMetrics()
336                                #
337                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
338                                storethread.start()
339               
340                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
341                        time.sleep( 1 ) 
342
[33]343        def storeMetrics( self ):
344                "Store metrics retained in memory to disk"
[22]345
[40]346                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
[39]347
[38]348                # Store metrics somewhere between every 60 and 180 seconds
349                #
[40]350                #STORE_INTERVAL = random.randint( 60, 180 )
[42]351                STORE_INTERVAL = 180
[22]352
[39]353                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
[36]354                storethread.start()
355
[40]356                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]357                time.sleep( STORE_INTERVAL )
[40]358                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
[36]359
360                if storethread.isAlive():
361
[40]362                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
363                        storethread.join( 180 ) # Maximum time is for storing thread to finish
364                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
[36]365
[40]366                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
[36]367
368                return 0
369
[39]370        def storeThread( self ):
371
[40]372                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
373                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
[39]374                ret = self.myHandler.storeMetrics()
[40]375                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
376                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
[39]377               
378                return ret
379
[8]380        def processXML( self ):
381                "Process XML"
382
[40]383                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
[8]384
[39]385                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
[36]386                parsethread.start()
387
[40]388                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]389                time.sleep( float( self.config.getLowestInterval() ) ) 
[40]390                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
[36]391
392                if parsethread.isAlive():
393
[40]394                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
[39]395                        parsethread.join( 60 ) # Maximum time for XML thread to finish
[40]396                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
[36]397
[40]398                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
[36]399
400                return 0
401
[39]402        def parseThread( self ):
403
[40]404                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
405                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
[39]406                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
[40]407                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
408                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
[39]409
410                return ret
411
[9]412class GangliaConfigParser:
413
[34]414        sources = [ ]
[9]415
416        def __init__( self, config ):
[32]417
[9]418                self.config = config
419                self.parseValues()
420
[32]421        def parseValues( self ):
[9]422                "Parse certain values from gmetad.conf"
423
424                readcfg = open( self.config, 'r' )
425
426                for line in readcfg.readlines():
427
428                        if line.count( '"' ) > 1:
429
[10]430                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]431
[11]432                                        source = { }
433                                        source['name'] = line.split( '"' )[1]
[9]434                                        source_words = line.split( '"' )[2].split( ' ' )
435
436                                        for word in source_words:
437
438                                                valid_interval = 1
439
440                                                for letter in word:
[32]441
[9]442                                                        if letter not in string.digits:
[32]443
[9]444                                                                valid_interval = 0
445
[10]446                                                if valid_interval and len(word) > 0:
[32]447
[9]448                                                        source['interval'] = word
[12]449                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]450       
451                                        # No interval found, use Ganglia's default     
452                                        if not source.has_key( 'interval' ):
453                                                source['interval'] = 15
454                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]455
[33]456                                        self.sources.append( source )
[9]457
458        def getInterval( self, source_name ):
[32]459
[9]460                for source in self.sources:
[32]461
[12]462                        if source['name'] == source_name:
[32]463
[9]464                                return source['interval']
[32]465
[9]466                return None
467
[34]468        def getLowestInterval( self ):
469
470                lowest_interval = 0
471
472                for source in self.sources:
473
474                        if not lowest_interval or source['interval'] <= lowest_interval:
475
476                                lowest_interval = source['interval']
477
478                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
479                if lowest_interval:
480                        return lowest_interval
481                else:
482                        return 15
483
[9]484class RRDHandler:
485
[32]486        myMetrics = { }
[40]487        lastStored = { }
[36]488        slot = None
[32]489
[33]490        def __init__( self, config, cluster ):
[9]491                self.cluster = cluster
[33]492                self.config = config
[40]493                self.slot = threading.Lock()
[37]494                self.rrdm = RRDMutator()
[42]495                self.gatherLastUpdates()
[9]496
[42]497        def gatherLastUpdates( self ):
498                "Populate the lastStored list, containing timestamps of all last updates"
499
500                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
501
502                hosts = [ ]
503
504                if os.path.exists( cluster_dir ):
505
506                        for root, dirs, files in os.walk( cluster_dir ):
507
508                                for dir in dirs:
509
510                                        valid_dir = 1
511
512                                        for letter in dir:
513                                                if letter not in string.digits:
514                                                        valid_dir = 0
515
516                                        if valid_dir:
517                                                host = root.split( '/' )[-1]
518                                                hosts.append( host )
519
520                for host in hosts:
521
522                        last_serial = self.getLastRrdTimeSerial( host )
523                        if last_serial:
524
525                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
526                                if os.path.exists( metric_dir ):
527
528                                        for root, dirs, files in os.walk( metric_dir ):
529
530                                                for file in files:
531
532                                                        metricname = file.split( '.rrd' )[0]
533
534                                                        if not self.lastStored.has_key( host ):
535
536                                                                self.lastStored[ host ] = { }
537
538                                                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
539
[32]540        def getClusterName( self ):
541                return self.cluster
542
543        def memMetric( self, host, metric ):
544
[34]545                if self.myMetrics.has_key( host ):
[32]546
[34]547                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]548
[34]549                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]550
[34]551                                        if mymetric['time'] == metric['time']:
[32]552
[34]553                                                # Allready have this metric, abort
554                                                return 1
555                        else:
556                                self.myMetrics[ host ][ metric['name'] ] = [ ]
557                else:
[32]558                        self.myMetrics[ host ] = { }
[34]559                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]560
[40]561                # Ah, psst, push it
562                #
563                # <atomic>
564                self.slot.acquire()
565
[32]566                self.myMetrics[ host ][ metric['name'] ].append( metric )
567
[40]568                self.slot.release()
569                # </atomic>
570
[37]571        def makeUpdateList( self, host, metricname ):
572
573                update_list = [ ]
[41]574                metric = None
[37]575
[40]576                while len( self.myMetrics[ host ][ metricname ] ) > 0:
[37]577
[40]578                        # Kabouter pop
579                        #
580                        # <atomic>     
581                        self.slot.acquire()
[37]582
[41]583                        # len might have changed since loop start
584                        #
585                        if len( self.myMetrics[ host ][ metricname ] ) > 0:
[42]586                                metric = self.myMetrics[ host ][ metricname ].pop( 0 )
[40]587
588                        self.slot.release()
589                        # </atomic>
590
[41]591                        if metric:
592                                if self.checkStoreMetric( host, metricname, metric ):
593                                        update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
594                                else:
595                                        print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )
[40]596
[37]597                return update_list
598
[40]599        def checkStoreMetric( self, host, metricname, metric ):
600
601                if self.lastStored.has_key( host ):
602
603                        if self.lastStored[ host ].has_key( metricname ):
604
605                                if self.lastStored[ host ][ metricname ] <= metric['time']:
606
607                                        # Allready wrote a value with this timestamp, skip tnx
608                                        return 0
609
610                else:
611                        self.lastStored[ host ] = { }
612
613                self.lastStored[ host ][ metricname ] = metric['time']
614
615                return 1
616
[33]617        def storeMetrics( self ):
618
619                for hostname, mymetrics in self.myMetrics.items():     
620
621                        for metricname, mymetric in mymetrics.items():
622
[35]623                                mytime = self.makeTimeSerial()
[40]624                                correct_serial = self.checkNewRrdPeriod( hostname, mytime )
625                                self.createCheck( hostname, metricname, correct_serial )       
626                                update_ret = self.update( hostname, metricname, correct_serial )
[33]627
[36]628                                if update_ret == 0:
[33]629
630                                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
631                                else:
632                                        debug_msg( 9, 'metric update failed' )
633
[17]634        def makeTimeSerial( self ):
[32]635                "Generate a time serial. Seconds since epoch"
[17]636
637                # Seconds since epoch
638                mytime = int( time.time() )
639
640                return mytime
641
[33]642        def makeRrdPath( self, host, metricname=None, timeserial=None ):
[32]643                """
644                Make a RRD location/path and filename
645                If a metric or timeserial are supplied the complete locations
646                will be made, else just the host directory
647                """
[17]648
[20]649                if not timeserial:     
650                        rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
651                else:
652                        rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
[35]653                if metricname:
[33]654                        rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[20]655                else:
656                        rrd_file = None
[17]657
658                return rrd_dir, rrd_file
659
[20]660        def getLastRrdTimeSerial( self, host ):
[32]661                """
662                Find the last timeserial (directory) for this host
663                This is determined once every host
664                """
[17]665
[20]666                rrd_dir, rrd_file = self.makeRrdPath( host )
[17]667
[19]668                newest_timeserial = 0
669
[17]670                if os.path.exists( rrd_dir ):
[32]671
[17]672                        for root, dirs, files in os.walk( rrd_dir ):
673
[20]674                                for dir in dirs:
[17]675
[20]676                                        valid_dir = 1
[17]677
[20]678                                        for letter in dir:
679                                                if letter not in string.digits:
680                                                        valid_dir = 0
[17]681
[20]682                                        if valid_dir:
683                                                timeserial = dir
[17]684                                                if timeserial > newest_timeserial:
685                                                        newest_timeserial = timeserial
686
687                if newest_timeserial:
[18]688                        return newest_timeserial
[17]689                else:
690                        return 0
691
[20]692        def checkNewRrdPeriod( self, host, current_timeserial ):
[32]693                """
694                Check if current timeserial belongs to recent time period
695                or should become a new period (and file).
[17]696
[32]697                Returns the serial of the correct time period
698                """
699
[20]700                last_timeserial = int( self.getLastRrdTimeSerial( host ) )
[30]701                debug_msg( 9, 'last timeserial of %s is %s' %( host, last_timeserial ) )
[17]702
703                if not last_timeserial:
[18]704                        serial = current_timeserial
705                else:
[17]706
[18]707                        archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[17]708
[18]709                        if (current_timeserial - last_timeserial) >= archive_secs:
710                                serial = current_timeserial
711                        else:
712                                serial = last_timeserial
[17]713
[18]714                return serial
715
[33]716        def getFirstTime( self, host, metricname ):
717                "Get the first time of a metric we know of"
718
719                first_time = 0
720
721                for metric in self.myMetrics[ host ][ metricname ]:
722
723                        if not first_time or metric['time'] <= first_time:
724
725                                first_time = metric['time']
726
[35]727                return first_time
728
[33]729        def createCheck( self, host, metricname, timeserial ):
[9]730                "Check if an .rrd allready exists for this metric, create if not"
731
[35]732                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]733
[33]734                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]735
[9]736                if not os.path.exists( rrd_dir ):
737                        os.makedirs( rrd_dir )
[14]738                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]739
[14]740                if not os.path.exists( rrd_file ):
[9]741
[33]742                        interval = self.config.getInterval( self.cluster )
[14]743                        heartbeat = 8 * int(interval)
[9]744
[37]745                        params = [ ]
[12]746
[37]747                        params.append( '--step' )
748                        params.append( str( interval ) )
[12]749
[37]750                        params.append( '--start' )
751                        params.append( str( int( self.getFirstTime( host, metricname ) ) - 1 ) )
[12]752
[37]753                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
754                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]755
[37]756                        self.rrdm.create( str(rrd_file), params )
757
[14]758                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
759
[33]760        def update( self, host, metricname, timeserial ):
[9]761
[35]762                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]763
[33]764                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]765
[37]766                update_list = self.makeUpdateList( host, metricname )
[15]767
[41]768                if len( update_list ) > 0:
769                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]770
[41]771                        if ret:
772                                return 1
[27]773               
[41]774                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]775
[36]776                return 0
777
[8]778def main():
779        "Program startup"
780
781        myProcessor = GangliaXMLProcessor()
782
[22]783        if DAEMONIZE:
784                myProcessor.daemon()
785        else:
786                myProcessor.run()
787
[9]788def check_dir( directory ):
789        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
790
791        if directory[-1] == '/':
792                directory = directory[:-1]
793
794        return directory
795
[12]796def debug_msg( level, msg ):
797
798        if (DEBUG_LEVEL >= level):
799                sys.stderr.write( msg + '\n' )
800
[5]801# Let's go
[9]802if __name__ == '__main__':
803        main()
Note: See TracBrowser for help on using the repository browser.