Changeset 33 for trunk/daemon
- Timestamp:
- 04/04/05 11:11:07 (19 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r32 r33 20 20 # <=7 = daemon threading - NOTE: Daemon will 'halt on all errors' from this level 21 21 # 22 DEBUG_LEVEL = 822 DEBUG_LEVEL = 9 23 23 24 24 # Where is the gmetad.conf located … … 37 37 # 38 38 ARCHIVE_HOURS_PER_RRD = 12 39 40 # Interval at which to grab&store XML41 #42 GRAB_INTERVAL = 1543 39 44 40 # Wether or not to run as a daemon in background … … 63 59 "Parse Ganglia's XML" 64 60 65 def __init__( self ): 66 self.metrics = [ ] 67 self.clusters = { } 61 clusters = { } 62 config = None 63 64 def __init__( self, config ): 65 self.config = config 68 66 69 67 def startElement( self, name, attrs ): … … 89 87 self.time = attrs.get( 'LOCALTIME', "" ) 90 88 91 self.clusters[ self.clusterName ] = RRDHandler( self.clusterName ) 89 if not self.clusters.has_key( self.clusterName ): 90 91 self.clusters[ self.clusterName ] = RRDHandler( self.clusterName ) 92 92 93 93 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) … … 118 118 def storeMetrics( self, hostname, timeserial ): 119 119 120 for metric in self.metrics: 121 if metric['type'] not in UNSUPPORTED_ARCHIVE_TYPES: 122 123 self.rrd.createCheck( hostname, metric, timeserial ) 124 self.rrd.update( hostname, metric, timeserial ) 125 debug_msg( 9, 'stored metric %s for %s: %s' %( hostname, metric['name'], metric['val'] ) ) 126 #sys.exit(1) 120 for cluster in self.clusters: 121 122 cluster.storeMetrics() 127 123 128 124 class GangliaXMLGatherer: … … 136 132 self.host = host 137 133 self.port = port 138 139 def __del__( self ): 140 "Kill the socket before we leave" 141 142 self.s.close() 143 144 def getFileObject( self ): 145 "Connect, and return a file object" 134 self.connect() 135 136 def connect( self ): 137 "Setup connection to XML source" 146 138 147 139 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): … … 172 164 if self.s is None: 173 165 174 print 'Could not open socket' 175 sys.exit(1) 166 debug_msg( 0, 'Could not open socket' ) 167 sys.exit( 1 ) 168 169 def disconnect( self ): 170 "Close socket" 171 172 if self.s: 173 self.s.close() 174 self.s = None 175 176 def __del__( self ): 177 "Kill the socket before we leave" 178 179 self.disconnect() 180 181 def getFileObject( self ): 182 "Connect, and return a file object" 183 184 if not self.s: 185 self.connect() 176 186 177 187 return self.s.makefile( 'r' ) 178 188 179 189 class GangliaXMLProcessor: 190 191 def __init__( self ): 192 "Setup initial XML connection and handlers" 193 194 self.config = GangliaConfigParser( GMETAD_CONF ) 195 196 self.myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 197 self.myParser = make_parser() 198 self.myHandler = GangliaXMLHandler( self.config ) 199 self.myParser.setContentHandler( self.myHandler ) 180 200 181 201 def daemon( self ): … … 220 240 221 241 def printTime( self ): 222 223 return time.strftime("%a, %d %b %Y %H:%M:%S") 242 "Print current time in human readable format" 243 244 return time.strftime("%a %d %b %Y %H:%M:%S") 245 246 def grabXML( self ): 247 248 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' ) 249 pid = os.fork() 250 251 if pid == 0: 252 # Child - XML Thread 253 # 254 # Process XML and exit 255 256 debug_msg( 7, self.printTime() + ' - xmlthread() - Start XML processing..' ) 257 self.processXML() 258 debug_msg( 7, self.printTime() + ' - xmlthread() - Done processing; exiting.' ) 259 sys.exit( 0 ) 260 261 elif pid > 0: 262 # Parent - Time/sleep Thread 263 # 264 # Make sure XML is processed on time and at regular intervals 265 266 debug_msg( 7, self.printTime() + ' - mainthread() - Sleep '+ str( self.config.getInterval() ) +'s: zzzzz..' ) 267 time.sleep( self.config.getInterval() ) 268 debug_msg( 7, self.printTime() + ' - mainthread() - Awoken: waiting for XML thread..' ) 269 270 r = os.wait() 271 ret = r[1] 272 if ret != 0: 273 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: ERROR! xmlthread() exited with status %d' %(ret) ) 274 if DEBUG_LEVEL>=7: sys.exit( 1 ) 275 else: 276 277 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: xmlthread() finished succesfully' ) 224 278 225 279 def run( self ): 226 280 "Main thread" 227 281 228 while( 1 ): 229 230 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' ) 282 # Daemonized not working yet 283 if DAEMONIZE: 231 284 pid = os.fork() 232 285 286 # Handle XML grabbing in Child 233 287 if pid == 0: 234 # Child - XML Thread 235 # 236 # Process XML and exit 237 238 debug_msg( 7, self.printTime() + ' - xmlthread() - Start XML processing..' ) 239 self.processXML() 240 debug_msg( 7, self.printTime() + ' - xmlthread() - Done processing; exiting.' ) 241 sys.exit( 0 ) 242 243 elif pid > 0: 244 # Parent - Daemon Thread 245 246 debug_msg( 7, self.printTime() + ' - mainthread() - Sleep '+ str(GRAB_INTERVAL) +'s: zzzzz..' ) 247 time.sleep( GRAB_INTERVAL ) 248 debug_msg( 7, self.printTime() + ' - mainthread() - Awoken: waiting for XML thread..' ) 249 250 r = os.wait() 251 ret = r[1] 252 if ret != 0: 253 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: ERROR! xmlthread() exited with status %d' %(ret) ) 254 if DEBUG_LEVEL>=7: sys.exit( 1 ) 255 else: 256 257 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: xmlthread() finished succesfully' ) 288 289 while( 1 ): 290 self.grabXML() 291 292 # Do scheduled RRD storing in Parent 293 #elif pid > ): 294 295 else: 296 self.grabXML() 297 self.storeMetrics() 298 299 def storeMetrics( self ): 300 "Store metrics retained in memory to disk" 301 302 self.myHandler.storeMetrics() 258 303 259 304 def processXML( self ): 260 305 "Process XML" 261 306 262 myXMLGatherer = GangliaXMLGatherer( 'localhost', 8651 ) 263 264 myParser = make_parser() 265 myHandler = GangliaXMLHandler() 266 myParser.setContentHandler( myHandler ) 267 268 myParser.parse( myXMLGatherer.getFileObject() ) 307 self.myParser.parse( self.myXMLGatherer.getFileObject() ) 269 308 270 309 class GangliaConfigParser: 271 310 272 sources = [ ]311 sources = { } 273 312 274 313 def __init__( self, config ): … … 306 345 source['interval'] = word 307 346 debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) ) 308 309 # No interval found, use Ganglia's default 310 if not source.has_key( 'interval' ): 311 312 source['interval'] = 15 313 debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) ) 314 315 self.sources.append( source ) 347 348 # No interval found, use Ganglia's default 349 if not source.has_key( 'interval' ): 350 source['interval'] = 15 351 debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) ) 352 353 self.sources.append( source ) 316 354 317 355 def getInterval( self, source_name ): … … 329 367 myMetrics = { } 330 368 331 def __init__( self, c luster ):369 def __init__( self, config, cluster ): 332 370 self.cluster = cluster 333 self. gmetad_conf = GangliaConfigParser( GMETAD_CONF )371 self.config = config 334 372 335 373 def getClusterName( self ): … … 365 403 return update_string 366 404 405 def storeMetrics( self ): 406 407 for hostname, mymetrics in self.myMetrics.items(): 408 409 for metricname, mymetric in mymetrics.items(): 410 411 self.rrd.createCheck( hostname, metricname, timeserial ) 412 update_okay = self.rrd.update( hostname, metricname, timeserial ) 413 414 if not update_okay: 415 416 del self.myMetrics[ hostname ][ metricname ] 417 debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) ) 418 else: 419 debug_msg( 9, 'metric update failed' ) 420 421 sys.exit(1) 422 367 423 def makeTimeSerial( self ): 368 424 "Generate a time serial. Seconds since epoch" 369 425 370 # YYYYMMDDhhmmss: 20050321143411371 #mytime = time.strftime( "%Y%m%d%H%M%S" )372 373 426 # Seconds since epoch 374 427 mytime = int( time.time() ) … … 376 429 return mytime 377 430 378 def makeRrdPath( self, host, metric =None, timeserial=None ):431 def makeRrdPath( self, host, metricname=None, timeserial=None ): 379 432 """ 380 433 Make a RRD location/path and filename … … 388 441 rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial ) 389 442 if metric: 390 rrd_file = '%s/%s.rrd' %( rrd_dir, metric ['name'])443 rrd_file = '%s/%s.rrd' %( rrd_dir, metricname ) 391 444 else: 392 445 rrd_file = None … … 450 503 return serial 451 504 452 def createCheck( self, host, metric, timeserial ): 505 def getFirstTime( self, host, metricname ): 506 "Get the first time of a metric we know of" 507 508 first_time = 0 509 510 for metric in self.myMetrics[ host ][ metricname ]: 511 512 if not first_time or metric['time'] <= first_time: 513 514 first_time = metric['time'] 515 516 def createCheck( self, host, metricname, timeserial ): 453 517 "Check if an .rrd allready exists for this metric, create if not" 454 518 455 519 debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metric['name'] ) ) 456 520 457 rrd_dir, rrd_file = self.makeRrdPath( host, metric , timeserial )521 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial ) 458 522 459 523 if not os.path.exists( rrd_dir ): … … 463 527 if not os.path.exists( rrd_file ): 464 528 465 interval = self. gmetad_conf.getInterval( self.cluster )529 interval = self.config.getInterval( self.cluster ) 466 530 heartbeat = 8 * int(interval) 467 531 … … 470 534 471 535 param_start1 = '--start' 472 param_start2 = str( int( metric['time']) - 1 )536 param_start2 = str( int( self.getFirstTime( host, metricname ) ) - 1 ) 473 537 474 538 param_ds = 'DS:sum:GAUGE:%d:U:U' %heartbeat … … 479 543 debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) ) 480 544 481 def update( self, host, metric , timeserial ):545 def update( self, host, metricname, timeserial ): 482 546 483 547 debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metric['name'] ) ) 484 548 485 rrd_dir, rrd_file = self.makeRrdPath( host, metric, timeserial ) 486 487 timestamp = metric['time'] 488 val = metric['val'] 489 490 update_string = '%s:%s' %(timestamp, val) 549 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial ) 550 551 #timestamp = metric['time'] 552 #val = metric['val'] 553 554 #update_string = '%s:%s' %(timestamp, val) 555 update_string = self.makeUpdateString( host, metricname ) 491 556 492 557 try: … … 500 565 debug_msg( 0, str(detail) ) 501 566 502 sys.exit( 1 )567 return 1 503 568 504 569 debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), update_string ) )
Note: See TracChangeset
for help on using the changeset viewer.