- Timestamp:
- 04/15/05 15:39:03 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r77 r78 154 154 return 0 155 155 156 class XMLProcessor: 157 """Skeleton class for XML processor's""" 158 159 def daemon( self ): 160 """Run as daemon forever""" 161 162 # Fork the first child 163 # 164 pid = os.fork() 165 166 if pid > 0: 167 168 sys.exit(0) # end parent 169 170 # creates a session and sets the process group ID 171 # 172 os.setsid() 173 174 # Fork the second child 175 # 176 pid = os.fork() 177 178 if pid > 0: 179 180 sys.exit(0) # end parent 181 182 # Go to the root directory and set the umask 183 # 184 os.chdir('/') 185 os.umask(0) 186 187 sys.stdin.close() 188 sys.stdout.close() 189 #sys.stderr.close() 190 191 os.open('/dev/null', 0) 192 os.dup(0) 193 os.dup(0) 194 195 self.run() 196 197 def run( self ): 198 """Do main processing of XML here""" 199 200 pass 201 202 class TorqueXMLProcessor( XMLProcessor ): 203 """Main class for processing XML and acting with it""" 204 205 def __init__( self ): 206 """Setup initial XML connection and handlers""" 207 208 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 209 self.myXMLSource = self.myXMLGatherer.getFileObject() 210 self.myXMLHandler = TorqueXMLHandler() 211 self.myXMLError = XMLErrorHandler() 212 213 def run( self ): 214 """Main XML processing""" 215 216 while( 1 ): 217 218 print 'parse' 219 self.myXMLSource = self.myXMLGatherer.getFileObject() 220 xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 221 print self.myXMLHandler.jobAttrs 222 print 'sleep' 223 time.sleep( 1 ) 224 156 225 class TorqueXMLHandler( xml.sax.handler.ContentHandler ): 157 226 """Parse Torque's jobinfo XML from our plugin""" … … 363 432 debug_msg( 0, 'Warning ' + str( exception ) ) 364 433 365 class GangliaXMLGatherer:434 class XMLGatherer: 366 435 """Setup a connection and file object to Ganglia's XML""" 367 436 … … 441 510 """Connect, and return a file object""" 442 511 512 self.makeFileDescriptor() 513 443 514 if self.fd: 444 515 return self.fd 445 516 446 class GangliaXMLProcessor :517 class GangliaXMLProcessor( XMLProcessor ): 447 518 """Main class for processing XML and acting with it""" 448 519 … … 452 523 self.config = GangliaConfigParser( GMETAD_CONF ) 453 524 454 self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )525 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 455 526 self.myXMLSource = self.myXMLGatherer.getFileObject() 456 self.myTXHandler = TorqueXMLHandler() 457 self.myXMLerror = XMLErrorHandler() 458 459 while( 1 ): 460 461 print 'parse' 462 self.myXMLGatherer.makeFileDescriptor() 463 self.myXMLSource = self.myXMLGatherer.getFileObject() 464 xml.sax.parse( self.myXMLSource, self.myTXHandler, self.myXMLerror ) 465 print self.myTXHandler.jobAttrs 466 print 'sleep' 467 time.sleep( 1 ) 468 469 #self.myGXHandler = GangliaXMLHandler( self.config ) 470 #self.myHandler = GangliaXMLHandler( self.config ) 471 #self.myHandler = TorqueXMLHandler( ) 472 #self.myParser.setContentHandler( self.myHandler ) 473 474 def daemon( self ): 475 """Run as daemon forever""" 476 477 # Fork the first child 478 # 479 pid = os.fork() 480 481 if pid > 0: 482 483 sys.exit(0) # end parent 484 485 # creates a session and sets the process group ID 486 # 487 os.setsid() 488 489 # Fork the second child 490 # 491 pid = os.fork() 492 493 if pid > 0: 494 495 sys.exit(0) # end parent 496 497 # Go to the root directory and set the umask 498 # 499 os.chdir('/') 500 os.umask(0) 501 502 sys.stdin.close() 503 sys.stdout.close() 504 #sys.stderr.close() 505 506 os.open('/dev/null', 0) 507 os.dup(0) 508 os.dup(0) 509 510 self.run() 511 512 def printTime( self ): 513 """Print current time in human readable format for logging""" 514 515 return time.strftime("%a %d %b %Y %H:%M:%S") 527 self.myXMLHandler = GangliaXMLHandler( self.config ) 528 self.myXMLError = XMLErrorHandler() 516 529 517 530 def run( self ): … … 545 558 """Store metrics retained in memory to disk""" 546 559 547 debug_msg( 7, self.printTime() + ' - storethread(): started.' )560 debug_msg( 7, printTime() + ' - storethread(): started.' ) 548 561 549 562 # Store metrics somewhere between every 360 and 640 seconds … … 554 567 storethread.start() 555 568 556 debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )569 debug_msg( 7, printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL ) 557 570 time.sleep( STORE_INTERVAL ) 558 debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )571 debug_msg( 7, printTime() + ' - storethread(): Done sleeping.' ) 559 572 560 573 if storethread.isAlive(): 561 574 562 debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )575 debug_msg( 7, printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' ) 563 576 storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 564 debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )565 566 debug_msg( 7, self.printTime() + ' - storethread(): finished.' )577 debug_msg( 7, printTime() + ' - storethread(): Done waiting.' ) 578 579 debug_msg( 7, printTime() + ' - storethread(): finished.' ) 567 580 568 581 return 0 … … 571 584 """Actual metric storing thread""" 572 585 573 debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )574 debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )575 ret = self.my Handler.storeMetrics()576 debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )577 debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )586 debug_msg( 7, printTime() + ' - storemetricthread(): started.' ) 587 debug_msg( 7, printTime() + ' - storemetricthread(): Storing data..' ) 588 ret = self.myXMLHandler.storeMetrics() 589 debug_msg( 7, printTime() + ' - storemetricthread(): Done storing.' ) 590 debug_msg( 7, printTime() + ' - storemetricthread(): finished.' ) 578 591 579 592 return ret … … 582 595 """Process XML""" 583 596 584 debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )597 debug_msg( 7, printTime() + ' - xmlthread(): started.' ) 585 598 586 599 parsethread = threading.Thread( None, self.parseThread, 'parsethread' ) 587 600 parsethread.start() 588 601 589 debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )602 debug_msg( 7, printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 590 603 time.sleep( float( self.config.getLowestInterval() ) ) 591 debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )604 debug_msg( 7, printTime() + ' - xmlthread(): Done sleeping.' ) 592 605 593 606 if parsethread.isAlive(): 594 607 595 debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )608 debug_msg( 7, printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' ) 596 609 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 597 debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )598 599 debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )610 debug_msg( 7, printTime() + ' - xmlthread(): Done waiting.' ) 611 612 debug_msg( 7, printTime() + ' - xmlthread(): finished.' ) 600 613 601 614 return 0 … … 604 617 """Actual parsing thread""" 605 618 606 debug_msg( 7, self.printTime() + ' - parsethread(): started.' ) 607 debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' ) 608 #ret = self.myParser.parse( self.myXMLGatherer.getFileObject() ) 609 debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' ) 610 debug_msg( 7, self.printTime() + ' - parsethread(): finished.' ) 619 debug_msg( 7, printTime() + ' - parsethread(): started.' ) 620 debug_msg( 7, printTime() + ' - parsethread(): Parsing XML..' ) 621 self.myXMLSource = self.myXMLGatherer.getFileObject() 622 ret = xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError ) 623 debug_msg( 7, printTime() + ' - parsethread(): Done parsing.' ) 624 debug_msg( 7, printTime() + ' - parsethread(): finished.' ) 611 625 612 626 return ret … … 697 711 def __init__( self, config, cluster ): 698 712 """Setup initial variables""" 713 699 714 self.block = 0 700 715 self.cluster = cluster … … 1056 1071 """Program startup""" 1057 1072 1058 myProcessor = GangliaXMLProcessor() 1073 #myTProcessor = TorqueXMLProcessor() 1074 myGProcessor = GangliaXMLProcessor() 1059 1075 1060 1076 if DAEMONIZE: 1061 myProcessor.daemon() 1077 #torquexmlthread = threading.Thread( None, myTProcessor.daemon, 'tprocxmlthread' ) 1078 gangliaxmlthread = threading.Thread( None, myGProcessor.daemon, 'gprocxmlthread' ) 1062 1079 else: 1063 myProcessor.run() 1080 #torquexmlthread = threading.Thread( None, myTProcessor.run, 'tprocxmlthread' ) 1081 gangliaxmlthread = threading.Thread( None, myGProcessor.run, 'gprocxmlthread' ) 1082 1083 #torquexmlthread.start() 1084 gangliaxmlthread.start() 1064 1085 1065 1086 def check_dir( directory ):
Note: See TracChangeset
for help on using the changeset viewer.