source: trunk/daemon/togad.py @ 62

Last change on this file since 62 was 62, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Class added for own data server for catching jobinfo from plugin.
File size: 21.8 KB
Line 
1#!/usr/bin/env python
2
3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
7import string
8import os
9import os.path
10import time
11import re
12import threading
13import mutex
14import random
15from types import *
16import DBClass
17
18# Specify debugging level here;
19#
20# 11 = XML: metrics
21# 10 = XML: host, cluster, grid, ganglia
22# 9  = RRD activity, gmetad config parsing
23# 8  = RRD file activity
24# 7  = daemon threading
25#
26DEBUG_LEVEL = 7
27
28# Where is the gmetad.conf located
29#
30GMETAD_CONF = '/etc/gmetad.conf'
31
32# Wether or not to maintain a archive of all ganglia node data
33# Note: This will require a significant amount of hd space
34#       depending on your cluster size
35#
36ARCHIVE = 0
37
38# Where to grab XML data from
39# Normally: local gmetad (port 8651)
40#
41ARCHIVE_SOURCE = "localhost:8651"
42
43# List of data_source names to archive for
44#
45ARCHIVE_DATASOURCES = [ "LISA Cluster" ]
46
47# Where to store the archived rrd's
48#
49ARCHIVE_PATH = '/data/toga/rrds'
50
51# Amount of hours to store in one single archived .rrd
52#
53ARCHIVE_HOURS_PER_RRD = 12
54
55# Wether or not to run a seperate Toga jobinfo server
56#
57TOGA_SERVER = 1
58
59# On what interfaces to listen
60#
61TOGA_SERVER_IP = [ '127.0.0.1' ]
62
63# On what port to listen
64#
65TOGA_SERVER_PORT = 9048
66
67# Toga's SQL dbase name to use
68#
69TOGA_SERVER_SQL_DBASE = "toga"
70
71# Wether or not to run as a daemon in background
72#
73DAEMONIZE = 0
74
75######################
76#                    #
77# Configuration ends #
78#                    #
79######################
80
81###
82# You'll only want to change anything below here unless you
83# know what you are doing (i.e. your name is Ramon Bastiaans)
84###
85
86# What XML data types not to store
87#
88UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
89
90# Maximum time (in seconds) a parsethread may run
91#
92PARSE_TIMEOUT = 60
93
94# Maximum time (in seconds) a storethread may run
95#
96STORE_TIMEOUT = 360
97
98"""
99This is TOrque-GAnglia's data Daemon
100"""
101
102class TogaServer:
103
104        sockets = [ ]
105
106
107
108        def __init__( self ):
109
110                s = None
111                for host in TOGA_SERVER_IP:
112
113                        for res in socket.getaddrinfo( host, TOGA_SERVER_PORT, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE ):
114
115                                af, socktype, proto, canonname, sa = res
116
117                                try:
118
119                                        s = socket.socket( af, socktype, proto )
120
121                                except socket.error, msg:
122
123                                        s = None
124                                        continue
125
126                                try:
127                                        s.bind( sa )
128                                        s.listen( 1 )
129
130                                except socket.error, msg:
131
132                                        s.close()
133                                        s = None
134                                        continue
135                                        break
136
137                                if not self.s:
138
139                                        debug_msg( 6, 'Could not open socket' )
140                                        return None
141
142                                else:
143
144                                        self.sockets.append( s )
145
146        def run( self ):
147
148                for s in self.sockets:
149
150                        while( 1 ):
151
152                                conn, addr = s.accept()
153                                pid = os.fork()
154
155                                if pid == 0:
156
157                                        debug_msg( 6, 'New connection to %s' %addr[0] )
158
159                                        leesme = conn.makefile( 'r' )
160                                        for line in leesme.readlines():
161                                                print line
162                                       
163                                        conn.close()
164                                        conn.shutdown( 2 )
165
166                                        sys.exit( 0 )
167
168class RRDMutator:
169        "A class for handling .rrd mutations"
170
171        binary = '/usr/bin/rrdtool'
172
173        def __init__( self, binary=None ):
174
175                if binary:
176                        self.binary = binary
177
178        def create( self, filename, args ):
179                return self.perform( 'create', '"' + filename + '"', args )
180
181        def update( self, filename, args ):
182                return self.perform( 'update', '"' + filename + '"', args )
183
184        def grabLastUpdate( self, filename ):
185
186                last_update = 0
187
188                debug_msg( 8, self.binary + ' info "' + filename + '"' )
189
190                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
191
192                        if line.find( 'last_update') != -1:
193
194                                last_update = line.split( ' = ' )[1]
195
196                if last_update:
197                        return last_update
198                else:
199                        return 0
200
201        def perform( self, action, filename, args ):
202
203                arg_string = None
204
205                if type( args ) is not ListType:
206                        debug_msg( 8, 'Arguments needs to be of type List' )
207                        return 1
208
209                for arg in args:
210
211                        if not arg_string:
212
213                                arg_string = arg
214                        else:
215                                arg_string = arg_string + ' ' + arg
216
217                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
218
219                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
220
221                        if line.find( 'ERROR' ) != -1:
222
223                                error_msg = string.join( line.split( ' ' )[1:] )
224                                debug_msg( 8, error_msg )
225                                return 1
226
227                return 0
228
229class GangliaXMLHandler( ContentHandler ):
230        "Parse Ganglia's XML"
231
232        def __init__( self, config ):
233                self.config = config
234                self.clusters = { }
235                debug_msg( 0, printTime() + ' - Checking existing toga rrd archive..' )
236                self.gatherClusters()
237                debug_msg( 0, printTime() + ' - Check done.' )
238
239        def gatherClusters( self ):
240
241                archive_dir = check_dir(ARCHIVE_PATH)
242
243                hosts = [ ]
244
245                if os.path.exists( archive_dir ):
246
247                        dirlist = os.listdir( archive_dir )
248
249                        for item in dirlist:
250
251                                clustername = item
252
253                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
254
255                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
256
257        def startElement( self, name, attrs ):
258                "Store appropriate data from xml start tags"
259
260                if name == 'GANGLIA_XML':
261
262                        self.XMLSource = attrs.get( 'SOURCE', "" )
263                        self.gangliaVersion = attrs.get( 'VERSION', "" )
264
265                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
266
267                elif name == 'GRID':
268
269                        self.gridName = attrs.get( 'NAME', "" )
270                        self.time = attrs.get( 'LOCALTIME', "" )
271
272                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
273
274                elif name == 'CLUSTER':
275
276                        self.clusterName = attrs.get( 'NAME', "" )
277                        self.time = attrs.get( 'LOCALTIME', "" )
278
279                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
280
281                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
282
283                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
284
285                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
286
287                        self.hostName = attrs.get( 'NAME', "" )
288                        self.hostIp = attrs.get( 'IP', "" )
289                        self.hostReported = attrs.get( 'REPORTED', "" )
290
291                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
292
293                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
294
295                        type = attrs.get( 'TYPE', "" )
296
297                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
298
299                                myMetric = { }
300                                myMetric['name'] = attrs.get( 'NAME', "" )
301                                myMetric['val'] = attrs.get( 'VAL', "" )
302                                myMetric['time'] = self.hostReported
303
304                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
305
306                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
307
308        def storeMetrics( self ):
309
310                for clustername, rrdh in self.clusters.items():
311
312                        ret = rrdh.storeMetrics()
313
314                        if ret:
315                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
316                                return 1
317
318                return 0
319
320class GangliaXMLGatherer:
321        "Setup a connection and file object to Ganglia's XML"
322
323        s = None
324
325        def __init__( self, host, port ):
326                "Store host and port for connection"
327
328                self.host = host
329                self.port = port
330                self.connect()
331
332        def connect( self ):
333                "Setup connection to XML source"
334
335                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
336
337                        af, socktype, proto, canonname, sa = res
338
339                        try:
340
341                                self.s = socket.socket( af, socktype, proto )
342
343                        except socket.error, msg:
344
345                                self.s = None
346                                continue
347
348                        try:
349
350                                self.s.connect( sa )
351
352                        except socket.error, msg:
353
354                                self.s.close()
355                                self.s = None
356                                continue
357
358                        break
359
360                if self.s is None:
361
362                        debug_msg( 0, 'Could not open socket' )
363                        sys.exit( 1 )
364
365        def disconnect( self ):
366                "Close socket"
367
368                if self.s:
369                        self.s.close()
370                        self.s = None
371
372        def __del__( self ):
373                "Kill the socket before we leave"
374
375                self.disconnect()
376
377        def getFileObject( self ):
378                "Connect, and return a file object"
379
380                if self.s:
381                        # Apearantly, only data is received when a connection is made
382                        # therefor, disconnect and connect
383                        #
384                        self.disconnect()
385                        self.connect()
386
387                return self.s.makefile( 'r' )
388
389class GangliaXMLProcessor:
390
391        def __init__( self ):
392                "Setup initial XML connection and handlers"
393
394                self.config = GangliaConfigParser( GMETAD_CONF )
395
396                self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_SOURCE.split( ':' )[0], ARCHIVE_SOURCE.split( ':' )[1] ) 
397                self.myParser = make_parser()   
398                self.myHandler = GangliaXMLHandler( self.config )
399                self.myParser.setContentHandler( self.myHandler )
400
401        def daemon( self ):
402                "Run as daemon forever"
403
404                self.DAEMON = 1
405
406                # Fork the first child
407                #
408                pid = os.fork()
409
410                if pid > 0:
411
412                        sys.exit(0)  # end parent
413
414                # creates a session and sets the process group ID
415                #
416                os.setsid()
417
418                # Fork the second child
419                #
420                pid = os.fork()
421
422                if pid > 0:
423
424                        sys.exit(0)  # end parent
425
426                # Go to the root directory and set the umask
427                #
428                os.chdir('/')
429                os.umask(0)
430
431                sys.stdin.close()
432                sys.stdout.close()
433                #sys.stderr.close()
434
435                os.open('/dev/null', 0)
436                os.dup(0)
437                os.dup(0)
438
439                self.run()
440
441        def printTime( self ):
442                "Print current time in human readable format"
443
444                return time.strftime("%a %d %b %Y %H:%M:%S")
445
446        def run( self ):
447                "Main thread"
448
449                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
450                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
451
452                while( 1 ):
453
454                        if not xmlthread.isAlive():
455                                # Gather XML at the same interval as gmetad
456
457                                # threaded call to: self.processXML()
458                                #
459                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
460                                xmlthread.start()
461
462                        if not storethread.isAlive():
463                                # Store metrics every .. sec
464
465                                # threaded call to: self.storeMetrics()
466                                #
467                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
468                                storethread.start()
469               
470                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
471                        time.sleep( 1 ) 
472
473        def storeMetrics( self ):
474                "Store metrics retained in memory to disk"
475
476                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
477
478                # Store metrics somewhere between every 60 and 180 seconds
479                #
480                STORE_INTERVAL = random.randint( 360, 640 )
481
482                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
483                storethread.start()
484
485                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
486                time.sleep( STORE_INTERVAL )
487                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
488
489                if storethread.isAlive():
490
491                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
492                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
493                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
494
495                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
496
497                return 0
498
499        def storeThread( self ):
500
501                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
502                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
503                ret = self.myHandler.storeMetrics()
504                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
505                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
506               
507                return ret
508
509        def processXML( self ):
510                "Process XML"
511
512                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
513
514                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
515                parsethread.start()
516
517                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
518                time.sleep( float( self.config.getLowestInterval() ) ) 
519                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
520
521                if parsethread.isAlive():
522
523                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
524                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
525                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
526
527                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
528
529                return 0
530
531        def parseThread( self ):
532
533                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
534                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
535                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
536                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
537                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
538
539                return ret
540
541class GangliaConfigParser:
542
543        sources = [ ]
544
545        def __init__( self, config ):
546
547                self.config = config
548                self.parseValues()
549
550        def parseValues( self ):
551                "Parse certain values from gmetad.conf"
552
553                readcfg = open( self.config, 'r' )
554
555                for line in readcfg.readlines():
556
557                        if line.count( '"' ) > 1:
558
559                                if line.find( 'data_source' ) != -1 and line[0] != '#':
560
561                                        source = { }
562                                        source['name'] = line.split( '"' )[1]
563                                        source_words = line.split( '"' )[2].split( ' ' )
564
565                                        for word in source_words:
566
567                                                valid_interval = 1
568
569                                                for letter in word:
570
571                                                        if letter not in string.digits:
572
573                                                                valid_interval = 0
574
575                                                if valid_interval and len(word) > 0:
576
577                                                        source['interval'] = word
578                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
579       
580                                        # No interval found, use Ganglia's default     
581                                        if not source.has_key( 'interval' ):
582                                                source['interval'] = 15
583                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
584
585                                        self.sources.append( source )
586
587        def getInterval( self, source_name ):
588
589                for source in self.sources:
590
591                        if source['name'] == source_name:
592
593                                return source['interval']
594
595                return None
596
597        def getLowestInterval( self ):
598
599                lowest_interval = 0
600
601                for source in self.sources:
602
603                        if not lowest_interval or source['interval'] <= lowest_interval:
604
605                                lowest_interval = source['interval']
606
607                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
608                if lowest_interval:
609                        return lowest_interval
610                else:
611                        return 15
612
613class RRDHandler:
614
615        myMetrics = { }
616        lastStored = { }
617        timeserials = { }
618        slot = None
619
620        def __init__( self, config, cluster ):
621                self.block = 0
622                self.cluster = cluster
623                self.config = config
624                self.slot = threading.Lock()
625                self.rrdm = RRDMutator()
626                self.gatherLastUpdates()
627
628        def gatherLastUpdates( self ):
629                "Populate the lastStored list, containing timestamps of all last updates"
630
631                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
632
633                hosts = [ ]
634
635                if os.path.exists( cluster_dir ):
636
637                        dirlist = os.listdir( cluster_dir )
638
639                        for dir in dirlist:
640
641                                hosts.append( dir )
642
643                for host in hosts:
644
645                        host_dir = cluster_dir + '/' + host
646                        dirlist = os.listdir( host_dir )
647
648                        for dir in dirlist:
649
650                                if not self.timeserials.has_key( host ):
651
652                                        self.timeserials[ host ] = [ ]
653
654                                self.timeserials[ host ].append( dir )
655
656                        last_serial = self.getLastRrdTimeSerial( host )
657                        if last_serial:
658
659                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
660                                if os.path.exists( metric_dir ):
661
662                                        dirlist = os.listdir( metric_dir )
663
664                                        for file in dirlist:
665
666                                                metricname = file.split( '.rrd' )[0]
667
668                                                if not self.lastStored.has_key( host ):
669
670                                                        self.lastStored[ host ] = { }
671
672                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
673
674        def getClusterName( self ):
675                return self.cluster
676
677        def memMetric( self, host, metric ):
678
679                if self.myMetrics.has_key( host ):
680
681                        if self.myMetrics[ host ].has_key( metric['name'] ):
682
683                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
684
685                                        if mymetric['time'] == metric['time']:
686
687                                                # Allready have this metric, abort
688                                                return 1
689                        else:
690                                self.myMetrics[ host ][ metric['name'] ] = [ ]
691                else:
692                        self.myMetrics[ host ] = { }
693                        self.myMetrics[ host ][ metric['name'] ] = [ ]
694
695                # <ATOMIC>
696                #
697                self.slot.acquire()
698
699                self.myMetrics[ host ][ metric['name'] ].append( metric )
700
701                self.slot.release()
702                #
703                # </ATOMIC>
704
705        def makeUpdateList( self, host, metriclist ):
706
707                update_list = [ ]
708                metric = None
709
710                while len( metriclist ) > 0:
711
712                        metric = metriclist.pop( 0 )
713
714                        if self.checkStoreMetric( host, metric ):
715                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
716
717                return update_list
718
719        def checkStoreMetric( self, host, metric ):
720
721                if self.lastStored.has_key( host ):
722
723                        if self.lastStored[ host ].has_key( metric['name'] ):
724
725                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
726
727                                        # This is old
728                                        return 0
729
730                return 1
731
732        def memLastUpdate( self, host, metricname, metriclist ):
733
734                if not self.lastStored.has_key( host ):
735                        self.lastStored[ host ] = { }
736
737                last_update_time = 0
738
739                for metric in metriclist:
740
741                        if metric['name'] == metricname:
742
743                                if metric['time'] > last_update_time:
744
745                                        last_update_time = metric['time']
746
747                if self.lastStored[ host ].has_key( metricname ):
748                       
749                        if last_update_time <= self.lastStored[ host ][ metricname ]:
750                                return 1
751
752                self.lastStored[ host ][ metricname ] = last_update_time
753
754        def storeMetrics( self ):
755
756                for hostname, mymetrics in self.myMetrics.items():     
757
758                        for metricname, mymetric in mymetrics.items():
759
760                                metrics_to_store = [ ]
761
762                                # <ATOMIC>
763                                #
764                                self.slot.acquire() 
765
766                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
767
768                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
769                                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
770
771                                self.slot.release()
772                                #
773                                # </ATOMIC>
774
775                                # Create a mapping table, each metric to the period where it should be stored
776                                #
777                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
778
779                                update_rets = [ ]
780
781                                for period, pmetric in metric_serial_table.items():
782
783                                        self.createCheck( hostname, metricname, period )       
784
785                                        update_ret = self.update( hostname, metricname, period, pmetric )
786
787                                        if update_ret == 0:
788
789                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
790                                        else:
791                                                debug_msg( 9, 'metric update failed' )
792
793                                        update_rets.append( update_ret )
794
795                                if not (1) in update_rets:
796
797                                        self.memLastUpdate( hostname, metricname, metrics_to_store )
798
799        def makeTimeSerial( self ):
800                "Generate a time serial. Seconds since epoch"
801
802                # Seconds since epoch
803                mytime = int( time.time() )
804
805                return mytime
806
807        def makeRrdPath( self, host, metricname, timeserial ):
808                """
809                Make a RRD location/path and filename
810                If a metric or timeserial are supplied the complete locations
811                will be made, else just the host directory
812                """
813
814                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
815                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
816
817                return rrd_dir, rrd_file
818
819        def getLastRrdTimeSerial( self, host ):
820                """
821                Find the last timeserial (directory) for this host
822                This is determined once every host
823                """
824
825                newest_timeserial = 0
826
827                for dir in self.timeserials[ host ]:
828
829                        valid_dir = 1
830
831                        for letter in dir:
832                                if letter not in string.digits:
833                                        valid_dir = 0
834
835                        if valid_dir:
836                                timeserial = dir
837                                if timeserial > newest_timeserial:
838                                        newest_timeserial = timeserial
839
840                if newest_timeserial:
841                        return newest_timeserial
842                else:
843                        return 0
844
845        def determinePeriod( self, host, check_serial ):
846
847                period_serial = 0
848
849                if self.timeserials.has_key( host ):
850
851                        for serial in self.timeserials[ host ]:
852
853                                if check_serial >= serial and period_serial < serial:
854
855                                        period_serial = serial
856
857                return period_serial
858
859        def determineSerials( self, host, metricname, metriclist ):
860                """
861                Determine the correct serial and corresponding rrd to store
862                for a list of metrics
863                """
864
865                metric_serial_table = { }
866
867                for metric in metriclist:
868
869                        if metric['name'] == metricname:
870
871                                period = self.determinePeriod( host, metric['time'] )   
872
873                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
874
875                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
876
877                                        # This one should get it's own new period
878                                        period = metric['time']
879
880                                        if not self.timeserials.has_key( host ):
881                                                self.timeserials[ host ] = [ ]
882
883                                        self.timeserials[ host ].append( period )
884
885                                if not metric_serial_table.has_key( period ):
886
887                                        metric_serial_table[ period ] = [ ]
888
889                                metric_serial_table[ period ].append( metric )
890
891                return metric_serial_table
892
893        def createCheck( self, host, metricname, timeserial ):
894                "Check if an .rrd allready exists for this metric, create if not"
895
896                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
897               
898                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
899
900                if not os.path.exists( rrd_dir ):
901
902                        try:
903                                os.makedirs( rrd_dir )
904
905                        except OSError, msg:
906
907                                if msg.find( 'File exists' ) != -1:
908
909                                        # Ignore exists errors
910                                        pass
911
912                                else:
913
914                                        print msg
915                                        return
916
917                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
918
919                if not os.path.exists( rrd_file ):
920
921                        interval = self.config.getInterval( self.cluster )
922                        heartbeat = 8 * int( interval )
923
924                        params = [ ]
925
926                        params.append( '--step' )
927                        params.append( str( interval ) )
928
929                        params.append( '--start' )
930                        params.append( str( int( timeserial ) - 1 ) )
931
932                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
933                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
934
935                        self.rrdm.create( str(rrd_file), params )
936
937                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
938
939        def update( self, host, metricname, timeserial, metriclist ):
940
941                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
942
943                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
944
945                update_list = self.makeUpdateList( host, metriclist )
946
947                if len( update_list ) > 0:
948                        ret = self.rrdm.update( str(rrd_file), update_list )
949
950                        if ret:
951                                return 1
952               
953                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
954
955                return 0
956
957def main():
958        "Program startup"
959
960        if TOGA_SERVER:
961       
962                myServer = TogaServer()
963
964                if DAEMONIZE:
965                        myServer.daemon()
966                else:
967                        myServer.run()
968
969        if ARCHIVE:
970
971                myProcessor = GangliaXMLProcessor()
972
973                if DAEMONIZE:
974                        myProcessor.daemon()
975                else:
976                        myProcessor.run()
977
978def check_dir( directory ):
979        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
980
981        if directory[-1] == '/':
982                directory = directory[:-1]
983
984        return directory
985
986def debug_msg( level, msg ):
987
988        if (DEBUG_LEVEL >= level):
989                sys.stderr.write( msg + '\n' )
990
991def printTime( ):
992        "Print current time in human readable format"
993
994        return time.strftime("%a %d %b %Y %H:%M:%S")
995
996# Let's go
997if __name__ == '__main__':
998        main()
Note: See TracBrowser for help on using the repository browser.