source: trunk/daemon/togad.py @ 9

Last change on this file since 9 was 9, checked in by bastiaans, 18 years ago

daemon/togad.py:

First attempt at rrd handling

  • Need to parse gmetad.conf first: class added
File size: 6.1 KB
Line 
1#!/usr/bin/env python
2
3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
7import rrdtool
8import string
9
10# Specify debugging level here;
11#
12# >10 = metric XML
13# >9  = host,cluster,grid,ganglia XML
14# >8  = RRD activity,gmetad config parsing
15#
16DEBUG_LEVEL = 9
17
18# Where is the gmetad.conf located
19#
20GMETAD_CONF = '/etc/gmetad.conf'
21
22# Where to store the archived rrd's
23#
24ARCHIVE_PATH = '/data/toga/rrds'
25
26# List of data_source names to archive for
27#
28ARCHIVE_SOURCES = [ "LISA Cluster" ]
29
30#
31# Configuration ends
32#
33
34"""
35This is TOrque-GAnglia's data Daemon
36"""
37
38class GangliaXMLHandler( ContentHandler ):
39        "Parse Ganglia's XML"
40
41        metrics = [ ]
42
43        def startElement( self, name, attrs ):
44                "Store appropriate data from xml start tags"
45
46                if name == 'GANGLIA_XML':
47                        self.XMLSource = attrs.get('SOURCE',"")
48                        self.gangliaVersion = attrs.get('VERSION',"")
49                        if (DEBUG_LEVEL>9): print 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion )
50
51                elif name == 'GRID':
52                        self.gridName = attrs.get('NAME',"")
53                        if (DEBUG_LEVEL>9): print '`-Grid found: %s' %( self.gridName )
54
55                elif name == 'CLUSTER':
56                        self.clusterName = attrs.get('NAME',"")
57                        self.rrd = RRDHandler( self.clusterName )
58                        if (DEBUG_LEVEL>9): print ' |-Cluster found: %s' %( self.clusterName )
59
60                elif name == 'HOST':     
61                        self.hostName = attrs.get('NAME',"")
62                        self.hostIp = attrs.get('IP',"")
63                        self.hostReported = attrs.get('REPORTED',"")
64                        # Reset the metrics list for each host
65                        self.metrics = [ ]
66                        if (DEBUG_LEVEL>9): print ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported )
67
68                elif name == 'METRIC':
69                        myMetric = { }
70                        myMetric['name'] = attrs.get('NAME',"")
71                        myMetric['val'] = attrs.get('VAL',"")
72
73                        self.metrics.append( myMetric ) 
74                        if (DEBUG_LEVEL>10): print ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] )
75
76                return
77
78        def endElement( self, name ):
79                #if name == 'ganglia_xml':
80
81                #if name == 'grid':
82
83                #if name == 'cluster':
84
85                if name == 'host':     
86                        self.storeMetrics( self.hostName )
87
88                #if name == 'metric':
89
90        def storeMetrics( self, hostname ):
91
92                for metric in self.metrics:
93                        self.rrd.checkCreate( hostname, metric['name'] )       
94                        self.rrd.update( hostname, metric['name'], metric['val'] )
95                        if (DEBUG_LEVEL>8): print 'stored metric %s for %s: %s' %( hostname, metric['name'], metric['val'] )
96                        sys.exit(1)
97       
98
99class GangliaXMLGatherer:
100        "Setup a connection and file object to Ganglia's XML"
101
102        s = None
103
104        def __init__( self, host, port ):
105                "Store host and port for connection"
106
107                self.host = host
108                self.port = port
109
110        def __del__( self ):
111                "Kill the socket before we leave"
112
113                self.s.close()
114
115        def getFileObject( self ):
116                "Connect, and return a file object"
117
118                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
119                        af, socktype, proto, canonname, sa = res
120                        try:
121                                self.s = socket.socket( af, socktype, proto )
122                        except socket.error, msg:
123                                self.s = None
124                                continue
125                        try:
126                                self.s.connect( sa )
127                        except socket.error, msg:
128                                self.s.close()
129                                self.s = None
130                                continue
131                        break
132
133                if self.s is None:
134                        print 'Could not open socket'
135                        sys.exit(1)
136
137                return self.s.makefile( 'r' )
138
139class GangliaXMLProcessor:
140
141        def daemon( self ):
142                "Run as daemon forever"
143
144                self.DAEMON = 1
145
146                # Fork the first child
147                #
148                pid = os.fork()
149                if pid > 0:
150                        sys.exit(0)  # end parrent
151
152                # creates a session and sets the process group ID
153                #
154                os.setsid()
155
156                # Fork the second child
157                #
158                pid = os.fork()
159                if pid > 0:
160                        sys.exit(0)  # end parrent
161
162                # Go to the root directory and set the umask
163                #
164                os.chdir('/')
165                os.umask(0)
166
167                sys.stdin.close()
168                sys.stdout.close()
169                sys.stderr.close()
170
171                os.open('/dev/null', 0)
172                os.dup(0)
173                os.dup(0)
174
175                self.run()
176
177        def run( self ):
178                "Main thread"
179
180                while ( 1 ):
181                        self.processXML()
182                        time.sleep( 5 )
183
184        def processXML( self ):
185                "Process XML"
186
187                myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
188
189                myParser = make_parser()   
190                myHandler = GangliaXMLHandler()
191                myParser.setContentHandler( myHandler )
192
193                myParser.parse( myXMLGatherer.getFileObject() )
194
195class GangliaConfigParser:
196
197        sources = [ ]
198
199        def __init__( self, config ):
200                self.config = config
201                self.parseValues()
202
203        def parseValues(self):
204                "Parse certain values from gmetad.conf"
205
206                readcfg = open( self.config, 'r' )
207
208                for line in readcfg.readlines():
209
210                        if line.count( '"' ) > 1:
211
212                                source = { }
213                                source['name'] = line.split( '"' )[1]
214
215                                if line.find( 'data_source' ) and line[0] != '#':
216
217                                        source_words = line.split( '"' )[2].split( ' ' )
218                                        print line.split( '"' )
219                                        print source_words
220
221                                        for word in source_words:
222
223                                                print 'word %s' %word
224                                                valid_interval = 1
225
226                                                for letter in word:
227                                                        print 'letter %s' %letter
228                                                        if letter not in string.digits:
229                                                                valid_interval = 0
230
231                                                if valid_interval:
232                                                        source['interval'] = word
233                                                        if (DEBUG_LEVEL>8): print 'polling interval for %s = %s' %(source['name'], source['interval'] )
234               
235                # No interval found, use Ganglia's default     
236                if not source.has_key( 'interval' ):
237                        source['interval'] = 15
238
239                self.sources.append( source )
240
241        def getInterval( self, source_name ):
242                for source in self.sources:
243                        if source['name'] == name:
244                                return source['interval']
245                return None
246
247class RRDHandler:
248
249        def __init__( self, cluster ):
250                self.cluster = cluster
251
252                self.gmetad_conf = GangliaConfigParser( GMETAD_CONF )
253
254        def createCheck( self, host, metric ):
255                "Check if an .rrd allready exists for this metric, create if not"
256
257                rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
258
259                if not os.path.exists( rrd_dir ):
260                        os.makedirs( rrd_dir )
261
262                rrd_file = '%s/%s.rrd' %( rrd_dir, metric )
263
264                interval = self.gmetad_conf.getInterval( self.cluster )
265
266        def update( self, metric, timestamp, val ):
267
268                pass
269
270                #rrd.update( bla )
271               
272
273def main():
274        "Program startup"
275
276        myProcessor = GangliaXMLProcessor()
277        myProcessor.processXML()
278
279def check_dir( directory ):
280        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
281
282        if directory[-1] == '/':
283                directory = directory[:-1]
284
285        return directory
286
287# Let's go
288if __name__ == '__main__':
289        main()
Note: See TracBrowser for help on using the repository browser.