source: trunk/daemon/togad.py @ 12

Last change on this file since 12 was 12, checked in by bastiaans, 18 years ago

daemon/togad.py:

  • Miscellanious debug msg restyling
  • Correct time handling for metrics
File size: 6.5 KB
Line 
1#!/usr/bin/env python
2
3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
7import rrdtool
8import string
9import os
10import os.path
11
12# Specify debugging level here;
13#
14# >10 = metric XML
15# >9  = host,cluster,grid,ganglia XML
16# >8  = RRD activity,gmetad config parsing
17#
18DEBUG_LEVEL = 9
19
20# Where is the gmetad.conf located
21#
22GMETAD_CONF = '/etc/gmetad.conf'
23
24# Where to store the archived rrd's
25#
26ARCHIVE_PATH = '/data/toga/rrds'
27
28# List of data_source names to archive for
29#
30ARCHIVE_SOURCES = [ "LISA Cluster" ]
31
32#
33# Configuration ends
34#
35
36"""
37This is TOrque-GAnglia's data Daemon
38"""
39
40class GangliaXMLHandler( ContentHandler ):
41        "Parse Ganglia's XML"
42
43        metrics = [ ]
44
45        def startElement( self, name, attrs ):
46                "Store appropriate data from xml start tags"
47
48                if name == 'GANGLIA_XML':
49                        self.XMLSource = attrs.get('SOURCE',"")
50                        self.gangliaVersion = attrs.get('VERSION',"")
51                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
52
53                elif name == 'GRID':
54                        self.gridName = attrs.get('NAME',"")
55                        self.time = attrs.get('LOCALTIME',"")
56                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
57
58                elif name == 'CLUSTER':
59                        self.clusterName = attrs.get('NAME',"")
60                        self.time = attrs.get('LOCALTIME',"")
61                        self.rrd = RRDHandler( self.clusterName )
62                        debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
63
64                elif name == 'HOST':     
65                        self.hostName = attrs.get('NAME',"")
66                        self.hostIp = attrs.get('IP',"")
67                        self.hostReported = attrs.get('REPORTED',"")
68                        # Reset the metrics list for each host
69                        self.metrics = [ ]
70                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
71
72                elif name == 'METRIC':
73                        myMetric = { }
74                        myMetric['name'] = attrs.get('NAME',"")
75                        myMetric['val'] = attrs.get('VAL',"")
76                        myMetric['time'] = self.time
77
78                        self.metrics.append( myMetric ) 
79                        debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
80
81                return
82
83        def endElement( self, name ):
84                #if name == 'GANGLIA_XML':
85
86                #if name == 'GRID':
87
88                #if name == 'CLUSTER':
89
90                if name == 'HOST':     
91                        self.storeMetrics( self.hostName )
92
93                #if name == 'METRIC':
94
95        def storeMetrics( self, hostname ):
96
97                for metric in self.metrics:
98                        self.rrd.createCheck( hostname, metric )       
99                        self.rrd.update( hostname, metric['name'], metric['val'] )
100                        debug_msg( 9, 'stored metric %s for %s: %s' %( hostname, metric['name'], metric['val'] ) )
101                        sys.exit(1)
102       
103
104class GangliaXMLGatherer:
105        "Setup a connection and file object to Ganglia's XML"
106
107        s = None
108
109        def __init__( self, host, port ):
110                "Store host and port for connection"
111
112                self.host = host
113                self.port = port
114
115        def __del__( self ):
116                "Kill the socket before we leave"
117
118                self.s.close()
119
120        def getFileObject( self ):
121                "Connect, and return a file object"
122
123                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
124                        af, socktype, proto, canonname, sa = res
125                        try:
126                                self.s = socket.socket( af, socktype, proto )
127                        except socket.error, msg:
128                                self.s = None
129                                continue
130                        try:
131                                self.s.connect( sa )
132                        except socket.error, msg:
133                                self.s.close()
134                                self.s = None
135                                continue
136                        break
137
138                if self.s is None:
139                        print 'Could not open socket'
140                        sys.exit(1)
141
142                return self.s.makefile( 'r' )
143
144class GangliaXMLProcessor:
145
146        def daemon( self ):
147                "Run as daemon forever"
148
149                self.DAEMON = 1
150
151                # Fork the first child
152                #
153                pid = os.fork()
154                if pid > 0:
155                        sys.exit(0)  # end parrent
156
157                # creates a session and sets the process group ID
158                #
159                os.setsid()
160
161                # Fork the second child
162                #
163                pid = os.fork()
164                if pid > 0:
165                        sys.exit(0)  # end parrent
166
167                # Go to the root directory and set the umask
168                #
169                os.chdir('/')
170                os.umask(0)
171
172                sys.stdin.close()
173                sys.stdout.close()
174                if (DEBUGLEVEL == 0):
175                        sys.stderr.close()
176
177                os.open('/dev/null', 0)
178                os.dup(0)
179                os.dup(0)
180
181                self.run()
182
183        def run( self ):
184                "Main thread"
185
186                while ( 1 ):
187                        self.processXML()
188                        time.sleep( 5 )
189
190        def processXML( self ):
191                "Process XML"
192
193                myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
194
195                myParser = make_parser()   
196                myHandler = GangliaXMLHandler()
197                myParser.setContentHandler( myHandler )
198
199                myParser.parse( myXMLGatherer.getFileObject() )
200
201class GangliaConfigParser:
202
203        sources = [ ]
204
205        def __init__( self, config ):
206                self.config = config
207                self.parseValues()
208
209        def parseValues(self):
210                "Parse certain values from gmetad.conf"
211
212                readcfg = open( self.config, 'r' )
213
214                for line in readcfg.readlines():
215
216                        if line.count( '"' ) > 1:
217
218                                if line.find( 'data_source' ) != -1 and line[0] != '#':
219
220                                        source = { }
221                                        source['name'] = line.split( '"' )[1]
222                                        source_words = line.split( '"' )[2].split( ' ' )
223
224                                        for word in source_words:
225
226                                                valid_interval = 1
227
228                                                for letter in word:
229                                                        if letter not in string.digits:
230                                                                valid_interval = 0
231
232                                                if valid_interval and len(word) > 0:
233                                                        source['interval'] = word
234                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
235               
236                # No interval found, use Ganglia's default     
237                if not source.has_key( 'interval' ):
238                        source['interval'] = 15
239                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
240
241                self.sources.append( source )
242
243        def getInterval( self, source_name ):
244                for source in self.sources:
245                        if source['name'] == source_name:
246                                return source['interval']
247                return None
248
249class RRDHandler:
250
251        def __init__( self, cluster ):
252                self.cluster = cluster
253
254                self.gmetad_conf = GangliaConfigParser( GMETAD_CONF )
255
256        def createCheck( self, host, metric ):
257                "Check if an .rrd allready exists for this metric, create if not"
258
259                rrd_parameters = [ ]
260                rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
261
262                if not os.path.exists( rrd_dir ):
263                        os.makedirs( rrd_dir )
264
265                rrd_file = '%s/%s.rrd' %( rrd_dir, metric['name'] )
266
267                interval = self.gmetad_conf.getInterval( self.cluster )
268                heartbeat = 8 * int(interval)
269
270                rrd_parameters.append( '--step' )
271                rrd_parameters.append( interval )
272
273                rrd_parameters.append( '--start' )
274                rrd_parameters.append( metric['time'] )
275
276                print rrd_parameters
277
278        def update( self, metric, timestamp, val ):
279
280                pass
281
282                #rrd.update( bla )
283               
284
285def main():
286        "Program startup"
287
288        myProcessor = GangliaXMLProcessor()
289        myProcessor.processXML()
290
291def check_dir( directory ):
292        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
293
294        if directory[-1] == '/':
295                directory = directory[:-1]
296
297        return directory
298
299def debug_msg( level, msg ):
300
301        if (DEBUG_LEVEL >= level):
302                sys.stderr.write( msg + '\n' )
303
304# Let's go
305if __name__ == '__main__':
306        main()
Note: See TracBrowser for help on using the repository browser.