source: trunk/daemon/togad.py @ 11

Last change on this file since 11 was 11, checked in by bastiaans, 19 years ago

daemon/togad.py:

Gmetad.conf parsing bugfixes

File size: 6.1 KB
RevLine 
[3]1#!/usr/bin/env python
2
[5]3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
[9]7import rrdtool
8import string
[3]9
[8]10# Specify debugging level here;
11#
12# >10 = metric XML
13# >9  = host,cluster,grid,ganglia XML
[9]14# >8  = RRD activity,gmetad config parsing
[8]15#
16DEBUG_LEVEL = 9
[6]17
[9]18# Where is the gmetad.conf located
19#
20GMETAD_CONF = '/etc/gmetad.conf'
21
22# Where to store the archived rrd's
23#
24ARCHIVE_PATH = '/data/toga/rrds'
25
26# List of data_source names to archive for
27#
28ARCHIVE_SOURCES = [ "LISA Cluster" ]
29
30#
31# Configuration ends
32#
33
[8]34"""
35This is TOrque-GAnglia's data Daemon
36"""
37
[6]38class GangliaXMLHandler( ContentHandler ):
[8]39        "Parse Ganglia's XML"
[3]40
[6]41        metrics = [ ]
42
43        def startElement( self, name, attrs ):
[8]44                "Store appropriate data from xml start tags"
[3]45
[7]46                if name == 'GANGLIA_XML':
47                        self.XMLSource = attrs.get('SOURCE',"")
48                        self.gangliaVersion = attrs.get('VERSION',"")
[8]49                        if (DEBUG_LEVEL>9): print 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion )
[6]50
[7]51                elif name == 'GRID':
52                        self.gridName = attrs.get('NAME',"")
[8]53                        if (DEBUG_LEVEL>9): print '`-Grid found: %s' %( self.gridName )
[6]54
[7]55                elif name == 'CLUSTER':
56                        self.clusterName = attrs.get('NAME',"")
[9]57                        self.rrd = RRDHandler( self.clusterName )
[8]58                        if (DEBUG_LEVEL>9): print ' |-Cluster found: %s' %( self.clusterName )
[6]59
[7]60                elif name == 'HOST':     
61                        self.hostName = attrs.get('NAME',"")
62                        self.hostIp = attrs.get('IP',"")
63                        self.hostReported = attrs.get('REPORTED',"")
[9]64                        # Reset the metrics list for each host
65                        self.metrics = [ ]
[8]66                        if (DEBUG_LEVEL>9): print ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported )
[6]67
[7]68                elif name == 'METRIC':
[6]69                        myMetric = { }
[7]70                        myMetric['name'] = attrs.get('NAME',"")
71                        myMetric['val'] = attrs.get('VAL',"")
[6]72
73                        self.metrics.append( myMetric ) 
[8]74                        if (DEBUG_LEVEL>10): print ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] )
[6]75
[5]76                return
[3]77
[9]78        def endElement( self, name ):
[7]79                #if name == 'ganglia_xml':
[3]80
[7]81                #if name == 'grid':
[3]82
[7]83                #if name == 'cluster':
[6]84
[9]85                if name == 'host':     
86                        self.storeMetrics( self.hostName )
[6]87
88                #if name == 'metric':
89
[9]90        def storeMetrics( self, hostname ):
91
92                for metric in self.metrics:
93                        self.rrd.checkCreate( hostname, metric['name'] )       
94                        self.rrd.update( hostname, metric['name'], metric['val'] )
95                        if (DEBUG_LEVEL>8): print 'stored metric %s for %s: %s' %( hostname, metric['name'], metric['val'] )
96                        sys.exit(1)
97       
98
[5]99class GangliaXMLGatherer:
[8]100        "Setup a connection and file object to Ganglia's XML"
[3]101
[8]102        s = None
103
104        def __init__( self, host, port ):
105                "Store host and port for connection"
106
[5]107                self.host = host
108                self.port = port
[3]109
[8]110        def __del__( self ):
111                "Kill the socket before we leave"
112
113                self.s.close()
114
115        def getFileObject( self ):
116                "Connect, and return a file object"
117
118                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[5]119                        af, socktype, proto, canonname, sa = res
120                        try:
[8]121                                self.s = socket.socket( af, socktype, proto )
[5]122                        except socket.error, msg:
[8]123                                self.s = None
[5]124                                continue
125                        try:
[8]126                                self.s.connect( sa )
[5]127                        except socket.error, msg:
[8]128                                self.s.close()
129                                self.s = None
[5]130                                continue
131                        break
[3]132
[8]133                if self.s is None:
134                        print 'Could not open socket'
[5]135                        sys.exit(1)
136
[8]137                return self.s.makefile( 'r' )
[5]138
[8]139class GangliaXMLProcessor:
[5]140
[9]141        def daemon( self ):
[8]142                "Run as daemon forever"
[5]143
[8]144                self.DAEMON = 1
[5]145
[8]146                # Fork the first child
147                #
148                pid = os.fork()
149                if pid > 0:
150                        sys.exit(0)  # end parrent
[7]151
[8]152                # creates a session and sets the process group ID
153                #
154                os.setsid()
[7]155
[8]156                # Fork the second child
157                #
158                pid = os.fork()
159                if pid > 0:
160                        sys.exit(0)  # end parrent
[5]161
[8]162                # Go to the root directory and set the umask
163                #
164                os.chdir('/')
165                os.umask(0)
166
167                sys.stdin.close()
168                sys.stdout.close()
169                sys.stderr.close()
170
171                os.open('/dev/null', 0)
172                os.dup(0)
173                os.dup(0)
174
175                self.run()
176
[9]177        def run( self ):
[8]178                "Main thread"
179
180                while ( 1 ):
181                        self.processXML()
182                        time.sleep( 5 )
183
184        def processXML( self ):
185                "Process XML"
186
187                myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
188
189                myParser = make_parser()   
190                myHandler = GangliaXMLHandler()
191                myParser.setContentHandler( myHandler )
192
193                myParser.parse( myXMLGatherer.getFileObject() )
194
[9]195class GangliaConfigParser:
196
197        sources = [ ]
198
199        def __init__( self, config ):
200                self.config = config
201                self.parseValues()
202
203        def parseValues(self):
204                "Parse certain values from gmetad.conf"
205
206                readcfg = open( self.config, 'r' )
207
208                for line in readcfg.readlines():
209
210                        if line.count( '"' ) > 1:
211
[10]212                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]213
[11]214                                        source = { }
215                                        source['name'] = line.split( '"' )[1]
[9]216                                        source_words = line.split( '"' )[2].split( ' ' )
217
218                                        for word in source_words:
219
220                                                valid_interval = 1
221
222                                                for letter in word:
223                                                        if letter not in string.digits:
224                                                                valid_interval = 0
225
[10]226                                                if valid_interval and len(word) > 0:
[9]227                                                        source['interval'] = word
228                                                        if (DEBUG_LEVEL>8): print 'polling interval for %s = %s' %(source['name'], source['interval'] )
229               
230                # No interval found, use Ganglia's default     
231                if not source.has_key( 'interval' ):
232                        source['interval'] = 15
[11]233                        if (DEBUG_LEVEL>8): print 'polling interval for %s defaulted to 15' %(source['name'])
[9]234
235                self.sources.append( source )
236
237        def getInterval( self, source_name ):
238                for source in self.sources:
239                        if source['name'] == name:
240                                return source['interval']
241                return None
242
243class RRDHandler:
244
245        def __init__( self, cluster ):
246                self.cluster = cluster
247
248                self.gmetad_conf = GangliaConfigParser( GMETAD_CONF )
249
250        def createCheck( self, host, metric ):
251                "Check if an .rrd allready exists for this metric, create if not"
252
253                rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
254
255                if not os.path.exists( rrd_dir ):
256                        os.makedirs( rrd_dir )
257
258                rrd_file = '%s/%s.rrd' %( rrd_dir, metric )
259
260                interval = self.gmetad_conf.getInterval( self.cluster )
261
262        def update( self, metric, timestamp, val ):
263
264                pass
265
266                #rrd.update( bla )
267               
268
[8]269def main():
270        "Program startup"
271
272        myProcessor = GangliaXMLProcessor()
273        myProcessor.processXML()
274
[9]275def check_dir( directory ):
276        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
277
278        if directory[-1] == '/':
279                directory = directory[:-1]
280
281        return directory
282
[5]283# Let's go
[9]284if __name__ == '__main__':
285        main()
Note: See TracBrowser for help on using the repository browser.