source: trunk/daemon/togad.py @ 44

Last change on this file since 44 was 44, checked in by bastiaans, 18 years ago

daemon/togad.py:

  • Grabbing of last serials and updates of existing .rrds will now be done on application boot instead of from the parsing thread. Calling from parsing thread caused XML parsing errors because of the time needed.
  • Also replaced os.walk with os.listdir, we don't need a recursive dirlist, just the root list
File size: 18.9 KB
Line 
1#!/usr/bin/env python
2
3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
7import string
8import os
9import os.path
10import time
11import re
12import threading
13import mutex
14import random
15from types import *
16
17# Specify debugging level here;
18#
19# 11 = XML: metrics
20# 10 = XML: host, cluster, grid, ganglia
21# 9  = RRD activity, gmetad config parsing
22# 8  = RRD file activity
23# 7  = daemon threading
24#
25DEBUG_LEVEL = 8
26
27# Where is the gmetad.conf located
28#
29GMETAD_CONF = '/etc/gmetad.conf'
30
31# Where to store the archived rrd's
32#
33ARCHIVE_PATH = '/data/toga/rrds'
34
35# List of data_source names to archive for
36#
37ARCHIVE_SOURCES = [ "LISA Cluster" ]
38
39# Amount of hours to store in one single archived .rrd
40#
41ARCHIVE_HOURS_PER_RRD = 12
42
43# Wether or not to run as a daemon in background
44#
45DAEMONIZE = 0
46
47######################
48#                    #
49# Configuration ends #
50#                    #
51######################
52
53# What XML data types not to store
54#
55UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
56
57"""
58This is TOrque-GAnglia's data Daemon
59"""
60
61class RRDMutator:
62        "A class for handling .rrd mutations"
63
64        binary = '/usr/bin/rrdtool'
65
66        def __init__( self, binary=None ):
67
68                if binary:
69                        self.binary = binary
70
71        def create( self, filename, args ):
72                return self.perform( 'create', '"' + filename + '"', args )
73
74        def update( self, filename, args ):
75                return self.perform( 'update', '"' + filename + '"', args )
76
77        def grabLastUpdate( self, filename ):
78
79                last_update = 0
80
81                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
82
83                        if line.find( 'last_update') != -1:
84
85                                last_update = line.split( ' = ' )[1]
86
87                if last_update:
88                        return last_update
89                else:
90                        return 0
91
92        def perform( self, action, filename, args ):
93
94                arg_string = None
95
96                if type( args ) is not ListType:
97                        debug_msg( 8, 'Arguments needs to be of type List' )
98                        return 1
99
100                for arg in args:
101
102                        if not arg_string:
103
104                                arg_string = arg
105                        else:
106                                arg_string = arg_string + ' ' + arg
107
108                debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
109
110                for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines():
111
112                        if line.find( 'ERROR' ) != -1:
113
114                                error_msg = string.join( line.split( ' ' )[1:] )
115                                debug_msg( 8, error_msg )
116                                return 1
117
118                return 0
119
120class GangliaXMLHandler( ContentHandler ):
121        "Parse Ganglia's XML"
122
123        def __init__( self, config ):
124                self.config = config
125                self.clusters = { }
126                self.gatherClusters()
127
128        def gatherClusters( self ):
129
130                archive_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
131
132                hosts = [ ]
133
134                if os.path.exists( archive_dir ):
135
136                        dirlist = os.listdir( archive_dir )
137
138                        for item in dirlist:
139
140                                clustername = item
141
142                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_SOURCES:
143
144                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
145
146        def startElement( self, name, attrs ):
147                "Store appropriate data from xml start tags"
148
149                if name == 'GANGLIA_XML':
150
151                        self.XMLSource = attrs.get( 'SOURCE', "" )
152                        self.gangliaVersion = attrs.get( 'VERSION', "" )
153
154                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
155
156                elif name == 'GRID':
157
158                        self.gridName = attrs.get( 'NAME', "" )
159                        self.time = attrs.get( 'LOCALTIME', "" )
160
161                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
162
163                elif name == 'CLUSTER':
164
165                        self.clusterName = attrs.get( 'NAME', "" )
166                        self.time = attrs.get( 'LOCALTIME', "" )
167
168                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_SOURCES:
169
170                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
171
172                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
173
174                elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:     
175
176                        self.hostName = attrs.get( 'NAME', "" )
177                        self.hostIp = attrs.get( 'IP', "" )
178                        self.hostReported = attrs.get( 'REPORTED', "" )
179
180                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
181
182                elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES:
183
184                        type = attrs.get( 'TYPE', "" )
185
186                        if type not in UNSUPPORTED_ARCHIVE_TYPES:
187
188                                myMetric = { }
189                                myMetric['name'] = attrs.get( 'NAME', "" )
190                                myMetric['val'] = attrs.get( 'VAL', "" )
191                                myMetric['time'] = self.hostReported
192
193                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
194
195                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
196
197        def storeMetrics( self ):
198
199                for clustername, rrdh in self.clusters.items():
200
201                        ret = rrdh.storeMetrics()
202
203                        if ret:
204                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
205                                return 1
206
207                return 0
208
209class GangliaXMLGatherer:
210        "Setup a connection and file object to Ganglia's XML"
211
212        s = None
213
214        def __init__( self, host, port ):
215                "Store host and port for connection"
216
217                self.host = host
218                self.port = port
219                self.connect()
220
221        def connect( self ):
222                "Setup connection to XML source"
223
224                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
225
226                        af, socktype, proto, canonname, sa = res
227
228                        try:
229
230                                self.s = socket.socket( af, socktype, proto )
231
232                        except socket.error, msg:
233
234                                self.s = None
235                                continue
236
237                        try:
238
239                                self.s.connect( sa )
240
241                        except socket.error, msg:
242
243                                self.s.close()
244                                self.s = None
245                                continue
246
247                        break
248
249                if self.s is None:
250
251                        debug_msg( 0, 'Could not open socket' )
252                        sys.exit( 1 )
253
254        def disconnect( self ):
255                "Close socket"
256
257                if self.s:
258                        self.s.close()
259                        self.s = None
260
261        def __del__( self ):
262                "Kill the socket before we leave"
263
264                self.disconnect()
265
266        def getFileObject( self ):
267                "Connect, and return a file object"
268
269                if self.s:
270                        # Apearantly, only data is received when a connection is made
271                        # therefor, disconnect and connect
272                        #
273                        self.disconnect()
274                        self.connect()
275
276                return self.s.makefile( 'r' )
277
278class GangliaXMLProcessor:
279
280        def __init__( self ):
281                "Setup initial XML connection and handlers"
282
283                self.config = GangliaConfigParser( GMETAD_CONF )
284
285                self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
286                self.myParser = make_parser()   
287                self.myHandler = GangliaXMLHandler( self.config )
288                self.myParser.setContentHandler( self.myHandler )
289
290        def daemon( self ):
291                "Run as daemon forever"
292
293                self.DAEMON = 1
294
295                # Fork the first child
296                #
297                pid = os.fork()
298
299                if pid > 0:
300
301                        sys.exit(0)  # end parent
302
303                # creates a session and sets the process group ID
304                #
305                os.setsid()
306
307                # Fork the second child
308                #
309                pid = os.fork()
310
311                if pid > 0:
312
313                        sys.exit(0)  # end parent
314
315                # Go to the root directory and set the umask
316                #
317                os.chdir('/')
318                os.umask(0)
319
320                sys.stdin.close()
321                sys.stdout.close()
322                #sys.stderr.close()
323
324                os.open('/dev/null', 0)
325                os.dup(0)
326                os.dup(0)
327
328                self.run()
329
330        def printTime( self ):
331                "Print current time in human readable format"
332
333                return time.strftime("%a %d %b %Y %H:%M:%S")
334
335        def run( self ):
336                "Main thread"
337
338                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
339                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
340
341                while( 1 ):
342
343                        if not xmlthread.isAlive():
344                                # Gather XML at the same interval as gmetad
345
346                                # threaded call to: self.processXML()
347                                #
348                                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
349                                xmlthread.start()
350
351                        if not storethread.isAlive():
352                                # Store metrics every .. sec
353
354                                # threaded call to: self.storeMetrics()
355                                #
356                                storethread = threading.Thread( None, self.storeMetrics, 'storethread' )
357                                storethread.start()
358               
359                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
360                        time.sleep( 1 ) 
361
362        def storeMetrics( self ):
363                "Store metrics retained in memory to disk"
364
365                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
366
367                # Store metrics somewhere between every 60 and 180 seconds
368                #
369                #STORE_INTERVAL = random.randint( 360, 640 )
370                STORE_INTERVAL = 16
371
372                storethread = threading.Thread( None, self.storeThread, 'storemetricthread' )
373                storethread.start()
374
375                debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
376                time.sleep( STORE_INTERVAL )
377                debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
378
379                if storethread.isAlive():
380
381                        debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
382                        storethread.join( 180 ) # Maximum time is for storing thread to finish
383                        debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
384
385                debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
386
387                return 0
388
389        def storeThread( self ):
390
391                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
392                debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
393                ret = self.myHandler.storeMetrics()
394                debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
395                debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
396               
397                return ret
398
399        def processXML( self ):
400                "Process XML"
401
402                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
403
404                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
405                parsethread.start()
406
407                debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
408                time.sleep( float( self.config.getLowestInterval() ) ) 
409                debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
410
411                if parsethread.isAlive():
412
413                        debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
414                        parsethread.join( 180 ) # Maximum time for XML thread to finish
415                        debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
416
417                debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
418
419                return 0
420
421        def parseThread( self ):
422
423                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
424                debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
425                ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
426                debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
427                debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
428
429                return ret
430
431class GangliaConfigParser:
432
433        sources = [ ]
434
435        def __init__( self, config ):
436
437                self.config = config
438                self.parseValues()
439
440        def parseValues( self ):
441                "Parse certain values from gmetad.conf"
442
443                readcfg = open( self.config, 'r' )
444
445                for line in readcfg.readlines():
446
447                        if line.count( '"' ) > 1:
448
449                                if line.find( 'data_source' ) != -1 and line[0] != '#':
450
451                                        source = { }
452                                        source['name'] = line.split( '"' )[1]
453                                        source_words = line.split( '"' )[2].split( ' ' )
454
455                                        for word in source_words:
456
457                                                valid_interval = 1
458
459                                                for letter in word:
460
461                                                        if letter not in string.digits:
462
463                                                                valid_interval = 0
464
465                                                if valid_interval and len(word) > 0:
466
467                                                        source['interval'] = word
468                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
469       
470                                        # No interval found, use Ganglia's default     
471                                        if not source.has_key( 'interval' ):
472                                                source['interval'] = 15
473                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
474
475                                        self.sources.append( source )
476
477        def getInterval( self, source_name ):
478
479                for source in self.sources:
480
481                        if source['name'] == source_name:
482
483                                return source['interval']
484
485                return None
486
487        def getLowestInterval( self ):
488
489                lowest_interval = 0
490
491                for source in self.sources:
492
493                        if not lowest_interval or source['interval'] <= lowest_interval:
494
495                                lowest_interval = source['interval']
496
497                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
498                if lowest_interval:
499                        return lowest_interval
500                else:
501                        return 15
502
503class RRDHandler:
504
505        myMetrics = { }
506        lastStored = { }
507        slot = None
508
509        def __init__( self, config, cluster ):
510                self.block = 0
511                self.cluster = cluster
512                self.config = config
513                self.slot = threading.Lock()
514                self.rrdm = RRDMutator()
515                self.gatherLastUpdates()
516
517        def isBlocking( self ):
518
519                return self.block
520
521        def gatherLastUpdates( self ):
522                "Populate the lastStored list, containing timestamps of all last updates"
523
524                self.block = 1
525
526                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
527
528                hosts = [ ]
529
530                if os.path.exists( cluster_dir ):
531
532                        dirlist = os.listdir( cluster_dir )
533
534                        for dir in dirlist:
535
536                                hosts.append( dir )
537
538                for host in hosts:
539
540                        last_serial = self.getLastRrdTimeSerial( host )
541                        if last_serial:
542
543                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
544                                if os.path.exists( metric_dir ):
545
546                                        dirlist = os.listdir( metric_dir )
547
548                                        for file in dirlist:
549
550                                                metricname = file.split( '.rrd' )[0]
551
552                                                if not self.lastStored.has_key( host ):
553
554                                                        self.lastStored[ host ] = { }
555
556                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
557
558        def getClusterName( self ):
559                return self.cluster
560
561        def memMetric( self, host, metric ):
562
563                if self.myMetrics.has_key( host ):
564
565                        if self.myMetrics[ host ].has_key( metric['name'] ):
566
567                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
568
569                                        if mymetric['time'] == metric['time']:
570
571                                                # Allready have this metric, abort
572                                                return 1
573                        else:
574                                self.myMetrics[ host ][ metric['name'] ] = [ ]
575                else:
576                        self.myMetrics[ host ] = { }
577                        self.myMetrics[ host ][ metric['name'] ] = [ ]
578
579                # Ah, psst, push it
580                #
581                # <atomic>
582                self.slot.acquire()
583
584                self.myMetrics[ host ][ metric['name'] ].append( metric )
585
586                self.slot.release()
587                # </atomic>
588
589        def makeUpdateList( self, host, metricname ):
590
591                update_list = [ ]
592                metric = None
593
594                while len( self.myMetrics[ host ][ metricname ] ) > 0:
595
596                        # Kabouter pop
597                        #
598                        # <atomic>     
599                        self.slot.acquire()
600
601                        # len might have changed since loop start
602                        #
603                        if len( self.myMetrics[ host ][ metricname ] ) > 0:
604                                metric = self.myMetrics[ host ][ metricname ].pop( 0 )
605
606                        self.slot.release()
607                        # </atomic>
608
609                        if metric:
610                                if self.checkStoreMetric( host, metricname, metric ):
611                                        update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
612                                #else:
613                                        #print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )
614
615                return update_list
616
617        def checkStoreMetric( self, host, metricname, metric ):
618
619                if self.lastStored.has_key( host ):
620
621                        if self.lastStored[ host ].has_key( metricname ):
622
623                                if metric['time'] <= self.lastStored[ host ][ metricname ]:
624
625                                        # Allready wrote a value with this timestamp, skip tnx
626                                        return 0
627
628                else:
629                        self.lastStored[ host ] = { }
630
631                self.lastStored[ host ][ metricname ] = metric['time']
632
633                return 1
634
635        def storeMetrics( self ):
636
637                for hostname, mymetrics in self.myMetrics.items():     
638
639                        for metricname, mymetric in mymetrics.items():
640
641                                mytime = self.makeTimeSerial()
642                                correct_serial = self.checkNewRrdPeriod( hostname, mytime )
643                                self.createCheck( hostname, metricname, correct_serial )       
644                                update_ret = self.update( hostname, metricname, correct_serial )
645
646                                if update_ret == 0:
647
648                                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
649                                else:
650                                        debug_msg( 9, 'metric update failed' )
651
652                                #sys.exit(1)
653
654        def makeTimeSerial( self ):
655                "Generate a time serial. Seconds since epoch"
656
657                # Seconds since epoch
658                mytime = int( time.time() )
659
660                return mytime
661
662        def makeRrdPath( self, host, metricname=None, timeserial=None ):
663                """
664                Make a RRD location/path and filename
665                If a metric or timeserial are supplied the complete locations
666                will be made, else just the host directory
667                """
668
669                if not timeserial:     
670                        rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
671                else:
672                        rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
673                if metricname:
674                        rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
675                else:
676                        rrd_file = None
677
678                return rrd_dir, rrd_file
679
680        def getLastRrdTimeSerial( self, host ):
681                """
682                Find the last timeserial (directory) for this host
683                This is determined once every host
684                """
685
686                rrd_dir, rrd_file = self.makeRrdPath( host )
687
688                newest_timeserial = 0
689
690                if os.path.exists( rrd_dir ):
691
692                        dirlist = os.listdir( rrd_dir )
693
694                        for dir in dirlist:
695
696                                valid_dir = 1
697
698                                for letter in dir:
699                                        if letter not in string.digits:
700                                                valid_dir = 0
701
702                                if valid_dir:
703                                        timeserial = dir
704                                        if timeserial > newest_timeserial:
705                                                newest_timeserial = timeserial
706
707                if newest_timeserial:
708                        return newest_timeserial
709                else:
710                        return 0
711
712        def checkNewRrdPeriod( self, host, current_timeserial ):
713                """
714                Check if current timeserial belongs to recent time period
715                or should become a new period (and file).
716
717                Returns the serial of the correct time period
718                """
719
720                last_timeserial = int( self.getLastRrdTimeSerial( host ) )
721                debug_msg( 9, 'last timeserial of %s is %s' %( host, last_timeserial ) )
722
723                if not last_timeserial:
724                        serial = current_timeserial
725                else:
726
727                        archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
728
729                        if (current_timeserial - last_timeserial) >= archive_secs:
730                                serial = current_timeserial
731                        else:
732                                serial = last_timeserial
733
734                return serial
735
736        def getFirstTime( self, host, metricname ):
737                "Get the first time of a metric we know of"
738
739                first_time = 0
740
741                for metric in self.myMetrics[ host ][ metricname ]:
742
743                        if not first_time or metric['time'] <= first_time:
744
745                                first_time = metric['time']
746
747                return first_time
748
749        def createCheck( self, host, metricname, timeserial ):
750                "Check if an .rrd allready exists for this metric, create if not"
751
752                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
753
754                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
755
756                if not os.path.exists( rrd_dir ):
757                        os.makedirs( rrd_dir )
758                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
759
760                if not os.path.exists( rrd_file ):
761
762                        interval = self.config.getInterval( self.cluster )
763                        heartbeat = 8 * int(interval)
764
765                        params = [ ]
766
767                        params.append( '--step' )
768                        params.append( str( interval ) )
769
770                        params.append( '--start' )
771                        params.append( str( int( self.getFirstTime( host, metricname ) ) - 1 ) )
772
773                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
774                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
775
776                        self.rrdm.create( str(rrd_file), params )
777
778                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
779
780        def update( self, host, metricname, timeserial ):
781
782                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
783
784                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
785
786                update_list = self.makeUpdateList( host, metricname )
787
788                if len( update_list ) > 0:
789                        ret = self.rrdm.update( str(rrd_file), update_list )
790
791                        if ret:
792                                return 1
793               
794                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
795
796                return 0
797
798def main():
799        "Program startup"
800
801        myProcessor = GangliaXMLProcessor()
802
803        if DAEMONIZE:
804                myProcessor.daemon()
805        else:
806                myProcessor.run()
807
808def check_dir( directory ):
809        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
810
811        if directory[-1] == '/':
812                directory = directory[:-1]
813
814        return directory
815
816def debug_msg( level, msg ):
817
818        if (DEBUG_LEVEL >= level):
819                sys.stderr.write( msg + '\n' )
820
821# Let's go
822if __name__ == '__main__':
823        main()
Note: See TracBrowser for help on using the repository browser.