source: trunk/daemon/togad.py @ 54

Last change on this file since 54 was 54, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Several bugfixes
  • Now more than 1 storethread is possible
File size: 20.0 KB
Line 
1#!/usr/bin/env python
2
3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
7import string
8import os
9import os.path
10import time
11import re
12import threading
13import mutex
14import random
15from types import *
16
17# Specify debugging level here;
18#
19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
22# 8  = RRD file activity
23# 7  = daemon threading
24#
25DEBUG_LEVEL = 8
26
27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
39# Amount of hours to store in one single archived .rrd
40#
41ARCHIVE_HOURS_PER_RRD = 12
42
43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
52
53# What XML data types not to store
54#
55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
56
57# Maximum time (in seconds) a parsethread may run
58#
59PARSE_TIMEOUT = 60
60
61# Maximum time (in seconds) a storethread may run
62#
63STORE_TIMEOUT = 360
64
65# Number of storing threads
66#
67STORE_THREADS = 1
68"""
69This is TOrque-GAnglia's data Daemon
70"""
71
72class RRDMutator:
73        "A class for handling .rrd mutations"
74
75        binary = '/usr/bin/rrdtool'
76
77        def __init__( self, binary=None ):
78
79                if binary:
80                        self.binary = binary
81
82        def create( self, filename, args ):
83                return self.perform( 'create', '"' + filename + '"', args )
84
85        def update( self, filename, args ):
86                return self.perform( 'update', '"' + filename + '"', args )
87
88        def grabLastUpdate( self, filename ):
89
90                last_update = 0
91
92                debug_msg( 8, self.binary + ' info "' + filename + '"' )
93
94                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
95
96                        if line.find( 'last_update') != -1:
97
98                                last_update = line.split( ' = ' )[1]
99
100                if last_update:
101                        return last_update
102                else:
103                        return 0
104
105        def perform( self, action, filename, args ):
106
107                arg_string = None
108
109                if type( args ) is not ListType:
110                        debug_msg( 8, 'Arguments needs to be of type List' )
111                        return 1
112
113                for arg in args:
114
115                        if not arg_string:
116
117                                arg_string = arg
118                        else:
119                                arg_string = arg_string + ' ' + arg
120
121                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
122
123                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
124
125                        if line.find( 'ERROR' ) != -1:
126
127                                error_msg = string.join( line.split( ' ' )[1:] )
128                                debug_msg( 8, error_msg )
129                                return 1
130
131                return 0
132
133class GangliaXMLHandler( ContentHandler ):
134        "Parse Ganglia's XML"
135
136        def __init__( self, config ):
137                self.config = config
138                self.clusters = { }
139                debug_msg( 0, printTime() + ' - Checking existing toga rrd archive..' )
140                self.gatherClusters()
141                debug_msg( 0, printTime() + ' - Check done.' )
142
143        def gatherClusters( self ):
144
145                archive_dir = check_dir(ARCHIVE_PATH)
146
147                hosts = [ ]
148
149                if os.path.exists( archive_dir ):
150
151                        dirlist = os.listdir( archive_dir )
152
153                        for item in dirlist:
154
155                                clustername = item
156
157                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_SOURCES:
158
159                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
160
161        def startElement( self, name, attrs ):
162                "Store appropriate data from xml start tags"
163
164                if name == 'GANGLIA_XML':
165
166                        self.XMLSource = attrs.get( 'SOURCE', "" )
167                        self.gangliaVersion = attrs.get( 'VERSION', "" )
168
169                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
170
171                elif name == 'GRID':
172
173                        self.gridName = attrs.get( 'NAME', "" )
174                        self.time = attrs.get( 'LOCALTIME', "" )
175
176                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
177
178                elif name == 'CLUSTER':
179
180                        self.clusterName = attrs.get( 'NAME', "" )
181                        self.time = attrs.get( 'LOCALTIME', "" )
182
183                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
184
185                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
186
187                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
188
189                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
190
191                        self.hostName = attrs.get( 'NAME', "" )
192                        self.hostIp = attrs.get( 'IP', "" )
193                        self.hostReported = attrs.get( 'REPORTED', "" )
194
195                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
196
197                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
198
199                        type = attrs.get( 'TYPE', "" )
200
201                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
202
203                                myMetric = { }
204                                myMetric['name'] = attrs.get( 'NAME', "" )
205                                myMetric['val'] = attrs.get( 'VAL', "" )
206                                myMetric['time'] = self.hostReported
207
208                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
209
210                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
211
212        def storeMetrics( self ):
213
214                for clustername, rrdh in self.clusters.items():
215
216                        ret = rrdh.storeMetrics()
217
218                        if ret:
219                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
220                                return 1
221
222                return 0
223
224class GangliaXMLGatherer:
225        "Setup a connection and file object to Ganglia's XML"
226
227        s = None
228
229        def __init__( self, host, port ):
230                "Store host and port for connection"
231
232                self.host = host
233                self.port = port
234                self.connect()
235
236        def connect( self ):
237                "Setup connection to XML source"
238
239                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
240
241                        af, socktype, proto, canonname, sa = res
242
243                        try:
244
245                                self.s = socket.socket( af, socktype, proto )
246
247                        except socket.error, msg:
248
249                                self.s = None
250                                continue
251
252                        try:
253
254                                self.s.connect( sa )
255
256                        except socket.error, msg:
257
258                                self.s.close()
259                                self.s = None
260                                continue
261
262                        break
263
264                if self.s is None:
265
266                        debug_msg( 0, 'Could not open socket' )
267                        sys.exit( 1 )
268
269        def disconnect( self ):
270                "Close socket"
271
272                if self.s:
273                        self.s.close()
274                        self.s = None
275
276        def __del__( self ):
277                "Kill the socket before we leave"
278
279                self.disconnect()
280
281        def getFileObject( self ):
282                "Connect, and return a file object"
283
284                if self.s:
285                        # Apearantly, only data is received when a connection is made
286                        # therefor, disconnect and connect
287                        #
288                        self.disconnect()
289                        self.connect()
290
291                return self.s.makefile( 'r' )
292
293class GangliaXMLProcessor:
294
295        def __init__( self ):
296                "Setup initial XML connection and handlers"
297
298                self.config = GangliaConfigParser( GMETAD_CONF )
299
300                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
301                self.myParser = make_parser()   
302                self.myHandler = GangliaXMLHandler( self.config )
303                self.myParser.setContentHandler( self.myHandler )
304
305        def daemon( self ):
306                "Run as daemon forever"
307
308                self.DAEMON = 1
309
310                # Fork the first child
311                #
312                pid = os.fork()
313
314                if pid > 0:
315
316                        sys.exit(0)  # end parent
317
318                # creates a session and sets the process group ID
319                #
320                os.setsid()
321
322                # Fork the second child
323                #
324                pid = os.fork()
325
326                if pid > 0:
327
328                        sys.exit(0)  # end parent
329
330                # Go to the root directory and set the umask
331                #
332                os.chdir('/')
333                os.umask(0)
334
335                sys.stdin.close()
336                sys.stdout.close()
337                #sys.stderr.close()
338
339                os.open('/dev/null', 0)
340                os.dup(0)
341                os.dup(0)
342
343                self.run()
344
345        def printTime( self ):
346                "Print current time in human readable format"
347
348                return time.strftime("%a %d %b %Y %H:%M:%S")
349
350        def run( self ):
351                "Main thread"
352
353                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
354
355                storethreads = [ ]
356
357                for num in range( int( STORE_THREADS ) ):
358
359                        storethreads.append( threading.Thread( None, self.storeMetrics, 'storethread' ) )
360
361                while( 1 ):
362
363                        if not xmlthread.isAlive():
364                                # Gather XML at the same interval as gmetad
365
366                                # threaded call to: self.processXML()
367                                #
368                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
369                                xmlthread.start()
370
371                        for storethread in storethreads:
372
373                                mythread = storethreads.index( storethread )
374
375                                if not storethread.isAlive():
376
377                                        # Store metrics every .. sec
378
379                                        # threaded call to: self.storeMetrics()
380                                        #
381                                        storethreads[ mythread ] = threading.Thread( None, self.storeMetrics, 'storethread' )
382                                        storethreads[ mythread ].start()
383               
384                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
385                        time.sleep( 1 ) 
386
387        def storeMetrics( self ):
388                "Store metrics retained in memory to disk"
389
390                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
391
392                # Store metrics somewhere between every 60 and 180 seconds
393                #
394                #STORE_INTERVAL = random.randint( 360, 640 )
395                STORE_INTERVAL = 60
396
397                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
398                storethread.start()
399
400                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
401                time.sleep( STORE_INTERVAL )
402                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
403
404                if storethread.isAlive():
405
406                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
407                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
408                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
409
410                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
411
412                return 0
413
414        def storeThread( self ):
415
416                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
417                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
418                ret = self.myHandler.storeMetrics()
419                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
420                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
421               
422                return ret
423
424        def processXML( self ):
425                "Process XML"
426
427                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
428
429                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
430                parsethread.start()
431
432                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
433                time.sleep( float( self.config.getLowestInterval() ) ) 
434                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
435
436                if parsethread.isAlive():
437
438                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
439                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
440                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
441
442                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
443
444                return 0
445
446        def parseThread( self ):
447
448                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
449                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
450                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
451                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
452                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
453
454                return ret
455
456class GangliaConfigParser:
457
458        sources = [ ]
459
460        def __init__( self, config ):
461
462                self.config = config
463                self.parseValues()
464
465        def parseValues( self ):
466                "Parse certain values from gmetad.conf"
467
468                readcfg = open( self.config, 'r' )
469
470                for line in readcfg.readlines():
471
472                        if line.count( '"' ) > 1:
473
474                                if line.find( 'data_source' ) != -1 and line[0] != '#':
475
476                                        source = { }
477                                        source['name'] = line.split( '"' )[1]
478                                        source_words = line.split( '"' )[2].split( ' ' )
479
480                                        for word in source_words:
481
482                                                valid_interval = 1
483
484                                                for letter in word:
485
486                                                        if letter not in string.digits:
487
488                                                                valid_interval = 0
489
490                                                if valid_interval and len(word) > 0:
491
492                                                        source['interval'] = word
493                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
494       
495                                        # No interval found, use Ganglia's default     
496                                        if not source.has_key( 'interval' ):
497                                                source['interval'] = 15
498                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
499
500                                        self.sources.append( source )
501
502        def getInterval( self, source_name ):
503
504                for source in self.sources:
505
506                        if source['name'] == source_name:
507
508                                return source['interval']
509
510                return None
511
512        def getLowestInterval( self ):
513
514                lowest_interval = 0
515
516                for source in self.sources:
517
518                        if not lowest_interval or source['interval'] <= lowest_interval:
519
520                                lowest_interval = source['interval']
521
522                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
523                if lowest_interval:
524                        return lowest_interval
525                else:
526                        return 15
527
528class RRDHandler:
529
530        myMetrics = { }
531        lastStored = { }
532        timeserials = { }
533        slot = None
534
535        def __init__( self, config, cluster ):
536                self.block = 0
537                self.cluster = cluster
538                self.config = config
539                self.slot = threading.Lock()
540                self.rrdm = RRDMutator()
541                self.gatherLastUpdates()
542
543        def gatherLastUpdates( self ):
544                "Populate the lastStored list, containing timestamps of all last updates"
545
546                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
547
548                hosts = [ ]
549
550                if os.path.exists( cluster_dir ):
551
552                        dirlist = os.listdir( cluster_dir )
553
554                        for dir in dirlist:
555
556                                hosts.append( dir )
557
558                for host in hosts:
559
560                        host_dir = cluster_dir + '/' + host
561                        dirlist = os.listdir( host_dir )
562
563                        for dir in dirlist:
564
565                                if not self.timeserials.has_key( host ):
566
567                                        self.timeserials[ host ] = [ ]
568
569                                self.timeserials[ host ].append( dir )
570
571                        last_serial = self.getLastRrdTimeSerial( host )
572                        if last_serial:
573
574                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
575                                if os.path.exists( metric_dir ):
576
577                                        dirlist = os.listdir( metric_dir )
578
579                                        for file in dirlist:
580
581                                                metricname = file.split( '.rrd' )[0]
582
583                                                if not self.lastStored.has_key( host ):
584
585                                                        self.lastStored[ host ] = { }
586
587                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
588
589        def getClusterName( self ):
590                return self.cluster
591
592        def memMetric( self, host, metric ):
593
594                if self.myMetrics.has_key( host ):
595
596                        if self.myMetrics[ host ].has_key( metric['name'] ):
597
598                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
599
600                                        if mymetric['time'] == metric['time']:
601
602                                                # Allready have this metric, abort
603                                                return 1
604                        else:
605                                self.myMetrics[ host ][ metric['name'] ] = [ ]
606                else:
607                        self.myMetrics[ host ] = { }
608                        self.myMetrics[ host ][ metric['name'] ] = [ ]
609
610                # <ATOMIC>
611                #
612                self.slot.acquire()
613
614                self.myMetrics[ host ][ metric['name'] ].append( metric )
615
616                self.slot.release()
617                #
618                # </ATOMIC>
619
620        def makeUpdateList( self, host, metriclist ):
621
622                update_list = [ ]
623                metric = None
624
625                while len( metriclist ) > 0:
626
627                        metric = metriclist.pop( 0 )
628
629                        if self.checkStoreMetric( host, metric ):
630                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
631
632                return update_list
633
634        def checkStoreMetric( self, host, metric ):
635
636                if self.lastStored.has_key( host ):
637
638                        if self.lastStored[ host ].has_key( metric['name'] ):
639
640                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
641
642                                        # This is old
643                                        return 0
644
645                return 1
646
647        def memLastUpdate( self, host, metricname, metriclist ):
648
649                if not self.lastStored.has_key( host ):
650                        self.lastStored[ host ] = { }
651
652                last_update_time = 0
653
654                for metric in metriclist:
655
656                        if metric['name'] == metricname:
657
658                                if metric['time'] > last_update_time:
659
660                                        last_update_time = metric['time']
661
662                if self.lastStored[ host ].has_key( metricname ):
663                       
664                        if last_update_time <= self.lastStored[ host ][ metricname ]:
665                                return 1
666
667                self.lastStored[ host ][ metricname ] = last_update_time
668
669        def storeMetrics( self ):
670
671                for hostname, mymetrics in self.myMetrics.items():     
672
673                        for metricname, mymetric in mymetrics.items():
674
675                                metrics_to_store = [ ]
676
677                                # <ATOMIC>
678                                #
679                                self.slot.acquire() 
680
681                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
682
683                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
684                                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
685
686                                self.slot.release()
687                                #
688                                # </ATOMIC>
689
690                                # Create a mapping table, each metric to the period where it should be stored
691                                #
692                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
693
694                                update_rets = [ ]
695
696                                for period, pmetric in metric_serial_table.items():
697
698                                        self.createCheck( hostname, metricname, period )       
699
700                                        update_ret = self.update( hostname, metricname, period, pmetric )
701
702                                        if update_ret == 0:
703
704                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
705                                        else:
706                                                debug_msg( 9, 'metric update failed' )
707
708                                        update_rets.append( update_ret )
709
710                                if not (1) in update_rets:
711
712                                        self.memLastUpdate( hostname, metricname, metrics_to_store )
713
714        def makeTimeSerial( self ):
715                "Generate a time serial. Seconds since epoch"
716
717                # Seconds since epoch
718                mytime = int( time.time() )
719
720                return mytime
721
722        def makeRrdPath( self, host, metricname, timeserial ):
723                """
724                Make a RRD location/path and filename
725                If a metric or timeserial are supplied the complete locations
726                will be made, else just the host directory
727                """
728
729                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
730                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
731
732                return rrd_dir, rrd_file
733
734        def getLastRrdTimeSerial( self, host ):
735                """
736                Find the last timeserial (directory) for this host
737                This is determined once every host
738                """
739
740                newest_timeserial = 0
741
742                for dir in self.timeserials[ host ]:
743
744                        valid_dir = 1
745
746                        for letter in dir:
747                                if letter not in string.digits:
748                                        valid_dir = 0
749
750                        if valid_dir:
751                                timeserial = dir
752                                if timeserial > newest_timeserial:
753                                        newest_timeserial = timeserial
754
755                if newest_timeserial:
756                        return newest_timeserial
757                else:
758                        return 0
759
760        def determinePeriod( self, host, check_serial ):
761
762                period_serial = 0
763
764                for serial in self.timeserials[ host ]:
765
766                        if check_serial >= serial and period_serial < serial:
767
768                                period_serial = serial
769
770                return period_serial
771
772        def determineSerials( self, host, metricname, metriclist ):
773                """
774                Determine the correct serial and corresponding rrd to store
775                for a list of metrics
776                """
777
778                metric_serial_table = { }
779
780                for metric in metriclist:
781
782                        if metric['name'] == metricname:
783
784                                period = self.determinePeriod( host, metric['time'] )   
785
786                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
787
788                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
789
790                                        # This one should get it's own new period
791                                        period = metric['time']
792                                        self.timeserials[ host ].append( period )
793
794                                if not metric_serial_table.has_key( period ):
795
796                                        metric_serial_table[ period ] = [ ]
797
798                                metric_serial_table[ period ].append( metric )
799
800                return metric_serial_table
801
802        def createCheck( self, host, metricname, timeserial ):
803                "Check if an .rrd allready exists for this metric, create if not"
804
805                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
806               
807                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
808
809                if not os.path.exists( rrd_dir ):
810                        os.makedirs( rrd_dir )
811                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
812
813                if not os.path.exists( rrd_file ):
814
815                        interval = self.config.getInterval( self.cluster )
816                        heartbeat = 8 * int( interval )
817
818                        params = [ ]
819
820                        params.append( '--step' )
821                        params.append( str( interval ) )
822
823                        params.append( '--start' )
824                        params.append( str( int( timeserial ) - 1 ) )
825
826                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
827                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
828
829                        self.rrdm.create( str(rrd_file), params )
830
831                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
832
833        def update( self, host, metricname, timeserial, metriclist ):
834
835                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
836
837                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
838
839                update_list = self.makeUpdateList( host, metriclist )
840
841                if len( update_list ) > 0:
842                        ret = self.rrdm.update( str(rrd_file), update_list )
843
844                        if ret:
845                                return 1
846               
847                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
848
849                return 0
850
851def main():
852        "Program startup"
853
854        myProcessor = GangliaXMLProcessor()
855
856        if DAEMONIZE:
857                myProcessor.daemon()
858        else:
859                myProcessor.run()
860
861def check_dir( directory ):
862        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
863
864        if directory[-1] == '/':
865                directory = directory[:-1]
866
867        return directory
868
869def debug_msg( level, msg ):
870
871        if (DEBUG_LEVEL >= level):
872                sys.stderr.write( msg + '\n' )
873
874def printTime( ):
875        "Print current time in human readable format"
876
877        return time.strftime("%a %d %b %Y %H:%M:%S")
878
879# Let's go
880if __name__ == '__main__':
881        main()
Note: See TracBrowser for help on using the repository browser.