- Timestamp:
- 04/05/05 09:42:57 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r37 r38 12 12 import threading 13 13 import mutex 14 import random 14 15 15 16 # Specify debugging level here; 16 17 # 17 # <=11 = metric XML 18 # <=10 = host,cluster,grid,ganglia XML 19 # <=9 = RRD activity,gmetad config parsing 20 # <=8 = host processing 21 # <=7 = daemon threading - NOTE: Daemon will 'halt on all errors' from this level 22 # 23 DEBUG_LEVEL = 10 18 # 11 = XML: metrics 19 # 10 = XML: host, cluster, grid, ganglia 20 # 9 = RRD activity, gmetad config parsing 21 # 8 = daemon threading 22 # 23 DEBUG_LEVEL = 8 24 24 25 25 # Where is the gmetad.conf located … … 37 37 # Amount of hours to store in one single archived .rrd 38 38 # 39 ARCHIVE_HOURS_PER_RRD = 1 239 ARCHIVE_HOURS_PER_RRD = 1 40 40 41 41 # Wether or not to run as a daemon in background … … 58 58 59 59 class RRDMutator: 60 "A class for handling .rrd mutations" 60 61 61 62 binary = '/usr/bin/rrdtool' … … 84 85 arg_string = arg_string + ' ' + arg 85 86 86 debug_msg( 7, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + arg_string )87 debug_msg( 9, 'rrdm.perform(): ' + self.binary + ' ' + action + ' ' + arg_string ) 87 88 88 89 for line in os.popen( self.binary + ' ' + action + ' ' + arg_string ).readlines(): … … 91 92 92 93 error_msg = string.join( line.split( ' ' )[1:] ) 93 debu _msg( 7, error_msg )94 debug_msg( 9, error_msg ) 94 95 return 1 95 96 … … 158 159 for clustername, rrdh in self.clusters.items(): 159 160 160 print 'storing metrics of cluster ' + clustername 161 rrdh.storeMetrics() 161 ret = rrdh.storeMetrics() 162 163 if ret: 164 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 165 return 1 166 167 return 0 162 168 163 169 class GangliaXMLGatherer: … … 221 227 "Connect, and return a file object" 222 228 223 self.disconnect() 224 self.connect() 229 if self.s: 230 # Apearantly, only data is received when a connection is made 231 # therefor, disconnect and connect 232 # 233 self.disconnect() 234 self.connect() 225 235 226 236 return self.s.makefile( 'r' ) … … 283 293 return time.strftime("%a %d %b %Y %H:%M:%S") 284 294 285 def grabXML( self ):286 287 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' )288 pid = os.fork()289 290 if pid == 0:291 # Child - XML Thread292 #293 # Process XML and exit294 295 debug_msg( 7, self.printTime() + ' - xmlthread() - Start XML processing..' )296 self.processXML()297 debug_msg( 7, self.printTime() + ' - xmlthread() - Done processing; exiting.' )298 sys.exit( 0 )299 300 elif pid > 0:301 # Parent - Time/sleep Thread302 #303 # Make sure XML is processed on time and at regular intervals304 305 debug_msg( 7, self.printTime() + ' - mainthread() - Sleep '+ str( self.config.getLowestInterval() ) +'s: zzzzz..' )306 time.sleep( float( self.config.getLowestInterval() ) )307 debug_msg( 7, self.printTime() + ' - mainthread() - Awoken: waiting for XML thread..' )308 309 r = os.wait()310 ret = r[1]311 if ret != 0:312 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: ERROR! xmlthread() exited with status %d' %(ret) )313 if DEBUG_LEVEL>=7: sys.exit( 1 )314 else:315 316 debug_msg( 7, self.printTime() + ' - mainthread() - Done waiting: xmlthread() finished succesfully' )317 318 295 def run( self ): 319 296 "Main thread" 320 321 #self.processXML()322 #self.storeMetrics()323 324 #time.sleep( 20 )325 326 #self.processXML()327 #self.storeMetrics()328 329 #sys.exit(1)330 297 331 298 xmlthread = threading.Thread( None, self.processXML, 'xmlthread' ) … … 340 307 # 341 308 xmlthread = threading.Thread( None, self.processXML, 'xmlthread' ) 342 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' )309 debug_msg( 8, self.printTime() + ' - mainthread() - xmlthread() started' ) 343 310 xmlthread.start() 344 311 … … 349 316 # 350 317 storethread = threading.Thread( None, self.storeMetrics, 'storethread' ) 351 debug_msg( 7, self.printTime() + ' - mainthread() - storethread() started' )318 debug_msg( 8, self.printTime() + ' - mainthread() - storethread() started' ) 352 319 storethread.start() 353 320 … … 358 325 "Store metrics retained in memory to disk" 359 326 360 STORE_INTERVAL = 5 361 362 debug_msg( 7, self.printTime() + ' - storethread() - Storing data..' ) 327 # Store metrics somewhere between every 60 and 180 seconds 328 # 329 STORE_INTERVAL = random.randint( 60, 180 ) 330 331 debug_msg( 8, self.printTime() + ' - storethread() - storemetricthread() started: Storing data..' ) 363 332 364 333 # threaded call to: self.myHandler.storeMetrics() … … 367 336 storethread.start() 368 337 369 debug_msg( 7, self.printTime() + ' - storethread() - Sleeping.. (%ss)' %STORE_INTERVAL )338 debug_msg( 8, self.printTime() + ' - storethread() - Sleeping.. (%ss)' %STORE_INTERVAL ) 370 339 time.sleep( STORE_INTERVAL ) 371 debug_msg( 7, self.printTime() + ' - storethread() - Done sleeping.' )340 debug_msg( 8, self.printTime() + ' - storethread() - Done sleeping.' ) 372 341 373 342 if storethread.isAlive(): 374 343 375 debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() still running, waiting to finish..' )344 debug_msg( 8, self.printTime() + ' - storethread() - storemetricthread() still running, waiting to finish..' ) 376 345 parsethread.join( 180 ) # Maximum time is 3 minutes for storing thread to finish - more then enough 377 debug_msg( 7, self.printTime() + ' - storethread() - storemetricthread() finished.' ) 378 379 debug_msg( 7, self.printTime() + ' - storethread() finished' ) 346 347 debug_msg( 8, self.printTime() + ' - storethread() - storemetricthread() finished.' ) 348 349 debug_msg( 8, self.printTime() + ' - storethread() finished' ) 380 350 381 351 return 0 … … 384 354 "Process XML" 385 355 386 debug_msg( 7, self.printTime() + ' - xmlthread() -Parsing..' )356 debug_msg( 8, self.printTime() + ' - xmlthread() - parsethread() started: Parsing..' ) 387 357 388 358 # threaded call to: self.myParser.parse( self.myXMLGatherer.getFileObject() ) … … 391 361 parsethread.start() 392 362 393 debug_msg( 7, self.printTime() + ' - xmlthread() - Sleeping.. (%ss)' %self.config.getLowestInterval() )363 debug_msg( 8, self.printTime() + ' - xmlthread() - Sleeping.. (%ss)' %self.config.getLowestInterval() ) 394 364 time.sleep( float( self.config.getLowestInterval() ) ) 395 debug_msg( 7, self.printTime() + ' - xmlthread() - Done sleeping.' )365 debug_msg( 8, self.printTime() + ' - xmlthread() - Done sleeping.' ) 396 366 397 367 if parsethread.isAlive(): 398 368 399 debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() still running, waiting to finish..' )369 debug_msg( 8, self.printTime() + ' - xmlthread() - parsethread() still running, waiting to finish..' ) 400 370 parsethread.join( 180 ) # Maximum time is 3 minutes for XML thread to finish - more then enough 401 debug_msg( 7, self.printTime() + ' - xmlthread() - parsethread() finished.' ) 402 403 debug_msg( 7, self.printTime() + ' - xmlthread() finished.' ) 371 372 debug_msg( 8, self.printTime() + ' - xmlthread() - parsethread() finished.' ) 373 374 debug_msg( 8, self.printTime() + ' - xmlthread() finished.' ) 404 375 405 376 return 0 … … 513 484 self.slot.unlock() 514 485 515 def makeUpdateString( self, host, metricname ):516 517 update_string = None518 519 print self.myMetrics[ host ][ metricname ]520 521 for m in self.myMetrics[ host ][ metricname ]:522 523 print m524 525 if not update_string:526 update_string = '%s:%s' %( m['time'], m['val'] )527 else:528 update_string = update_string + ' %s:%s' %( m['time'], m['val'] )529 530 return update_string531 532 486 def makeUpdateList( self, host, metricname ): 533 487 … … 558 512 else: 559 513 debug_msg( 9, 'metric update failed' ) 560 561 return 1 514 return 1 562 515 563 516 def makeTimeSerial( self ): … … 693 646 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial ) 694 647 695 #timestamp = metric['time']696 #val = metric['val']697 698 #update_string = '%s:%s' %(timestamp, val)699 648 update_list = self.makeUpdateList( host, metricname ) 700 649 701 #try: 702 703 self.rrdm.update( str(rrd_file), update_list ) 704 705 #except rrdtool.error, detail: 706 707 # debug_msg( 0, 'EXCEPTION! While trying to update rrd:' ) 708 # debug_msg( 0, '\trrd %s with %s' %( str(rrd_file), update_string ) ) 709 # debug_msg( 0, str(detail) ) 710 711 # return 1 650 ret = self.rrdm.update( str(rrd_file), update_list ) 651 652 if ret: 653 return 1 712 654 713 655 debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
Note: See TracChangeset
for help on using the changeset viewer.