source: trunk/daemon/togad.py @ 41

Last change on this file since 41 was 41, checked in by bastiaans, 19 years ago

daemon/togad.py:

  • Added queue len check in atomic code before pop, len might change before start
File size: 17.3 KB
Line 
1#!/usr/bin/env python
2
3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
7import string
8import os
9import os.path
10import time
11import re
12import threading
13import mutex
14import random
15from types import *
16
17# Specify debugging level here;
18#
19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
22# 8  = RRD file activity
23# 7  = daemon threading
24#
25DEBUG_LEVEL = 9
26
27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
39# Amount of hours to store in one single archived .rrd
40#
41ARCHIVE_HOURS_PER_RRD = 1
42
43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
52
53# What XML data types not to store
54#
55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
56
57"""
58This is TOrque-GAnglia's data Daemon
59"""
60
61class RRDMutator:
62        "A class for handling .rrd mutations"
63
64        binary = '/usr/bin/rrdtool'
65
66        def __init__( self, binary=None ):
67
68                if binary:
69                        self.binary = binary
70
71        def create( self, filename, args ):
72                return self.perform( 'create', '"' + filename + '"', args )
73
74        def update( self, filename, args ):
75                return self.perform( 'update', '"' + filename + '"', args )
76
77        def perform( self, action, filename, args ):
78
79                arg_string = None
80
81                if type( args ) is not ListType:
82                        debug_msg( 8, 'Arguments needs to be of type List' )
83                        return 1
84
85                for arg in args:
86
87                        if not arg_string:
88
89                                arg_string = arg
90                        else:
91                                arg_string = arg_string + ' ' + arg
92
93                debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
94
95                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
96
97                        if line.find( 'ERROR' ) != -1:
98
99                                error_msg = string.join( line.split( ' ' )[1:] )
100                                debug_msg( 8, error_msg )
101                                return 1
102
103                return 0
104
105class GangliaXMLHandler( ContentHandler ):
106        "Parse Ganglia's XML"
107
108        def __init__( self, config ):
109                self.config = config
110                self.clusters = { }
111
112        def startElement( self, name, attrs ):
113                "Store appropriate data from xml start tags"
114
115                if name == 'GANGLIA_XML':
116
117                        self.XMLSource = attrs.get( 'SOURCE', "" )
118                        self.gangliaVersion = attrs.get( 'VERSION', "" )
119
120                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
121
122                elif name == 'GRID':
123
124                        self.gridName = attrs.get( 'NAME', "" )
125                        self.time = attrs.get( 'LOCALTIME', "" )
126
127                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
128
129                elif name == 'CLUSTER':
130
131                        self.clusterName = attrs.get( 'NAME', "" )
132                        self.time = attrs.get( 'LOCALTIME', "" )
133
134                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
135
136                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
137
138                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
139
140                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
141
142                        self.hostName = attrs.get( 'NAME', "" )
143                        self.hostIp = attrs.get( 'IP', "" )
144                        self.hostReported = attrs.get( 'REPORTED', "" )
145
146                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
147
148                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
149
150                        type = attrs.get( 'TYPE', "" )
151
152                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
153
154                                myMetric = { }
155                                myMetric['name'] = attrs.get( 'NAME', "" )
156                                myMetric['val'] = attrs.get( 'VAL', "" )
157                                myMetric['time'] = self.hostReported
158
159                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
160
161                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
162
163        def storeMetrics( self ):
164
165                for clustername, rrdh in self.clusters.items():
166
167                        ret = rrdh.storeMetrics()
168
169                        if ret:
170                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
171                                return 1
172
173                return 0
174
175class GangliaXMLGatherer:
176        "Setup a connection and file object to Ganglia's XML"
177
178        s = None
179
180        def __init__( self, host, port ):
181                "Store host and port for connection"
182
183                self.host = host
184                self.port = port
185                self.connect()
186
187        def connect( self ):
188                "Setup connection to XML source"
189
190                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
191
192                        af, socktype, proto, canonname, sa = res
193
194                        try:
195
196                                self.s = socket.socket( af, socktype, proto )
197
198                        except socket.error, msg:
199
200                                self.s = None
201                                continue
202
203                        try:
204
205                                self.s.connect( sa )
206
207                        except socket.error, msg:
208
209                                self.s.close()
210                                self.s = None
211                                continue
212
213                        break
214
215                if self.s is None:
216
217                        debug_msg( 0, 'Could not open socket' )
218                        sys.exit( 1 )
219
220        def disconnect( self ):
221                "Close socket"
222
223                if self.s:
224                        self.s.close()
225                        self.s = None
226
227        def __del__( self ):
228                "Kill the socket before we leave"
229
230                self.disconnect()
231
232        def getFileObject( self ):
233                "Connect, and return a file object"
234
235                if self.s:
236                        # Apearantly, only data is received when a connection is made
237                        # therefor, disconnect and connect
238                        #
239                        self.disconnect()
240                        self.connect()
241
242                return self.s.makefile( 'r' )
243
244class GangliaXMLProcessor:
245
246        def __init__( self ):
247                "Setup initial XML connection and handlers"
248
249                self.config = GangliaConfigParser( GMETAD_CONF )
250
251                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
252                self.myParser = make_parser()   
253                self.myHandler = GangliaXMLHandler( self.config )
254                self.myParser.setContentHandler( self.myHandler )
255
256        def daemon( self ):
257                "Run as daemon forever"
258
259                self.DAEMON = 1
260
261                # Fork the first child
262                #
263                pid = os.fork()
264
265                if pid > 0:
266
267                        sys.exit(0)  # end parent
268
269                # creates a session and sets the process group ID
270                #
271                os.setsid()
272
273                # Fork the second child
274                #
275                pid = os.fork()
276
277                if pid > 0:
278
279                        sys.exit(0)  # end parent
280
281                # Go to the root directory and set the umask
282                #
283                os.chdir('/')
284                os.umask(0)
285
286                sys.stdin.close()
287                sys.stdout.close()
288                #sys.stderr.close()
289
290                os.open('/dev/null', 0)
291                os.dup(0)
292                os.dup(0)
293
294                self.run()
295
296        def printTime( self ):
297                "Print current time in human readable format"
298
299                return time.strftime("%a %d %b %Y %H:%M:%S")
300
301        def run( self ):
302                "Main thread"
303
304                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
305                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
306
307                while( 1 ):
308
309                        if not xmlthread.isAlive():
310                                # Gather XML at the same interval as gmetad
311
312                                # threaded call to: self.processXML()
313                                #
314                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
315                                xmlthread.start()
316
317                        if not storethread.isAlive():
318                                # Store metrics every .. sec
319
320                                # threaded call to: self.storeMetrics()
321                                #
322                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
323                                storethread.start()
324               
325                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
326                        time.sleep( 1 ) 
327
328        def storeMetrics( self ):
329                "Store metrics retained in memory to disk"
330
331                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
332
333                # Store metrics somewhere between every 60 and 180 seconds
334                #
335                #STORE_INTERVAL = random.randint( 60, 180 )
336                STORE_INTERVAL = 40
337
338                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
339                storethread.start()
340
341                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
342                time.sleep( STORE_INTERVAL )
343                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
344
345                if storethread.isAlive():
346
347                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
348                        storethread.join( 180 ) # Maximum time is for storing thread to finish
349                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
350
351                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
352
353                return 0
354
355        def storeThread( self ):
356
357                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
358                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
359                ret = self.myHandler.storeMetrics()
360                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
361                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
362               
363                return ret
364
365        def processXML( self ):
366                "Process XML"
367
368                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
369
370                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
371                parsethread.start()
372
373                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
374                time.sleep( float( self.config.getLowestInterval() ) ) 
375                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
376
377                if parsethread.isAlive():
378
379                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
380                        parsethread.join( 60 ) # Maximum time for XML thread to finish
381                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
382
383                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
384
385                return 0
386
387        def parseThread( self ):
388
389                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
390                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
391                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
392                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
393                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
394
395                return ret
396
397class GangliaConfigParser:
398
399        sources = [ ]
400
401        def __init__( self, config ):
402
403                self.config = config
404                self.parseValues()
405
406        def parseValues( self ):
407                "Parse certain values from gmetad.conf"
408
409                readcfg = open( self.config, 'r' )
410
411                for line in readcfg.readlines():
412
413                        if line.count( '"' ) > 1:
414
415                                if line.find( 'data_source' ) != -1 and line[0] != '#':
416
417                                        source = { }
418                                        source['name'] = line.split( '"' )[1]
419                                        source_words = line.split( '"' )[2].split( ' ' )
420
421                                        for word in source_words:
422
423                                                valid_interval = 1
424
425                                                for letter in word:
426
427                                                        if letter not in string.digits:
428
429                                                                valid_interval = 0
430
431                                                if valid_interval and len(word) > 0:
432
433                                                        source['interval'] = word
434                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
435       
436                                        # No interval found, use Ganglia's default     
437                                        if not source.has_key( 'interval' ):
438                                                source['interval'] = 15
439                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
440
441                                        self.sources.append( source )
442
443        def getInterval( self, source_name ):
444
445                for source in self.sources:
446
447                        if source['name'] == source_name:
448
449                                return source['interval']
450
451                return None
452
453        def getLowestInterval( self ):
454
455                lowest_interval = 0
456
457                for source in self.sources:
458
459                        if not lowest_interval or source['interval'] <= lowest_interval:
460
461                                lowest_interval = source['interval']
462
463                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
464                if lowest_interval:
465                        return lowest_interval
466                else:
467                        return 15
468
469class RRDHandler:
470
471        myMetrics = { }
472        lastStored = { }
473        slot = None
474
475        def __init__( self, config, cluster ):
476                self.cluster = cluster
477                self.config = config
478                self.slot = threading.Lock()
479                self.rrdm = RRDMutator()
480
481        def getClusterName( self ):
482                return self.cluster
483
484        def memMetric( self, host, metric ):
485
486                if self.myMetrics.has_key( host ):
487
488                        if self.myMetrics[ host ].has_key( metric['name'] ):
489
490                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
491
492                                        if mymetric['time'] == metric['time']:
493
494                                                # Allready have this metric, abort
495                                                return 1
496                        else:
497                                self.myMetrics[ host ][ metric['name'] ] = [ ]
498                else:
499                        self.myMetrics[ host ] = { }
500                        self.myMetrics[ host ][ metric['name'] ] = [ ]
501
502                # Ah, psst, push it
503                #
504                # <atomic>
505                self.slot.acquire()
506
507                self.myMetrics[ host ][ metric['name'] ].append( metric )
508
509                self.slot.release()
510                # </atomic>
511
512        def makeUpdateList( self, host, metricname ):
513
514                update_list = [ ]
515                metric = None
516
517                while len( self.myMetrics[ host ][ metricname ] ) > 0:
518
519                        # Kabouter pop
520                        #
521                        # <atomic>     
522                        self.slot.acquire()
523
524                        # len might have changed since loop start
525                        #
526                        if len( self.myMetrics[ host ][ metricname ] ) > 0:
527                                metric = self.myMetrics[ host ][ metricname ].pop()
528
529                        self.slot.release()
530                        # </atomic>
531
532                        if metric:
533                                if self.checkStoreMetric( host, metricname, metric ):
534                                        update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
535                                else:
536                                        print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )
537
538                return update_list
539
540        def checkStoreMetric( self, host, metricname, metric ):
541
542                if self.lastStored.has_key( host ):
543
544                        if self.lastStored[ host ].has_key( metricname ):
545
546                                if self.lastStored[ host ][ metricname ] <= metric['time']:
547
548                                        # Allready wrote a value with this timestamp, skip tnx
549                                        return 0
550
551                else:
552                        self.lastStored[ host ] = { }
553
554                self.lastStored[ host ][ metricname ] = metric['time']
555
556                return 1
557
558        def storeMetrics( self ):
559
560                for hostname, mymetrics in self.myMetrics.items():     
561
562                        for metricname, mymetric in mymetrics.items():
563
564                                mytime = self.makeTimeSerial()
565                                correct_serial = self.checkNewRrdPeriod( hostname, mytime )
566                                self.createCheck( hostname, metricname, correct_serial )       
567                                update_ret = self.update( hostname, metricname, correct_serial )
568
569                                if update_ret == 0:
570
571                                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
572                                else:
573                                        debug_msg( 9, 'metric update failed' )
574
575        def makeTimeSerial( self ):
576                "Generate a time serial. Seconds since epoch"
577
578                # Seconds since epoch
579                mytime = int( time.time() )
580
581                return mytime
582
583        def makeRrdPath( self, host, metricname=None, timeserial=None ):
584                """
585                Make a RRD location/path and filename
586                If a metric or timeserial are supplied the complete locations
587                will be made, else just the host directory
588                """
589
590                if not timeserial:     
591                        rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
592                else:
593                        rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
594                if metricname:
595                        rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
596                else:
597                        rrd_file = None
598
599                return rrd_dir, rrd_file
600
601        def getLastRrdTimeSerial( self, host ):
602                """
603                Find the last timeserial (directory) for this host
604                This is determined once every host
605                """
606
607                rrd_dir, rrd_file = self.makeRrdPath( host )
608
609                newest_timeserial = 0
610
611                if os.path.exists( rrd_dir ):
612
613                        for root, dirs, files in os.walk( rrd_dir ):
614
615                                for dir in dirs:
616
617                                        valid_dir = 1
618
619                                        for letter in dir:
620                                                if letter not in string.digits:
621                                                        valid_dir = 0
622
623                                        if valid_dir:
624                                                timeserial = dir
625                                                if timeserial > newest_timeserial:
626                                                        newest_timeserial = timeserial
627
628                if newest_timeserial:
629                        return newest_timeserial
630                else:
631                        return 0
632
633        def checkNewRrdPeriod( self, host, current_timeserial ):
634                """
635                Check if current timeserial belongs to recent time period
636                or should become a new period (and file).
637
638                Returns the serial of the correct time period
639                """
640
641                last_timeserial = int( self.getLastRrdTimeSerial( host ) )
642                debug_msg( 9, 'last timeserial of %s is %s' %( host, last_timeserial ) )
643
644                if not last_timeserial:
645                        serial = current_timeserial
646                else:
647
648                        archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
649
650                        if (current_timeserial - last_timeserial) >= archive_secs:
651                                serial = current_timeserial
652                        else:
653                                serial = last_timeserial
654
655                return serial
656
657        def getFirstTime( self, host, metricname ):
658                "Get the first time of a metric we know of"
659
660                first_time = 0
661
662                for metric in self.myMetrics[ host ][ metricname ]:
663
664                        if not first_time or metric['time'] <= first_time:
665
666                                first_time = metric['time']
667
668                return first_time
669
670        def createCheck( self, host, metricname, timeserial ):
671                "Check if an .rrd allready exists for this metric, create if not"
672
673                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
674
675                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
676
677                if not os.path.exists( rrd_dir ):
678                        os.makedirs( rrd_dir )
679                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
680
681                if not os.path.exists( rrd_file ):
682
683                        interval = self.config.getInterval( self.cluster )
684                        heartbeat = 8 * int(interval)
685
686                        params = [ ]
687
688                        params.append( '--step' )
689                        params.append( str( interval ) )
690
691                        params.append( '--start' )
692                        params.append( str( int( self.getFirstTime( host, metricname ) ) - 1 ) )
693
694                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
695                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
696
697                        self.rrdm.create( str(rrd_file), params )
698
699                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
700
701        def update( self, host, metricname, timeserial ):
702
703                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
704
705                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
706
707                update_list = self.makeUpdateList( host, metricname )
708
709                if len( update_list ) > 0:
710                        ret = self.rrdm.update( str(rrd_file), update_list )
711
712                        if ret:
713                                return 1
714               
715                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
716
717                return 0
718
719def main():
720        "Program startup"
721
722        myProcessor = GangliaXMLProcessor()
723
724        if DAEMONIZE:
725                myProcessor.daemon()
726        else:
727                myProcessor.run()
728
729def check_dir( directory ):
730        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
731
732        if directory[-1] == '/':
733                directory = directory[:-1]
734
735        return directory
736
737def debug_msg( level, msg ):
738
739        if (DEBUG_LEVEL >= level):
740                sys.stderr.write( msg + '\n' )
741
742# Let's go
743if __name__ == '__main__':
744        main()
Note: See TracBrowser for help on using the repository browser.