Changeset 287


Ignore:
Timestamp:
12/21/06 10:33:19 (17 years ago)
Author:
bastiaans
Message:

jobarchived/jobarchived.py:

  • changed XML retrieval to work with new gmetad
  • gather XML in same way as webfrontend now
  • use the same XMLGatherer for both XML processors to prevent unnecessary reconnecting
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobarchived/jobarchived.py

    r286 r287  
    450450        """Main class for processing XML and acting with it"""
    451451
    452         def __init__( self ):
     452        def __init__( self, XMLSource ):
    453453                """Setup initial XML connection and handlers"""
    454454
    455                 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    456                 self.myXMLSource = self.myXMLGatherer.getFileObject()
    457                 self.myXMLHandler = TorqueXMLHandler()
    458                 self.myXMLError = XMLErrorHandler()
    459                 self.config = GangliaConfigParser( GMETAD_CONF )
     455                self.myXMLGatherer      = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
     456                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
     457                self.myXMLSource        = XMLSource
     458                print self.myXMLSource
     459                self.myXMLHandler       = TorqueXMLHandler()
     460                self.myXMLError         = XMLErrorHandler()
     461
     462                self.config             = GangliaConfigParser( GMETAD_CONF )
    460463
    461464        def run( self ):
     
    466469                while( 1 ):
    467470
    468                         self.myXMLSource = self.myXMLGatherer.getFileObject()
     471                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
    469472                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
    470473
     474                        my_data = self.myXMLSource.getData()
     475
    471476                        try:
    472                                 xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
     477                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
    473478                        except socket.error, msg:
    474479                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
     
    769774        """Setup a connection and file object to Ganglia's XML"""
    770775
    771         s = None
    772         fd = None
     776        s               = None
     777        fd              = None
     778        data            = None
     779
     780        # Time since the last update
     781        #
     782        LAST_UPDATE     = 0
     783
     784        # Minimum interval between updates
     785        #
     786        MIN_UPDATE_INT  = 10
     787
     788        # Is a update occuring now
     789        #
     790        update_now      = False
    773791
    774792        def __init__( self, host, port ):
     
    777795                self.host = host
    778796                self.port = port
    779                 self.connect()
    780                 self.makeFileDescriptor()
    781 
    782         def connect( self ):
     797
     798                self.retrieveData()
     799
     800        def retrieveData( self ):
    783801                """Setup connection to XML source"""
     802
     803                self.update_now = True
    784804
    785805                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
     
    796816                                continue
    797817
     818                        print self.s
     819
    798820                        try:
    799821
     
    807829                        break
    808830
     831                print self.s
     832
    809833                if self.s is None:
    810834
    811835                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
     836                        self.update_now = False
    812837                        sys.exit( 1 )
     838
     839                else:
     840                        self.s.send( '\n' )
     841
     842                        my_fp                   = self.s.makefile( 'r' )
     843                        my_data                 = my_fp.readlines()
     844                        my_data                 = string.join( my_data, '' )
     845
     846                        self.data               = my_data
     847
     848                        self.LAST_UPDATE        = time.time()
     849
     850                self.update_now = False
    813851
    814852        def disconnect( self ):
     
    816854
    817855                if self.s:
    818                         self.s.shutdown( 2 )
     856                        #self.s.shutdown( 2 )
    819857                        self.s.close()
    820858                        self.s = None
     
    825863                self.disconnect()
    826864
    827         def reconnect( self ):
     865        def reGetData( self ):
    828866                """Reconnect"""
     867
     868                while self.update_now:
     869
     870                        # Must be another update in progress:
     871                        # Wait until the update is complete
     872                        #
     873                        time.sleep( 1 )
    829874
    830875                if self.s:
    831876                        self.disconnect()
    832877
    833                 self.connect()
     878                self.retrieveData()
     879
     880        def getData( self ):
     881
     882                """Return the XML data"""
     883
     884                # If more than MIN_UPDATE_INT seconds passed since last data update
     885                # update the XML first before returning it
     886                #
     887
     888                cur_time        = time.time()
     889
     890                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
     891
     892                        self.reGetData()
     893
     894                while self.update_now:
     895
     896                        # Must be another update in progress:
     897                        # Wait until the update is complete
     898                        #
     899                        time.sleep( 1 )
     900                       
     901                return self.data
    834902
    835903        def makeFileDescriptor( self ):
     
    852920        """Main class for processing XML and acting with it"""
    853921
    854         def __init__( self ):
     922        def __init__( self, XMLSource ):
    855923                """Setup initial XML connection and handlers"""
    856924
    857                 self.config = GangliaConfigParser( GMETAD_CONF )
    858 
    859                 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    860                 self.myXMLSource = self.myXMLGatherer.getFileObject()
    861                 self.myXMLHandler = GangliaXMLHandler( self.config )
    862                 self.myXMLError = XMLErrorHandler()
     925                self.config             = GangliaConfigParser( GMETAD_CONF )
     926
     927                self.myXMLGatherer      = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
     928                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
     929                self.myXMLSource        = XMLSource
     930                print self.myXMLSource
     931                self.myXMLHandler       = GangliaXMLHandler( self.config )
     932                self.myXMLError         = XMLErrorHandler()
    863933
    864934        def run( self ):
     
    9711041                debug_msg( 1, 'ganglia_parse_thread(): started.' )
    9721042                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
    973                 self.myXMLSource = self.myXMLGatherer.getFileObject()
     1043                #self.myXMLSource = self.myXMLGatherer.getFileObject()
     1044               
     1045                my_data = self.myXMLSource.getData()
    9741046
    9751047                try:
    976                         xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
     1048                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
    9771049                except socket.error, msg:
    9781050                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
     
    14791551        """Threading start"""
    14801552
    1481         myTorqueProcessor = TorqueXMLProcessor()
    1482         myGangliaProcessor = GangliaXMLProcessor()
     1553        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
     1554
     1555        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource )
     1556        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource )
    14831557
    14841558        try:
Note: See TracChangeset for help on using the changeset viewer.