- Timestamp:
- 04/05/05 12:01:16 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r39 r40 13 13 import mutex 14 14 import random 15 from types import * 15 16 16 17 # Specify debugging level here; … … 19 20 # 10 = XML: host, cluster, grid, ganglia 20 21 # 9 = RRD activity, gmetad config parsing 21 # 8 = daemon threading 22 # 8 = RRD file activity 23 # 7 = daemon threading 22 24 # 23 DEBUG_LEVEL = 825 DEBUG_LEVEL = 9 24 26 25 27 # Where is the gmetad.conf located … … 68 70 69 71 def create( self, filename, args ): 70 return self.perform( 'create' + '"' + filename + '"', args )72 return self.perform( 'create', '"' + filename + '"', args ) 71 73 72 74 def update( self, filename, args ): 73 return self.perform( 'update' + '"' + filename + '"', args )74 75 def perform( self, action, args ):75 return self.perform( 'update', '"' + filename + '"', args ) 76 77 def perform( self, action, filename, args ): 76 78 77 79 arg_string = None 80 81 if type( args ) is not ListType: 82 debug_msg( 8, 'Arguments needs to be of type List' ) 83 return 1 78 84 79 85 for arg in args: … … 85 91 arg_string = arg_string + ' ' + arg 86 92 87 debug_msg( 9, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + arg_string ) 88 89 for line in os.popen( self.binary + ' ' + action + ' ' + arg_string ).readlines(): 93 debug_msg( 8, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ) 94 95 for line in os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ).readlines(): 96 97 print line 90 98 91 99 if line.find( 'ERROR' ) != -1: 92 100 93 101 error_msg = string.join( line.split( ' ' )[1:] ) 94 debug_msg( 9, error_msg )102 debug_msg( 8, error_msg ) 95 103 return 1 96 104 … … 323 331 "Store metrics retained in memory to disk" 324 332 325 debug_msg( 8, self.printTime() + ' - storethread(): started.' )333 debug_msg( 7, self.printTime() + ' - storethread(): started.' ) 326 334 327 335 # Store metrics somewhere between every 60 and 180 seconds 328 336 # 329 STORE_INTERVAL = random.randint( 60, 180 ) 337 #STORE_INTERVAL = random.randint( 60, 180 ) 338 STORE_INTERVAL = 40 330 339 331 340 storethread = threading.Thread( None, self.storeThread, 'storemetricthread' ) 332 341 storethread.start() 333 342 334 debug_msg( 8, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )343 debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL ) 335 344 time.sleep( STORE_INTERVAL ) 336 debug_msg( 8, self.printTime() + ' - storethread(): Done sleeping.' )345 debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' ) 337 346 338 347 if storethread.isAlive(): 339 348 340 debug_msg( 8, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )341 parsethread.join( 180 ) # Maximum time is for storing thread to finish342 debug_msg( 8, self.printTime() + ' - storethread(): Done waiting.' )343 344 debug_msg( 8, self.printTime() + ' - storethread(): finished.' )349 debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' ) 350 storethread.join( 180 ) # Maximum time is for storing thread to finish 351 debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' ) 352 353 debug_msg( 7, self.printTime() + ' - storethread(): finished.' ) 345 354 346 355 return 0 … … 348 357 def storeThread( self ): 349 358 350 debug_msg( 8, self.printTime() + ' - storemetricthread(): started.' )351 debug_msg( 8, self.printTime() + ' - storemetricthread(): Storing data..' )359 debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' ) 360 debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' ) 352 361 ret = self.myHandler.storeMetrics() 353 debug_msg( 8, self.printTime() + ' - storemetricthread(): Done storing.' )354 debug_msg( 8, self.printTime() + ' - storemetricthread(): finished.' )362 debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' ) 363 debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' ) 355 364 356 365 return ret … … 359 368 "Process XML" 360 369 361 debug_msg( 8, self.printTime() + ' - xmlthread(): started.' )370 debug_msg( 7, self.printTime() + ' - xmlthread(): started.' ) 362 371 363 372 parsethread = threading.Thread( None, self.parseThread, 'parsethread' ) 364 373 parsethread.start() 365 374 366 debug_msg( 8, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )375 debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 367 376 time.sleep( float( self.config.getLowestInterval() ) ) 368 debug_msg( 8, self.printTime() + ' - xmlthread(): Done sleeping.' )377 debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' ) 369 378 370 379 if parsethread.isAlive(): 371 380 372 debug_msg( 8, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )381 debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' ) 373 382 parsethread.join( 60 ) # Maximum time for XML thread to finish 374 debug_msg( 8, self.printTime() + ' - xmlthread(): Done waiting.' )375 376 debug_msg( 8, self.printTime() + ' - xmlthread(): finished.' )383 debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' ) 384 385 debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' ) 377 386 378 387 return 0 … … 380 389 def parseThread( self ): 381 390 382 debug_msg( 8, self.printTime() + ' - parsethread(): started.' )383 debug_msg( 8, self.printTime() + ' - parsethread(): Parsing XML..' )391 debug_msg( 7, self.printTime() + ' - parsethread(): started.' ) 392 debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' ) 384 393 ret = self.myParser.parse( self.myXMLGatherer.getFileObject() ) 385 debug_msg( 8, self.printTime() + ' - parsethread(): Done parsing.' )386 debug_msg( 8, self.printTime() + ' - parsethread(): finished.' )394 debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' ) 395 debug_msg( 7, self.printTime() + ' - parsethread(): finished.' ) 387 396 388 397 return ret … … 463 472 464 473 myMetrics = { } 474 lastStored = { } 465 475 slot = None 466 476 … … 468 478 self.cluster = cluster 469 479 self.config = config 470 self.slot = mutex.mutex()480 self.slot = threading.Lock() 471 481 self.rrdm = RRDMutator() 472 482 … … 492 502 self.myMetrics[ host ][ metric['name'] ] = [ ] 493 503 494 self.slot.testandset() 504 # Ah, psst, push it 505 # 506 # <atomic> 507 self.slot.acquire() 508 495 509 self.myMetrics[ host ][ metric['name'] ].append( metric ) 496 self.slot.unlock() 510 511 self.slot.release() 512 # </atomic> 497 513 498 514 def makeUpdateList( self, host, metricname ): … … 500 516 update_list = [ ] 501 517 502 for metric in self.myMetrics[ host ][ metricname ]: 503 504 update_list.append( '%s:%s' %( metric['time'], metric['val'] ) ) 518 while len( self.myMetrics[ host ][ metricname ] ) > 0: 519 520 # Kabouter pop 521 # 522 # <atomic> 523 self.slot.acquire() 524 525 metric = self.myMetrics[ host ][ metricname ].pop() 526 527 self.slot.release() 528 # </atomic> 529 530 if self.checkStoreMetric( host, metricname, metric ): 531 update_list.append( '%s:%s' %( metric['time'], metric['val'] ) ) 532 else: 533 print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] ) 505 534 506 535 return update_list 507 536 537 def checkStoreMetric( self, host, metricname, metric ): 538 539 if self.lastStored.has_key( host ): 540 541 if self.lastStored[ host ].has_key( metricname ): 542 543 if self.lastStored[ host ][ metricname ] <= metric['time']: 544 545 # Allready wrote a value with this timestamp, skip tnx 546 return 0 547 548 else: 549 self.lastStored[ host ] = { } 550 551 self.lastStored[ host ][ metricname ] = metric['time'] 552 553 return 1 554 508 555 def storeMetrics( self ): 509 556 … … 513 560 514 561 mytime = self.makeTimeSerial() 515 self.createCheck( hostname, metricname, mytime ) 516 update_ret = self.update( hostname, metricname, mytime ) 562 correct_serial = self.checkNewRrdPeriod( hostname, mytime ) 563 self.createCheck( hostname, metricname, correct_serial ) 564 update_ret = self.update( hostname, metricname, correct_serial ) 517 565 518 566 if update_ret == 0: 519 567 520 self.slot.testandset()521 del self.myMetrics[ hostname ][ metricname ]522 self.slot.unlock()523 568 debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) ) 524 569 else: 525 570 debug_msg( 9, 'metric update failed' ) 526 571 return 1 572 573 return 1 527 574 528 575 def makeTimeSerial( self ):
Note: See TracChangeset
for help on using the changeset viewer.