- Timestamp:
- 04/04/05 16:05:26 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r35 r36 11 11 import time 12 12 import re 13 import threading 14 import mutex 13 15 14 16 # Specify debugging level here; … … 181 183 "Connect, and return a file object" 182 184 183 if not self.s:184 185 self.disconnect() 186 self.connect() 185 187 186 188 return self.s.makefile( 'r' ) … … 230 232 sys.stdin.close() 231 233 sys.stdout.close() 232 sys.stderr.close()234 #sys.stderr.close() 233 235 234 236 os.open('/dev/null', 0) … … 279 281 "Main thread" 280 282 281 # Daemonized not working yet 282 if DAEMONIZE: 283 pid = os.fork() 284 285 # Handle XML grabbing in Child 286 if pid == 0: 287 288 while( 1 ): 289 self.grabXML() 290 291 # Do scheduled RRD storing in Parent 292 #elif pid > ): 293 294 else: 295 #self.grabXML() 296 self.processXML() 297 self.storeMetrics() 283 #self.processXML() 284 #self.storeMetrics() 285 286 #time.sleep( 20 ) 287 288 #self.processXML() 289 #self.storeMetrics() 290 291 #sys.exit(1) 292 293 xmlthread = threading.Thread( None, self.processXML, 'xmlthread' ) 294 storethread = threading.Thread( None, self.storeMetrics, 'storethread' ) 295 296 while( 1 ): 297 298 if not xmlthread.isAlive(): 299 # Gather XML at the same interval as gmetad 300 301 # threaded call to: self.processXML() 302 # 303 xmlthread = threading.Thread( None, self.processXML, 'xmlthread' ) 304 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' ) 305 xmlthread.start() 306 307 if not storethread.isAlive(): 308 # Store metrics every .. sec 309 310 # threaded call to: self.storeMetrics() 311 # 312 storethread = threading.Thread( None, self.storeMetrics, 'storethread' ) 313 debug_msg( 7, self.printTime() + ' - mainthread() - storethread() started' ) 314 storethread.start() 315 316 # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway 317 time.sleep( 1 ) 298 318 299 319 def storeMetrics( self ): 300 320 "Store metrics retained in memory to disk" 301 321 302 self.myHandler.storeMetrics() 322 STORE_INTERVAL = 30 323 324 debug_msg( 7, self.printTime() + ' - storethread() - Storing data..' ) 325 326 # threaded call to: self.myHandler.storeMetrics() 327 # 328 storethread = threading.Thread( None, self.myHandler.storeMetrics(), 'storemetricthread' ) 329 storethread.start() 330 331 debug_msg( 7, self.printTime() + ' - storethread() - Sleeping.. (%ss)' %STORE_INTERVAL ) 332 time.sleep( STORE_INTERVAL ) 333 debug_msg( 7, self.printTime() + ' - storethread() - Done sleeping.' ) 334 335 if storethread.isAlive(): 336 337 debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() still running, waiting to finish..' ) 338 parsethread.join( 180 ) # Maximum time is 3 minutes for storing thread to finish - more then enough 339 debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() finished.' ) 340 341 debug_msg( 7, self.printTime() + ' - storethread() finished' ) 342 343 return 0 303 344 304 345 def processXML( self ): 305 346 "Process XML" 306 347 307 self.myParser.parse( self.myXMLGatherer.getFileObject() ) 348 debug_msg( 7, self.printTime() + ' - xmlthread() - Parsing..' ) 349 350 # threaded call to: self.myParser.parse( self.myXMLGatherer.getFileObject() ) 351 # 352 parsethread = threading.Thread( None, self.myParser.parse, 'parsethread', [ self.myXMLGatherer.getFileObject() ] ) 353 parsethread.start() 354 355 debug_msg( 7, self.printTime() + ' - xmlthread() - Sleeping.. (%ss)' %self.config.getLowestInterval() ) 356 time.sleep( float( self.config.getLowestInterval() ) ) 357 debug_msg( 7, self.printTime() + ' - xmlthread() - Done sleeping.' ) 358 359 if parsethread.isAlive(): 360 361 debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() still running, waiting to finish..' ) 362 parsethread.join( 180 ) # Maximum time is 3 minutes for XML thread to finish - more then enough 363 debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() finished.' ) 364 365 debug_msg( 7, self.printTime() + ' - xmlthread() finished.' ) 366 367 return 0 308 368 309 369 class GangliaConfigParser: … … 382 442 383 443 myMetrics = { } 444 slot = None 384 445 385 446 def __init__( self, config, cluster ): 386 447 self.cluster = cluster 387 448 self.config = config 449 self.slot = mutex.mutex() 388 450 389 451 def getClusterName( self ): … … 408 470 self.myMetrics[ host ][ metric['name'] ] = [ ] 409 471 410 472 self.slot.testandset() 411 473 self.myMetrics[ host ][ metric['name'] ].append( metric ) 474 self.slot.unlock() 412 475 413 476 def makeUpdateString( self, host, metricname ): … … 429 492 mytime = self.makeTimeSerial() 430 493 self.createCheck( hostname, metricname, mytime ) 431 update_okay = self.update( hostname, metricname, mytime ) 432 433 if not update_okay: 434 494 update_ret = self.update( hostname, metricname, mytime ) 495 496 if update_ret == 0: 497 498 self.slot.testandset() 435 499 del self.myMetrics[ hostname ][ metricname ] 500 self.slot.unlock() 436 501 debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) ) 437 502 else: 438 503 debug_msg( 9, 'metric update failed' ) 439 504 440 sys.exit(1)505 return 1 441 506 442 507 def makeTimeSerial( self ): … … 590 655 debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), update_string ) ) 591 656 657 return 0 658 592 659 def main(): 593 660 "Program startup"
Note: See TracChangeset
for help on using the changeset viewer.