Changeset 287
- Timestamp:
- 12/21/06 10:33:19 (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobarchived/jobarchived.py
r286 r287 450 450 """Main class for processing XML and acting with it""" 451 451 452 def __init__( self ):452 def __init__( self, XMLSource ): 453 453 """Setup initial XML connection and handlers""" 454 454 455 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 456 self.myXMLSource = self.myXMLGatherer.getFileObject() 457 self.myXMLHandler = TorqueXMLHandler() 458 self.myXMLError = XMLErrorHandler() 459 self.config = GangliaConfigParser( GMETAD_CONF ) 455 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 456 #self.myXMLSource = self.myXMLGatherer.getFileObject() 457 self.myXMLSource = XMLSource 458 print self.myXMLSource 459 self.myXMLHandler = TorqueXMLHandler() 460 self.myXMLError = XMLErrorHandler() 461 462 self.config = GangliaConfigParser( GMETAD_CONF ) 460 463 461 464 def run( self ): … … 466 469 while( 1 ): 467 470 468 self.myXMLSource = self.myXMLGatherer.getFileObject()471 #self.myXMLSource = self.mXMLGatherer.getFileObject() 469 472 debug_msg( 1, 'torque_xml_thread(): Parsing..' ) 470 473 474 my_data = self.myXMLSource.getData() 475 471 476 try: 472 xml.sax.parse ( self.myXMLSource, self.myXMLHandler, self.myXMLError )477 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError ) 473 478 except socket.error, msg: 474 479 debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg ) … … 769 774 """Setup a connection and file object to Ganglia's XML""" 770 775 771 s = None 772 fd = None 776 s = None 777 fd = None 778 data = None 779 780 # Time since the last update 781 # 782 LAST_UPDATE = 0 783 784 # Minimum interval between updates 785 # 786 MIN_UPDATE_INT = 10 787 788 # Is a update occuring now 789 # 790 update_now = False 773 791 774 792 def __init__( self, host, port ): … … 777 795 self.host = host 778 796 self.port = port 779 self.connect() 780 self. makeFileDescriptor()781 782 def connect( self ):797 798 self.retrieveData() 799 800 def retrieveData( self ): 783 801 """Setup connection to XML source""" 802 803 self.update_now = True 784 804 785 805 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): … … 796 816 continue 797 817 818 print self.s 819 798 820 try: 799 821 … … 807 829 break 808 830 831 print self.s 832 809 833 if self.s is None: 810 834 811 835 debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' ) 836 self.update_now = False 812 837 sys.exit( 1 ) 838 839 else: 840 self.s.send( '\n' ) 841 842 my_fp = self.s.makefile( 'r' ) 843 my_data = my_fp.readlines() 844 my_data = string.join( my_data, '' ) 845 846 self.data = my_data 847 848 self.LAST_UPDATE = time.time() 849 850 self.update_now = False 813 851 814 852 def disconnect( self ): … … 816 854 817 855 if self.s: 818 self.s.shutdown( 2 )856 #self.s.shutdown( 2 ) 819 857 self.s.close() 820 858 self.s = None … … 825 863 self.disconnect() 826 864 827 def re connect( self ):865 def reGetData( self ): 828 866 """Reconnect""" 867 868 while self.update_now: 869 870 # Must be another update in progress: 871 # Wait until the update is complete 872 # 873 time.sleep( 1 ) 829 874 830 875 if self.s: 831 876 self.disconnect() 832 877 833 self.connect() 878 self.retrieveData() 879 880 def getData( self ): 881 882 """Return the XML data""" 883 884 # If more than MIN_UPDATE_INT seconds passed since last data update 885 # update the XML first before returning it 886 # 887 888 cur_time = time.time() 889 890 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 891 892 self.reGetData() 893 894 while self.update_now: 895 896 # Must be another update in progress: 897 # Wait until the update is complete 898 # 899 time.sleep( 1 ) 900 901 return self.data 834 902 835 903 def makeFileDescriptor( self ): … … 852 920 """Main class for processing XML and acting with it""" 853 921 854 def __init__( self ):922 def __init__( self, XMLSource ): 855 923 """Setup initial XML connection and handlers""" 856 924 857 self.config = GangliaConfigParser( GMETAD_CONF ) 858 859 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 860 self.myXMLSource = self.myXMLGatherer.getFileObject() 861 self.myXMLHandler = GangliaXMLHandler( self.config ) 862 self.myXMLError = XMLErrorHandler() 925 self.config = GangliaConfigParser( GMETAD_CONF ) 926 927 self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 928 #self.myXMLSource = self.myXMLGatherer.getFileObject() 929 self.myXMLSource = XMLSource 930 print self.myXMLSource 931 self.myXMLHandler = GangliaXMLHandler( self.config ) 932 self.myXMLError = XMLErrorHandler() 863 933 864 934 def run( self ): … … 971 1041 debug_msg( 1, 'ganglia_parse_thread(): started.' ) 972 1042 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' ) 973 self.myXMLSource = self.myXMLGatherer.getFileObject() 1043 #self.myXMLSource = self.myXMLGatherer.getFileObject() 1044 1045 my_data = self.myXMLSource.getData() 974 1046 975 1047 try: 976 xml.sax.parse ( self.myXMLSource, self.myXMLHandler, self.myXMLError )1048 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError ) 977 1049 except socket.error, msg: 978 1050 debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg ) … … 1479 1551 """Threading start""" 1480 1552 1481 myTorqueProcessor = TorqueXMLProcessor() 1482 myGangliaProcessor = GangliaXMLProcessor() 1553 myXMLSource = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 1554 1555 myTorqueProcessor = TorqueXMLProcessor( myXMLSource ) 1556 myGangliaProcessor = GangliaXMLProcessor( myXMLSource ) 1483 1557 1484 1558 try:
Note: See TracChangeset
for help on using the changeset viewer.