1 | #!/usr/bin/env python |
---|
2 | |
---|
3 | from xml.sax import make_parser |
---|
4 | from xml.sax.handler import ContentHandler |
---|
5 | import socket |
---|
6 | import sys |
---|
7 | |
---|
8 | # Specify debugging level here; |
---|
9 | # |
---|
10 | # >10 = metric XML |
---|
11 | # >9 = host,cluster,grid,ganglia XML |
---|
12 | # |
---|
13 | DEBUG_LEVEL = 9 |
---|
14 | |
---|
15 | """ |
---|
16 | This is TOrque-GAnglia's data Daemon |
---|
17 | """ |
---|
18 | |
---|
19 | class GangliaXMLHandler( ContentHandler ): |
---|
20 | "Parse Ganglia's XML" |
---|
21 | |
---|
22 | metrics = [ ] |
---|
23 | |
---|
24 | def startElement( self, name, attrs ): |
---|
25 | "Store appropriate data from xml start tags" |
---|
26 | |
---|
27 | if name == 'GANGLIA_XML': |
---|
28 | self.XMLSource = attrs.get('SOURCE',"") |
---|
29 | self.gangliaVersion = attrs.get('VERSION',"") |
---|
30 | if (DEBUG_LEVEL>9): print 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) |
---|
31 | |
---|
32 | elif name == 'GRID': |
---|
33 | self.gridName = attrs.get('NAME',"") |
---|
34 | if (DEBUG_LEVEL>9): print '`-Grid found: %s' %( self.gridName ) |
---|
35 | |
---|
36 | elif name == 'CLUSTER': |
---|
37 | self.clusterName = attrs.get('NAME',"") |
---|
38 | if (DEBUG_LEVEL>9): print ' |-Cluster found: %s' %( self.clusterName ) |
---|
39 | |
---|
40 | elif name == 'HOST': |
---|
41 | self.hostName = attrs.get('NAME',"") |
---|
42 | self.hostIp = attrs.get('IP',"") |
---|
43 | self.hostReported = attrs.get('REPORTED',"") |
---|
44 | if (DEBUG_LEVEL>9): print ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) |
---|
45 | |
---|
46 | elif name == 'METRIC': |
---|
47 | myMetric = { } |
---|
48 | myMetric['name'] = attrs.get('NAME',"") |
---|
49 | myMetric['val'] = attrs.get('VAL',"") |
---|
50 | |
---|
51 | self.metrics.append( myMetric ) |
---|
52 | if (DEBUG_LEVEL>10): print ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) |
---|
53 | |
---|
54 | return |
---|
55 | |
---|
56 | #def endElement( self, name ): |
---|
57 | #if name == 'ganglia_xml': |
---|
58 | |
---|
59 | #if name == 'grid': |
---|
60 | |
---|
61 | #if name == 'cluster': |
---|
62 | |
---|
63 | #if name == 'host': |
---|
64 | |
---|
65 | #if name == 'metric': |
---|
66 | |
---|
67 | class GangliaXMLGatherer: |
---|
68 | "Setup a connection and file object to Ganglia's XML" |
---|
69 | |
---|
70 | s = None |
---|
71 | |
---|
72 | def __init__( self, host, port ): |
---|
73 | "Store host and port for connection" |
---|
74 | |
---|
75 | self.host = host |
---|
76 | self.port = port |
---|
77 | |
---|
78 | def __del__( self ): |
---|
79 | "Kill the socket before we leave" |
---|
80 | |
---|
81 | self.s.close() |
---|
82 | |
---|
83 | def getFileObject( self ): |
---|
84 | "Connect, and return a file object" |
---|
85 | |
---|
86 | for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): |
---|
87 | af, socktype, proto, canonname, sa = res |
---|
88 | try: |
---|
89 | self.s = socket.socket( af, socktype, proto ) |
---|
90 | except socket.error, msg: |
---|
91 | self.s = None |
---|
92 | continue |
---|
93 | try: |
---|
94 | self.s.connect( sa ) |
---|
95 | except socket.error, msg: |
---|
96 | self.s.close() |
---|
97 | self.s = None |
---|
98 | continue |
---|
99 | break |
---|
100 | |
---|
101 | if self.s is None: |
---|
102 | print 'Could not open socket' |
---|
103 | sys.exit(1) |
---|
104 | |
---|
105 | return self.s.makefile( 'r' ) |
---|
106 | |
---|
107 | class GangliaXMLProcessor: |
---|
108 | |
---|
109 | def daemon(self): |
---|
110 | "Run as daemon forever" |
---|
111 | |
---|
112 | self.DAEMON = 1 |
---|
113 | |
---|
114 | # Fork the first child |
---|
115 | # |
---|
116 | pid = os.fork() |
---|
117 | if pid > 0: |
---|
118 | sys.exit(0) # end parrent |
---|
119 | |
---|
120 | # creates a session and sets the process group ID |
---|
121 | # |
---|
122 | os.setsid() |
---|
123 | |
---|
124 | # Fork the second child |
---|
125 | # |
---|
126 | pid = os.fork() |
---|
127 | if pid > 0: |
---|
128 | sys.exit(0) # end parrent |
---|
129 | |
---|
130 | # Go to the root directory and set the umask |
---|
131 | # |
---|
132 | os.chdir('/') |
---|
133 | os.umask(0) |
---|
134 | |
---|
135 | sys.stdin.close() |
---|
136 | sys.stdout.close() |
---|
137 | sys.stderr.close() |
---|
138 | |
---|
139 | os.open('/dev/null', 0) |
---|
140 | os.dup(0) |
---|
141 | os.dup(0) |
---|
142 | |
---|
143 | self.run() |
---|
144 | |
---|
145 | def run(self): |
---|
146 | "Main thread" |
---|
147 | |
---|
148 | while ( 1 ): |
---|
149 | self.processXML() |
---|
150 | time.sleep( 5 ) |
---|
151 | |
---|
152 | def processXML( self ): |
---|
153 | "Process XML" |
---|
154 | |
---|
155 | myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) |
---|
156 | |
---|
157 | myParser = make_parser() |
---|
158 | myHandler = GangliaXMLHandler() |
---|
159 | myParser.setContentHandler( myHandler ) |
---|
160 | |
---|
161 | myParser.parse( myXMLGatherer.getFileObject() ) |
---|
162 | |
---|
163 | def main(): |
---|
164 | "Program startup" |
---|
165 | |
---|
166 | myProcessor = GangliaXMLProcessor() |
---|
167 | myProcessor.processXML() |
---|
168 | |
---|
169 | # Let's go |
---|
170 | main() |
---|