source: trunk/daemon/togad.py @ 10

Last change on this file since 10 was 10, checked in by bastiaans, 19 years ago

daemon/togad.py:

Interval parsing working

File size: 6.0 KB
Line 
1#!/usr/bin/env python
2
3from xml.sax import make_parser
4from xml.sax.handler import ContentHandler
5import socket
6import sys
7import rrdtool
8import string
9
10# Specify debugging level here;
11#
12# >10 = metric XML
13# >9  = host,cluster,grid,ganglia XML
14# >8  = RRD activity,gmetad config parsing
15#
16DEBUG_LEVEL = 9
17
18# Where is the gmetad.conf located
19#
20GMETAD_CONF = '/etc/gmetad.conf'
21
22# Where to store the archived rrd's
23#
24ARCHIVE_PATH = '/data/toga/rrds'
25
26# List of data_source names to archive for
27#
28ARCHIVE_SOURCES = [ "LISA Cluster" ]
29
30#
31# Configuration ends
32#
33
34"""
35This is TOrque-GAnglia's data Daemon
36"""
37
38class GangliaXMLHandler( ContentHandler ):
39        "Parse Ganglia's XML"
40
41        metrics = [ ]
42
43        def startElement( self, name, attrs ):
44                "Store appropriate data from xml start tags"
45
46                if name == 'GANGLIA_XML':
47                        self.XMLSource = attrs.get('SOURCE',"")
48                        self.gangliaVersion = attrs.get('VERSION',"")
49                        if (DEBUG_LEVEL>9): print 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion )
50
51                elif name == 'GRID':
52                        self.gridName = attrs.get('NAME',"")
53                        if (DEBUG_LEVEL>9): print '`-Grid found: %s' %( self.gridName )
54
55                elif name == 'CLUSTER':
56                        self.clusterName = attrs.get('NAME',"")
57                        self.rrd = RRDHandler( self.clusterName )
58                        if (DEBUG_LEVEL>9): print ' |-Cluster found: %s' %( self.clusterName )
59
60                elif name == 'HOST':     
61                        self.hostName = attrs.get('NAME',"")
62                        self.hostIp = attrs.get('IP',"")
63                        self.hostReported = attrs.get('REPORTED',"")
64                        # Reset the metrics list for each host
65                        self.metrics = [ ]
66                        if (DEBUG_LEVEL>9): print ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported )
67
68                elif name == 'METRIC':
69                        myMetric = { }
70                        myMetric['name'] = attrs.get('NAME',"")
71                        myMetric['val'] = attrs.get('VAL',"")
72
73                        self.metrics.append( myMetric ) 
74                        if (DEBUG_LEVEL>10): print ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] )
75
76                return
77
78        def endElement( self, name ):
79                #if name == 'ganglia_xml':
80
81                #if name == 'grid':
82
83                #if name == 'cluster':
84
85                if name == 'host':     
86                        self.storeMetrics( self.hostName )
87
88                #if name == 'metric':
89
90        def storeMetrics( self, hostname ):
91
92                for metric in self.metrics:
93                        self.rrd.checkCreate( hostname, metric['name'] )       
94                        self.rrd.update( hostname, metric['name'], metric['val'] )
95                        if (DEBUG_LEVEL>8): print 'stored metric %s for %s: %s' %( hostname, metric['name'], metric['val'] )
96                        sys.exit(1)
97       
98
99class GangliaXMLGatherer:
100        "Setup a connection and file object to Ganglia's XML"
101
102        s = None
103
104        def __init__( self, host, port ):
105                "Store host and port for connection"
106
107                self.host = host
108                self.port = port
109
110        def __del__( self ):
111                "Kill the socket before we leave"
112
113                self.s.close()
114
115        def getFileObject( self ):
116                "Connect, and return a file object"
117
118                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
119                        af, socktype, proto, canonname, sa = res
120                        try:
121                                self.s = socket.socket( af, socktype, proto )
122                        except socket.error, msg:
123                                self.s = None
124                                continue
125                        try:
126                                self.s.connect( sa )
127                        except socket.error, msg:
128                                self.s.close()
129                                self.s = None
130                                continue
131                        break
132
133                if self.s is None:
134                        print 'Could not open socket'
135                        sys.exit(1)
136
137                return self.s.makefile( 'r' )
138
139class GangliaXMLProcessor:
140
141        def daemon( self ):
142                "Run as daemon forever"
143
144                self.DAEMON = 1
145
146                # Fork the first child
147                #
148                pid = os.fork()
149                if pid > 0:
150                        sys.exit(0)  # end parrent
151
152                # creates a session and sets the process group ID
153                #
154                os.setsid()
155
156                # Fork the second child
157                #
158                pid = os.fork()
159                if pid > 0:
160                        sys.exit(0)  # end parrent
161
162                # Go to the root directory and set the umask
163                #
164                os.chdir('/')
165                os.umask(0)
166
167                sys.stdin.close()
168                sys.stdout.close()
169                sys.stderr.close()
170
171                os.open('/dev/null', 0)
172                os.dup(0)
173                os.dup(0)
174
175                self.run()
176
177        def run( self ):
178                "Main thread"
179
180                while ( 1 ):
181                        self.processXML()
182                        time.sleep( 5 )
183
184        def processXML( self ):
185                "Process XML"
186
187                myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 
188
189                myParser = make_parser()   
190                myHandler = GangliaXMLHandler()
191                myParser.setContentHandler( myHandler )
192
193                myParser.parse( myXMLGatherer.getFileObject() )
194
195class GangliaConfigParser:
196
197        sources = [ ]
198
199        def __init__( self, config ):
200                self.config = config
201                self.parseValues()
202
203        def parseValues(self):
204                "Parse certain values from gmetad.conf"
205
206                readcfg = open( self.config, 'r' )
207
208                for line in readcfg.readlines():
209
210                        if line.count( '"' ) > 1:
211
212                                source = { }
213                                source['name'] = line.split( '"' )[1]
214
215                                if line.find( 'data_source' ) != -1 and line[0] != '#':
216
217                                        source_words = line.split( '"' )[2].split( ' ' )
218
219                                        for word in source_words:
220
221                                                valid_interval = 1
222
223                                                for letter in word:
224                                                        if letter not in string.digits:
225                                                                valid_interval = 0
226
227                                                if valid_interval and len(word) > 0:
228                                                        source['interval'] = word
229                                                        if (DEBUG_LEVEL>8): print 'polling interval for %s = %s' %(source['name'], source['interval'] )
230               
231                # No interval found, use Ganglia's default     
232                if not source.has_key( 'interval' ):
233                        source['interval'] = 15
234
235                self.sources.append( source )
236
237        def getInterval( self, source_name ):
238                for source in self.sources:
239                        if source['name'] == name:
240                                return source['interval']
241                return None
242
243class RRDHandler:
244
245        def __init__( self, cluster ):
246                self.cluster = cluster
247
248                self.gmetad_conf = GangliaConfigParser( GMETAD_CONF )
249
250        def createCheck( self, host, metric ):
251                "Check if an .rrd allready exists for this metric, create if not"
252
253                rrd_dir = '%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host )
254
255                if not os.path.exists( rrd_dir ):
256                        os.makedirs( rrd_dir )
257
258                rrd_file = '%s/%s.rrd' %( rrd_dir, metric )
259
260                interval = self.gmetad_conf.getInterval( self.cluster )
261
262        def update( self, metric, timestamp, val ):
263
264                pass
265
266                #rrd.update( bla )
267               
268
269def main():
270        "Program startup"
271
272        myProcessor = GangliaXMLProcessor()
273        myProcessor.processXML()
274
275def check_dir( directory ):
276        "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
277
278        if directory[-1] == '/':
279                directory = directory[:-1]
280
281        return directory
282
283# Let's go
284if __name__ == '__main__':
285        main()
Note: See TracBrowser for help on using the repository browser.