source: trunk/daemon/togad.py @ 62

Last change on this file since 62 was 62, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Class added for own data server for catching jobinfo from plugin.
File size: 21.8 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import string
[12]8import os
9import os.path
[17]10import time
11import re
[36]12import threading
13import mutex
[38]14import random
[40]15from types import *
[60]16import DBClass
[3]17
[8]18# Specify debugging level here;
19#
[38]20# 11 = XML: metrics
21# 10 = XML: host, cluster, grid, ganglia
22# 9  = RRD activity, gmetad config parsing
[40]23# 8  = RRD file activity
24# 7  = daemon threading
[8]25#
[55]26DEBUG_LEVEL = 7
[6]27
[9]28# Where is the gmetad.conf located
29#
30GMETAD_CONF = '/etc/gmetad.conf'
31
[60]32# Wether or not to maintain a archive of all ganglia node data
33# Note: This will require a significant amount of hd space
34#       depending on your cluster size
[9]35#
[60]36ARCHIVE = 0
[9]37
[60]38# Where to grab XML data from
39# Normally: local gmetad (port 8651)
40#
41ARCHIVE_SOURCE = "localhost:8651"
42
[9]43# List of data_source names to archive for
44#
[60]45ARCHIVE_DATASOURCES = [ "LISA Cluster" ]
[9]46
[60]47# Where to store the archived rrd's
48#
49ARCHIVE_PATH = '/data/toga/rrds'
50
[13]51# Amount of hours to store in one single archived .rrd
[9]52#
[43]53ARCHIVE_HOURS_PER_RRD = 12
[13]54
[60]55# Wether or not to run a seperate Toga jobinfo server
56#
57TOGA_SERVER = 1
58
59# On what interfaces to listen
60#
[62]61TOGA_SERVER_IP = [ '127.0.0.1' ]
[60]62
63# On what port to listen
64#
65TOGA_SERVER_PORT = 9048
66
67# Toga's SQL dbase name to use
68#
69TOGA_SERVER_SQL_DBASE = "toga"
70
[22]71# Wether or not to run as a daemon in background
72#
73DAEMONIZE = 0
74
[17]75######################
76#                    #
77# Configuration ends #
78#                    #
79######################
[13]80
[60]81###
82# You'll only want to change anything below here unless you
83# know what you are doing (i.e. your name is Ramon Bastiaans)
84###
85
[17]86# What XML data types not to store
[13]87#
[17]88UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]89
[47]90# Maximum time (in seconds) a parsethread may run
91#
92PARSE_TIMEOUT = 60
93
94# Maximum time (in seconds) a storethread may run
95#
96STORE_TIMEOUT = 360
97
[8]98"""
99This is TOrque-GAnglia's data Daemon
100"""
101
[62]102class TogaServer:
[60]103
[62]104        sockets = [ ]
[60]105
[62]106
107
108        def __init__( self ):
109
110                s = None
111                for host in TOGA_SERVER_IP:
112
113                        for res in socket.getaddrinfo( host, TOGA_SERVER_PORT, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE ):
114
115                                af, socktype, proto, canonname, sa = res
116
117                                try:
118
119                                        s = socket.socket( af, socktype, proto )
120
121                                except socket.error, msg:
122
123                                        s = None
124                                        continue
125
126                                try:
127                                        s.bind( sa )
128                                        s.listen( 1 )
129
130                                except socket.error, msg:
131
132                                        s.close()
133                                        s = None
134                                        continue
135                                        break
136
137                                if not self.s:
138
139                                        debug_msg( 6, 'Could not open socket' )
140                                        return None
141
142                                else:
143
144                                        self.sockets.append( s )
145
146        def run( self ):
147
148                for s in self.sockets:
149
150                        while( 1 ):
151
152                                conn, addr = s.accept()
153                                pid = os.fork()
154
155                                if pid == 0:
156
157                                        debug_msg( 6, 'New connection to %s' %addr[0] )
158
159                                        leesme = conn.makefile( 'r' )
160                                        for line in leesme.readlines():
161                                                print line
162                                       
163                                        conn.close()
164                                        conn.shutdown( 2 )
165
166                                        sys.exit( 0 )
167
[37]168class RRDMutator:
[38]169        "A class for handling .rrd mutations"
[37]170
171        binary = '/usr/bin/rrdtool'
172
173        def __init__( self, binary=None ):
174
175                if binary:
176                        self.binary = binary
177
178        def create( self, filename, args ):
[40]179                return self.perform( 'create', '"' + filename + '"', args )
[37]180
181        def update( self, filename, args ):
[40]182                return self.perform( 'update', '"' + filename + '"', args )
[37]183
[42]184        def grabLastUpdate( self, filename ):
185
186                last_update = 0
187
[53]188                debug_msg( 8, self.binary + ' info "' + filename + '"' )
189
[42]190                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
191
192                        if line.find( 'last_update') != -1:
193
194                                last_update = line.split( ' = ' )[1]
195
196                if last_update:
197                        return last_update
198                else:
199                        return 0
200
[40]201        def perform( self, action, filename, args ):
[37]202
203                arg_string = None
204
[40]205                if type( args ) is not ListType:
206                        debug_msg( 8, 'Arguments needs to be of type List' )
207                        return 1
208
[37]209                for arg in args:
210
211                        if not arg_string:
212
213                                arg_string = arg
214                        else:
215                                arg_string = arg_string + ' ' + arg
216
[54]217                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]218
[40]219                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
[37]220
221                        if line.find( 'ERROR' ) != -1:
222
223                                error_msg = string.join( line.split( ' ' )[1:] )
[40]224                                debug_msg( 8, error_msg )
[37]225                                return 1
226
227                return 0
228
[6]229class GangliaXMLHandler( ContentHandler ):
[8]230        "Parse Ganglia's XML"
[3]231
[33]232        def __init__( self, config ):
233                self.config = config
[35]234                self.clusters = { }
[54]235                debug_msg( 0, printTime() + ' - Checking existing toga rrd archive..' )
[44]236                self.gatherClusters()
[54]237                debug_msg( 0, printTime() + ' - Check done.' )
[33]238
[44]239        def gatherClusters( self ):
240
[45]241                archive_dir = check_dir(ARCHIVE_PATH)
[44]242
243                hosts = [ ]
244
245                if os.path.exists( archive_dir ):
246
247                        dirlist = os.listdir( archive_dir )
248
249                        for item in dirlist:
250
251                                clustername = item
252
[60]253                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]254
255                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
256
[6]257        def startElement( self, name, attrs ):
[8]258                "Store appropriate data from xml start tags"
[3]259
[7]260                if name == 'GANGLIA_XML':
[32]261
262                        self.XMLSource = attrs.get( 'SOURCE', "" )
263                        self.gangliaVersion = attrs.get( 'VERSION', "" )
264
[12]265                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]266
[7]267                elif name == 'GRID':
[32]268
269                        self.gridName = attrs.get( 'NAME', "" )
270                        self.time = attrs.get( 'LOCALTIME', "" )
271
[12]272                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]273
[7]274                elif name == 'CLUSTER':
[32]275
276                        self.clusterName = attrs.get( 'NAME', "" )
277                        self.time = attrs.get( 'LOCALTIME', "" )
278
[60]279                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]280
[34]281                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]282
[35]283                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]284
[60]285                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]286
287                        self.hostName = attrs.get( 'NAME', "" )
288                        self.hostIp = attrs.get( 'IP', "" )
289                        self.hostReported = attrs.get( 'REPORTED', "" )
290
[12]291                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]292
[60]293                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]294
[32]295                        type = attrs.get( 'TYPE', "" )
[6]296
[32]297                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
[3]298
[32]299                                myMetric = { }
300                                myMetric['name'] = attrs.get( 'NAME', "" )
301                                myMetric['val'] = attrs.get( 'VAL', "" )
302                                myMetric['time'] = self.hostReported
[3]303
[34]304                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]305
[34]306                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]307
[34]308        def storeMetrics( self ):
[9]309
[34]310                for clustername, rrdh in self.clusters.items():
[16]311
[38]312                        ret = rrdh.storeMetrics()
[9]313
[38]314                        if ret:
315                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
316                                return 1
317
318                return 0
319
[5]320class GangliaXMLGatherer:
[8]321        "Setup a connection and file object to Ganglia's XML"
[3]322
[8]323        s = None
324
325        def __init__( self, host, port ):
326                "Store host and port for connection"
327
[5]328                self.host = host
329                self.port = port
[33]330                self.connect()
[3]331
[33]332        def connect( self ):
333                "Setup connection to XML source"
[8]334
335                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]336
[5]337                        af, socktype, proto, canonname, sa = res
[32]338
[5]339                        try:
[32]340
[8]341                                self.s = socket.socket( af, socktype, proto )
[32]342
[5]343                        except socket.error, msg:
[32]344
[8]345                                self.s = None
[5]346                                continue
[32]347
[5]348                        try:
[32]349
[8]350                                self.s.connect( sa )
[32]351
[5]352                        except socket.error, msg:
[32]353
[8]354                                self.s.close()
355                                self.s = None
[5]356                                continue
[32]357
[5]358                        break
[3]359
[8]360                if self.s is None:
[32]361
[33]362                        debug_msg( 0, 'Could not open socket' )
363                        sys.exit( 1 )
[5]364
[33]365        def disconnect( self ):
366                "Close socket"
367
368                if self.s:
369                        self.s.close()
370                        self.s = None
371
372        def __del__( self ):
373                "Kill the socket before we leave"
374
375                self.disconnect()
376
377        def getFileObject( self ):
378                "Connect, and return a file object"
379
[38]380                if self.s:
381                        # Apearantly, only data is received when a connection is made
382                        # therefor, disconnect and connect
383                        #
384                        self.disconnect()
385                        self.connect()
[33]386
[8]387                return self.s.makefile( 'r' )
[5]388
[8]389class GangliaXMLProcessor:
[5]390
[33]391        def __init__( self ):
392                "Setup initial XML connection and handlers"
393
394                self.config = GangliaConfigParser( GMETAD_CONF )
395
[60]396                self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_SOURCE.split( ':' )[0], ARCHIVE_SOURCE.split( ':' )[1] ) 
[33]397                self.myParser = make_parser()   
398                self.myHandler = GangliaXMLHandler( self.config )
399                self.myParser.setContentHandler( self.myHandler )
400
[9]401        def daemon( self ):
[8]402                "Run as daemon forever"
[5]403
[8]404                self.DAEMON = 1
[5]405
[8]406                # Fork the first child
407                #
408                pid = os.fork()
[32]409
[8]410                if pid > 0:
[7]411
[32]412                        sys.exit(0)  # end parent
413
[8]414                # creates a session and sets the process group ID
415                #
416                os.setsid()
[7]417
[8]418                # Fork the second child
419                #
420                pid = os.fork()
[32]421
[8]422                if pid > 0:
[5]423
[32]424                        sys.exit(0)  # end parent
425
[8]426                # Go to the root directory and set the umask
427                #
428                os.chdir('/')
429                os.umask(0)
430
431                sys.stdin.close()
432                sys.stdout.close()
[36]433                #sys.stderr.close()
[8]434
435                os.open('/dev/null', 0)
436                os.dup(0)
437                os.dup(0)
438
439                self.run()
440
[22]441        def printTime( self ):
[33]442                "Print current time in human readable format"
[22]443
[33]444                return time.strftime("%a %d %b %Y %H:%M:%S")
[22]445
[9]446        def run( self ):
[8]447                "Main thread"
448
[36]449                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
[55]450                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]451
[36]452                while( 1 ):
453
454                        if not xmlthread.isAlive():
455                                # Gather XML at the same interval as gmetad
456
457                                # threaded call to: self.processXML()
458                                #
459                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
460                                xmlthread.start()
461
[55]462                        if not storethread.isAlive():
463                                # Store metrics every .. sec
[36]464
[55]465                                # threaded call to: self.storeMetrics()
466                                #
467                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
468                                storethread.start()
[36]469               
470                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
471                        time.sleep( 1 ) 
472
[33]473        def storeMetrics( self ):
474                "Store metrics retained in memory to disk"
[22]475
[40]476                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
[39]477
[38]478                # Store metrics somewhere between every 60 and 180 seconds
479                #
[55]480                STORE_INTERVAL = random.randint( 360, 640 )
[22]481
[39]482                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
[36]483                storethread.start()
484
[40]485                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]486                time.sleep( STORE_INTERVAL )
[40]487                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
[36]488
489                if storethread.isAlive():
490
[40]491                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
[47]492                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[40]493                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
[36]494
[40]495                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
[36]496
497                return 0
498
[39]499        def storeThread( self ):
500
[40]501                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
502                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
[39]503                ret = self.myHandler.storeMetrics()
[40]504                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
505                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
[39]506               
507                return ret
508
[8]509        def processXML( self ):
510                "Process XML"
511
[40]512                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
[8]513
[39]514                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
[36]515                parsethread.start()
516
[40]517                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]518                time.sleep( float( self.config.getLowestInterval() ) ) 
[40]519                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
[36]520
521                if parsethread.isAlive():
522
[40]523                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
[47]524                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[40]525                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
[36]526
[40]527                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
[36]528
529                return 0
530
[39]531        def parseThread( self ):
532
[40]533                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
534                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
[39]535                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
[40]536                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
537                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
[39]538
539                return ret
540
[9]541class GangliaConfigParser:
542
[34]543        sources = [ ]
[9]544
545        def __init__( self, config ):
[32]546
[9]547                self.config = config
548                self.parseValues()
549
[32]550        def parseValues( self ):
[9]551                "Parse certain values from gmetad.conf"
552
553                readcfg = open( self.config, 'r' )
554
555                for line in readcfg.readlines():
556
557                        if line.count( '"' ) > 1:
558
[10]559                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]560
[11]561                                        source = { }
562                                        source['name'] = line.split( '"' )[1]
[9]563                                        source_words = line.split( '"' )[2].split( ' ' )
564
565                                        for word in source_words:
566
567                                                valid_interval = 1
568
569                                                for letter in word:
[32]570
[9]571                                                        if letter not in string.digits:
[32]572
[9]573                                                                valid_interval = 0
574
[10]575                                                if valid_interval and len(word) > 0:
[32]576
[9]577                                                        source['interval'] = word
[12]578                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]579       
580                                        # No interval found, use Ganglia's default     
581                                        if not source.has_key( 'interval' ):
582                                                source['interval'] = 15
583                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]584
[33]585                                        self.sources.append( source )
[9]586
587        def getInterval( self, source_name ):
[32]588
[9]589                for source in self.sources:
[32]590
[12]591                        if source['name'] == source_name:
[32]592
[9]593                                return source['interval']
[32]594
[9]595                return None
596
[34]597        def getLowestInterval( self ):
598
599                lowest_interval = 0
600
601                for source in self.sources:
602
603                        if not lowest_interval or source['interval'] <= lowest_interval:
604
605                                lowest_interval = source['interval']
606
607                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
608                if lowest_interval:
609                        return lowest_interval
610                else:
611                        return 15
612
[9]613class RRDHandler:
614
[32]615        myMetrics = { }
[40]616        lastStored = { }
[47]617        timeserials = { }
[36]618        slot = None
[32]619
[33]620        def __init__( self, config, cluster ):
[44]621                self.block = 0
[9]622                self.cluster = cluster
[33]623                self.config = config
[40]624                self.slot = threading.Lock()
[37]625                self.rrdm = RRDMutator()
[42]626                self.gatherLastUpdates()
[9]627
[42]628        def gatherLastUpdates( self ):
629                "Populate the lastStored list, containing timestamps of all last updates"
630
631                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
632
633                hosts = [ ]
634
635                if os.path.exists( cluster_dir ):
636
[44]637                        dirlist = os.listdir( cluster_dir )
[42]638
[44]639                        for dir in dirlist:
[42]640
[44]641                                hosts.append( dir )
[42]642
643                for host in hosts:
644
[47]645                        host_dir = cluster_dir + '/' + host
646                        dirlist = os.listdir( host_dir )
647
648                        for dir in dirlist:
649
650                                if not self.timeserials.has_key( host ):
651
652                                        self.timeserials[ host ] = [ ]
653
654                                self.timeserials[ host ].append( dir )
655
[42]656                        last_serial = self.getLastRrdTimeSerial( host )
657                        if last_serial:
658
659                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
660                                if os.path.exists( metric_dir ):
661
[44]662                                        dirlist = os.listdir( metric_dir )
[42]663
[44]664                                        for file in dirlist:
[42]665
[44]666                                                metricname = file.split( '.rrd' )[0]
[42]667
[44]668                                                if not self.lastStored.has_key( host ):
[42]669
[44]670                                                        self.lastStored[ host ] = { }
[42]671
[44]672                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]673
[32]674        def getClusterName( self ):
675                return self.cluster
676
677        def memMetric( self, host, metric ):
678
[34]679                if self.myMetrics.has_key( host ):
[32]680
[34]681                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]682
[34]683                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]684
[34]685                                        if mymetric['time'] == metric['time']:
[32]686
[34]687                                                # Allready have this metric, abort
688                                                return 1
689                        else:
690                                self.myMetrics[ host ][ metric['name'] ] = [ ]
691                else:
[32]692                        self.myMetrics[ host ] = { }
[34]693                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]694
[53]695                # <ATOMIC>
[40]696                #
697                self.slot.acquire()
698
[32]699                self.myMetrics[ host ][ metric['name'] ].append( metric )
700
[40]701                self.slot.release()
[53]702                #
703                # </ATOMIC>
[40]704
[47]705        def makeUpdateList( self, host, metriclist ):
[37]706
707                update_list = [ ]
[41]708                metric = None
[37]709
[47]710                while len( metriclist ) > 0:
[37]711
[53]712                        metric = metriclist.pop( 0 )
[37]713
[53]714                        if self.checkStoreMetric( host, metric ):
715                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[40]716
[37]717                return update_list
718
[49]719        def checkStoreMetric( self, host, metric ):
[40]720
721                if self.lastStored.has_key( host ):
722
[47]723                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]724
[47]725                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]726
[50]727                                        # This is old
[40]728                                        return 0
729
[50]730                return 1
731
[54]732        def memLastUpdate( self, host, metricname, metriclist ):
[50]733
[54]734                if not self.lastStored.has_key( host ):
735                        self.lastStored[ host ] = { }
736
[50]737                last_update_time = 0
738
739                for metric in metriclist:
740
[54]741                        if metric['name'] == metricname:
[50]742
[54]743                                if metric['time'] > last_update_time:
[50]744
[54]745                                        last_update_time = metric['time']
[40]746
[54]747                if self.lastStored[ host ].has_key( metricname ):
[52]748                       
[54]749                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]750                                return 1
[40]751
[54]752                self.lastStored[ host ][ metricname ] = last_update_time
[52]753
[33]754        def storeMetrics( self ):
755
756                for hostname, mymetrics in self.myMetrics.items():     
757
758                        for metricname, mymetric in mymetrics.items():
759
[53]760                                metrics_to_store = [ ]
761
762                                # <ATOMIC>
[50]763                                #
[47]764                                self.slot.acquire() 
[33]765
[54]766                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]767
[54]768                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
769                                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
[53]770
771                                self.slot.release()
772                                #
773                                # </ATOMIC>
774
[47]775                                # Create a mapping table, each metric to the period where it should be stored
776                                #
[53]777                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]778
[50]779                                update_rets = [ ]
780
[47]781                                for period, pmetric in metric_serial_table.items():
782
783                                        self.createCheck( hostname, metricname, period )       
784
785                                        update_ret = self.update( hostname, metricname, period, pmetric )
786
787                                        if update_ret == 0:
788
789                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
790                                        else:
791                                                debug_msg( 9, 'metric update failed' )
792
[50]793                                        update_rets.append( update_ret )
[47]794
[50]795                                if not (1) in update_rets:
796
[54]797                                        self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]798
[17]799        def makeTimeSerial( self ):
[32]800                "Generate a time serial. Seconds since epoch"
[17]801
802                # Seconds since epoch
803                mytime = int( time.time() )
804
805                return mytime
806
[50]807        def makeRrdPath( self, host, metricname, timeserial ):
[32]808                """
809                Make a RRD location/path and filename
810                If a metric or timeserial are supplied the complete locations
811                will be made, else just the host directory
812                """
[17]813
[50]814                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
815                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[17]816
817                return rrd_dir, rrd_file
818
[20]819        def getLastRrdTimeSerial( self, host ):
[32]820                """
821                Find the last timeserial (directory) for this host
822                This is determined once every host
823                """
[17]824
[19]825                newest_timeserial = 0
826
[47]827                for dir in self.timeserials[ host ]:
[32]828
[47]829                        valid_dir = 1
[17]830
[47]831                        for letter in dir:
832                                if letter not in string.digits:
833                                        valid_dir = 0
[17]834
[47]835                        if valid_dir:
836                                timeserial = dir
837                                if timeserial > newest_timeserial:
838                                        newest_timeserial = timeserial
[17]839
840                if newest_timeserial:
[18]841                        return newest_timeserial
[17]842                else:
843                        return 0
844
[47]845        def determinePeriod( self, host, check_serial ):
846
847                period_serial = 0
848
[56]849                if self.timeserials.has_key( host ):
[47]850
[56]851                        for serial in self.timeserials[ host ]:
[47]852
[56]853                                if check_serial >= serial and period_serial < serial:
[47]854
[56]855                                        period_serial = serial
856
[47]857                return period_serial
858
859        def determineSerials( self, host, metricname, metriclist ):
860                """
861                Determine the correct serial and corresponding rrd to store
862                for a list of metrics
863                """
864
865                metric_serial_table = { }
866
867                for metric in metriclist:
868
869                        if metric['name'] == metricname:
870
871                                period = self.determinePeriod( host, metric['time'] )   
872
873                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
874
[49]875                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]876
877                                        # This one should get it's own new period
878                                        period = metric['time']
[57]879
880                                        if not self.timeserials.has_key( host ):
881                                                self.timeserials[ host ] = [ ]
882
[50]883                                        self.timeserials[ host ].append( period )
[47]884
885                                if not metric_serial_table.has_key( period ):
886
[49]887                                        metric_serial_table[ period ] = [ ]
[47]888
889                                metric_serial_table[ period ].append( metric )
890
891                return metric_serial_table
892
[33]893        def createCheck( self, host, metricname, timeserial ):
[9]894                "Check if an .rrd allready exists for this metric, create if not"
895
[35]896                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]897               
[33]898                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]899
[9]900                if not os.path.exists( rrd_dir ):
[58]901
902                        try:
903                                os.makedirs( rrd_dir )
904
905                        except OSError, msg:
906
907                                if msg.find( 'File exists' ) != -1:
908
909                                        # Ignore exists errors
910                                        pass
911
912                                else:
913
914                                        print msg
915                                        return
916
[14]917                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]918
[14]919                if not os.path.exists( rrd_file ):
[9]920
[33]921                        interval = self.config.getInterval( self.cluster )
[47]922                        heartbeat = 8 * int( interval )
[9]923
[37]924                        params = [ ]
[12]925
[37]926                        params.append( '--step' )
927                        params.append( str( interval ) )
[12]928
[37]929                        params.append( '--start' )
[47]930                        params.append( str( int( timeserial ) - 1 ) )
[12]931
[37]932                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
933                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]934
[37]935                        self.rrdm.create( str(rrd_file), params )
936
[14]937                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
938
[47]939        def update( self, host, metricname, timeserial, metriclist ):
[9]940
[35]941                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]942
[33]943                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]944
[47]945                update_list = self.makeUpdateList( host, metriclist )
[15]946
[41]947                if len( update_list ) > 0:
948                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]949
[41]950                        if ret:
951                                return 1
[27]952               
[41]953                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]954
[36]955                return 0
956
[8]957def main():
958        "Program startup"
959
[60]960        if TOGA_SERVER:
961       
962                myServer = TogaServer()
[8]963
[60]964                if DAEMONIZE:
965                        myServer.daemon()
966                else:
967                        myServer.run()
[22]968
[60]969        if ARCHIVE:
970
971                myProcessor = GangliaXMLProcessor()
972
973                if DAEMONIZE:
974                        myProcessor.daemon()
975                else:
976                        myProcessor.run()
977
[9]978def check_dir( directory ):
979        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
980
981        if directory[-1] == '/':
982                directory = directory[:-1]
983
984        return directory
985
[12]986def debug_msg( level, msg ):
987
988        if (DEBUG_LEVEL >= level):
989                sys.stderr.write( msg + '\n' )
990
[46]991def printTime( ):
992        "Print current time in human readable format"
993
994        return time.strftime("%a %d %b %Y %H:%M:%S")
995
[5]996# Let's go
[9]997if __name__ == '__main__':
998        main()
Note: See TracBrowser for help on using the repository browser.