Changeset 78


Ignore:
Timestamp:
04/15/05 15:39:03 (19 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

  • Restructured class/processor layout
  • There is now a superclass for processor classes
  • Torque's jobinfo XML processing will be done now in same manner Ganglia's will
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r77 r78  
    154154                return 0
    155155
     156class XMLProcessor:
     157        """Skeleton class for XML processor's"""
     158
     159        def daemon( self ):
     160                """Run as daemon forever"""
     161
     162                # Fork the first child
     163                #
     164                pid = os.fork()
     165
     166                if pid > 0:
     167
     168                        sys.exit(0)  # end parent
     169
     170                # creates a session and sets the process group ID
     171                #
     172                os.setsid()
     173
     174                # Fork the second child
     175                #
     176                pid = os.fork()
     177
     178                if pid > 0:
     179
     180                        sys.exit(0)  # end parent
     181
     182                # Go to the root directory and set the umask
     183                #
     184                os.chdir('/')
     185                os.umask(0)
     186
     187                sys.stdin.close()
     188                sys.stdout.close()
     189                #sys.stderr.close()
     190
     191                os.open('/dev/null', 0)
     192                os.dup(0)
     193                os.dup(0)
     194
     195                self.run()
     196
     197        def run( self ):
     198                """Do main processing of XML here"""
     199
     200                pass
     201
     202class TorqueXMLProcessor( XMLProcessor ):
     203        """Main class for processing XML and acting with it"""
     204
     205        def __init__( self ):
     206                """Setup initial XML connection and handlers"""
     207
     208                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
     209                self.myXMLSource = self.myXMLGatherer.getFileObject()
     210                self.myXMLHandler = TorqueXMLHandler()
     211                self.myXMLError = XMLErrorHandler()
     212
     213        def run( self ):
     214                """Main XML processing"""
     215
     216                while( 1 ):
     217
     218                        print 'parse'
     219                        self.myXMLSource = self.myXMLGatherer.getFileObject()
     220                        xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
     221                        print self.myXMLHandler.jobAttrs
     222                        print 'sleep'
     223                        time.sleep( 1 )
     224
    156225class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
    157226        """Parse Torque's jobinfo XML from our plugin"""
     
    363432                debug_msg( 0, 'Warning ' + str( exception ) )
    364433
    365 class GangliaXMLGatherer:
     434class XMLGatherer:
    366435        """Setup a connection and file object to Ganglia's XML"""
    367436
     
    441510                """Connect, and return a file object"""
    442511
     512                self.makeFileDescriptor()
     513
    443514                if self.fd:
    444515                        return self.fd
    445516
    446 class GangliaXMLProcessor:
     517class GangliaXMLProcessor( XMLProcessor ):
    447518        """Main class for processing XML and acting with it"""
    448519
     
    452523                self.config = GangliaConfigParser( GMETAD_CONF )
    453524
    454                 self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
     525                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    455526                self.myXMLSource = self.myXMLGatherer.getFileObject()
    456                 self.myTXHandler = TorqueXMLHandler()
    457                 self.myXMLerror = XMLErrorHandler()
    458 
    459                 while( 1 ):
    460 
    461                         print 'parse'
    462                         self.myXMLGatherer.makeFileDescriptor()
    463                         self.myXMLSource = self.myXMLGatherer.getFileObject()
    464                         xml.sax.parse( self.myXMLSource, self.myTXHandler, self.myXMLerror )
    465                         print self.myTXHandler.jobAttrs
    466                         print 'sleep'
    467                         time.sleep( 1 )
    468 
    469                 #self.myGXHandler = GangliaXMLHandler( self.config )
    470                 #self.myHandler = GangliaXMLHandler( self.config )
    471                 #self.myHandler = TorqueXMLHandler( )
    472                 #self.myParser.setContentHandler( self.myHandler )
    473 
    474         def daemon( self ):
    475                 """Run as daemon forever"""
    476 
    477                 # Fork the first child
    478                 #
    479                 pid = os.fork()
    480 
    481                 if pid > 0:
    482 
    483                         sys.exit(0)  # end parent
    484 
    485                 # creates a session and sets the process group ID
    486                 #
    487                 os.setsid()
    488 
    489                 # Fork the second child
    490                 #
    491                 pid = os.fork()
    492 
    493                 if pid > 0:
    494 
    495                         sys.exit(0)  # end parent
    496 
    497                 # Go to the root directory and set the umask
    498                 #
    499                 os.chdir('/')
    500                 os.umask(0)
    501 
    502                 sys.stdin.close()
    503                 sys.stdout.close()
    504                 #sys.stderr.close()
    505 
    506                 os.open('/dev/null', 0)
    507                 os.dup(0)
    508                 os.dup(0)
    509 
    510                 self.run()
    511 
    512         def printTime( self ):
    513                 """Print current time in human readable format for logging"""
    514 
    515                 return time.strftime("%a %d %b %Y %H:%M:%S")
     527                self.myXMLHandler = GangliaXMLHandler( self.config )
     528                self.myXMLError = XMLErrorHandler()
    516529
    517530        def run( self ):
     
    545558                """Store metrics retained in memory to disk"""
    546559
    547                 debug_msg( 7, self.printTime() + ' - storethread(): started.' )
     560                debug_msg( 7, printTime() + ' - storethread(): started.' )
    548561
    549562                # Store metrics somewhere between every 360 and 640 seconds
     
    554567                storethread.start()
    555568
    556                 debug_msg( 7, self.printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
     569                debug_msg( 7, printTime() + ' - storethread(): Sleeping.. (%ss)' %STORE_INTERVAL )
    557570                time.sleep( STORE_INTERVAL )
    558                 debug_msg( 7, self.printTime() + ' - storethread(): Done sleeping.' )
     571                debug_msg( 7, printTime() + ' - storethread(): Done sleeping.' )
    559572
    560573                if storethread.isAlive():
    561574
    562                         debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
     575                        debug_msg( 7, printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' )
    563576                        storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
    564                         debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' )
    565 
    566                 debug_msg( 7, self.printTime() + ' - storethread(): finished.' )
     577                        debug_msg( 7, printTime() + ' - storethread(): Done waiting.' )
     578
     579                debug_msg( 7, printTime() + ' - storethread(): finished.' )
    567580
    568581                return 0
     
    571584                """Actual metric storing thread"""
    572585
    573                 debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
    574                 debug_msg( 7, self.printTime() + ' - storemetricthread(): Storing data..' )
    575                 ret = self.myHandler.storeMetrics()
    576                 debug_msg( 7, self.printTime() + ' - storemetricthread(): Done storing.' )
    577                 debug_msg( 7, self.printTime() + ' - storemetricthread(): finished.' )
     586                debug_msg( 7, printTime() + ' - storemetricthread(): started.' )
     587                debug_msg( 7, printTime() + ' - storemetricthread(): Storing data..' )
     588                ret = self.myXMLHandler.storeMetrics()
     589                debug_msg( 7, printTime() + ' - storemetricthread(): Done storing.' )
     590                debug_msg( 7, printTime() + ' - storemetricthread(): finished.' )
    578591               
    579592                return ret
     
    582595                """Process XML"""
    583596
    584                 debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
     597                debug_msg( 7, printTime() + ' - xmlthread(): started.' )
    585598
    586599                parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
    587600                parsethread.start()
    588601
    589                 debug_msg( 7, self.printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
     602                debug_msg( 7, printTime() + ' - xmlthread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
    590603                time.sleep( float( self.config.getLowestInterval() ) ) 
    591                 debug_msg( 7, self.printTime() + ' - xmlthread(): Done sleeping.' )
     604                debug_msg( 7, printTime() + ' - xmlthread(): Done sleeping.' )
    592605
    593606                if parsethread.isAlive():
    594607
    595                         debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
     608                        debug_msg( 7, printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' )
    596609                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
    597                         debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' )
    598 
    599                 debug_msg( 7, self.printTime() + ' - xmlthread(): finished.' )
     610                        debug_msg( 7, printTime() + ' - xmlthread(): Done waiting.' )
     611
     612                debug_msg( 7, printTime() + ' - xmlthread(): finished.' )
    600613
    601614                return 0
     
    604617                """Actual parsing thread"""
    605618
    606                 debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
    607                 debug_msg( 7, self.printTime() + ' - parsethread(): Parsing XML..' )
    608                 #ret = self.myParser.parse( self.myXMLGatherer.getFileObject() )
    609                 debug_msg( 7, self.printTime() + ' - parsethread(): Done parsing.' )
    610                 debug_msg( 7, self.printTime() + ' - parsethread(): finished.' )
     619                debug_msg( 7, printTime() + ' - parsethread(): started.' )
     620                debug_msg( 7, printTime() + ' - parsethread(): Parsing XML..' )
     621                self.myXMLSource = self.myXMLGatherer.getFileObject()
     622                ret = xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
     623                debug_msg( 7, printTime() + ' - parsethread(): Done parsing.' )
     624                debug_msg( 7, printTime() + ' - parsethread(): finished.' )
    611625
    612626                return ret
     
    697711        def __init__( self, config, cluster ):
    698712                """Setup initial variables"""
     713
    699714                self.block = 0
    700715                self.cluster = cluster
     
    10561071        """Program startup"""
    10571072
    1058         myProcessor = GangliaXMLProcessor()
     1073        #myTProcessor = TorqueXMLProcessor()
     1074        myGProcessor = GangliaXMLProcessor()
    10591075
    10601076        if DAEMONIZE:
    1061                 myProcessor.daemon()
     1077                #torquexmlthread = threading.Thread( None, myTProcessor.daemon, 'tprocxmlthread' )
     1078                gangliaxmlthread = threading.Thread( None, myGProcessor.daemon, 'gprocxmlthread' )
    10621079        else:
    1063                 myProcessor.run()
     1080                #torquexmlthread = threading.Thread( None, myTProcessor.run, 'tprocxmlthread' )
     1081                gangliaxmlthread = threading.Thread( None, myGProcessor.run, 'gprocxmlthread' )
     1082
     1083        #torquexmlthread.start()
     1084        gangliaxmlthread.start()
    10641085
    10651086def check_dir( directory ):
Note: See TracChangeset for help on using the changeset viewer.