Changeset 782
- Timestamp:
- 03/31/13 21:25:23 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/0.4/jobarchived/jobarchived.py
r774 r782 446 446 def mutateJob( self, action, job_id, jobattrs ): 447 447 448 job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]449 450 insert_col_str 451 insert_val_str 452 update_str = None448 job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ] 449 450 insert_col_str = 'job_id' 451 insert_val_str = "'%s'" %job_id 452 update_str = None 453 453 454 454 debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id)) … … 770 770 pass 771 771 772 class TorqueXMLProcessor( XMLProcessor ):772 class JobXMLProcessor( XMLProcessor ): 773 773 """Main class for processing XML and acting with it""" 774 774 … … 776 776 """Setup initial XML connection and handlers""" 777 777 778 self.myXMLSource = XMLSource 779 self.myXMLHandler = TorqueXMLHandler( DataStore ) 780 self.myXMLError = XMLErrorHandler() 781 782 self.config = GangliaConfigParser( GMETAD_CONF ) 778 self.myXMLSource = XMLSource 779 self.myXMLHandler = JobXMLHandler( DataStore ) 780 self.myXMLError = XMLErrorHandler() 781 782 self.config = GangliaConfigParser( GMETAD_CONF ) 783 784 self.kill_thread = False 785 786 def killThread( self ): 787 788 self.kill_thread = True 783 789 784 790 def run( self ): 785 791 """Main XML processing""" 786 792 787 debug_msg( 1, ' torque_xml_thread(): started.' )793 debug_msg( 1, 'job_xml_thread(): started.' ) 788 794 789 795 while( 1 ): 790 796 791 #self.myXMLSource = self.mXMLGatherer.getFileObject() 792 debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' ) 797 debug_msg( 1, 'job_xml_thread(): Retrieving XML data..' ) 793 798 794 799 my_data = self.myXMLSource.getData() 795 #print my_data 796 #print "size my data: %d" %len( my_data ) 797 798 debug_msg( 1, 'torque_xml_thread(): Done retrieving.' ) 800 801 debug_msg( 1, 'job_xml_thread(): Done retrieving: data size %d' %len(my_data) ) 799 802 800 803 if my_data: 801 debug_msg( 1, ' ganglia_parse_thread(): Parsing XML..' )804 debug_msg( 1, 'job_xml_thread(): Parsing XML..' ) 802 805 803 806 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError ) 804 807 805 debug_msg( 1, ' ganglia_parse_thread(): Done parsing.' )808 debug_msg( 1, 'job_xml_thread(): Done parsing.' ) 806 809 else: 807 debug_msg( 1, 'torque_xml_thread(): Got no data.' ) 808 810 debug_msg( 1, 'job_xml_thread(): Got no data.' ) 811 812 if self.kill_thread: 813 814 debug_msg( 1, 'job_xml_thread(): killed.' ) 815 return None 809 816 810 debug_msg( 1, ' torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )817 debug_msg( 1, 'job_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) ) 811 818 time.sleep( self.config.getLowestInterval() ) 812 819 813 class TorqueXMLHandler( xml.sax.handler.ContentHandler ): 814 """Parse Torque's jobinfo XML from our plugin""" 815 816 jobAttrs = { } 820 class JobXMLHandler( xml.sax.handler.ContentHandler ): 821 """Parse Job's jobinfo XML from our plugin""" 817 822 818 823 def __init__( self, datastore ): 819 824 820 #self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] ) 821 self.ds = datastore 822 self.jobs_processed = [ ] 823 self.jobs_to_store = [ ] 825 self.ds = datastore 826 self.jobs_processed = [ ] 827 self.jobs_to_store = [ ] 828 self.jobAttrs = { } 829 self.jobAttrsSaved = { } 830 824 831 debug_msg( 1, "XML: Handler created" ) 825 832 826 833 def startDocument( self ): 827 834 828 self.heartbeat = 0 829 self.elementct = 0 835 self.jobs_processed = [ ] 836 self.heartbeat = 0 837 self.elementct = 0 838 830 839 debug_msg( 1, "XML: Start document" ) 831 840 … … 836 845 just one XML statement with all info 837 846 """ 838 847 839 848 jobinfo = { } 840 849 … … 850 859 851 860 if metricname == 'zplugin_monarch_heartbeat': 861 852 862 self.heartbeat = str( attrs.get( 'VAL', "" ) ) 853 863 854 864 elif metricname.find( 'zplugin_monarch_job' ) != -1: 855 865 856 job_id = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1] 857 val = str( attrs.get( 'VAL', "" ) ) 858 859 if not job_id in self.jobs_processed: 860 861 self.jobs_processed.append( job_id ) 862 863 check_change = 0 864 865 if self.jobAttrs.has_key( job_id ): 866 867 check_change = 1 866 job_id = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1] 867 val = str( attrs.get( 'VAL', "" ) ) 868 868 869 869 valinfo = val.split( ' ' ) … … 873 873 if len( myval.split( '=' ) ) > 1: 874 874 875 valname 876 value 875 valname = myval.split( '=' )[0] 876 value = myval.split( '=' )[1] 877 877 878 878 if valname == 'nodes': 879 879 880 value = value.split( ';' ) 880 881 881 882 jobinfo[ valname ] = value 882 883 883 if check_change: 884 if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]: 885 self.jobAttrs[ job_id ]['stop_timestamp'] = '' 886 self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo ) 887 if not job_id in self.jobs_to_store: 888 self.jobs_to_store.append( job_id ) 889 890 debug_msg( 10, 'jobinfo for job %s has changed' %job_id ) 891 else: 892 self.jobAttrs[ job_id ] = jobinfo 893 894 if not job_id in self.jobs_to_store: 895 self.jobs_to_store.append( job_id ) 896 897 debug_msg( 10, 'jobinfo for job %s has changed' %job_id ) 884 self.jobAttrs[ job_id ] = jobinfo 885 886 self.jobs_processed.append( job_id ) 898 887 899 888 def endDocument( self ): 900 889 """When all metrics have gone, check if any jobs have finished""" 901 890 902 debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" ) 903 904 if self.heartbeat: 905 for jobid, jobinfo in self.jobAttrs.items(): 906 907 # This is an old job, not in current jobinfo list anymore 908 # it must have finished, since we _did_ get a new heartbeat 909 # 910 mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] ) 911 912 if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'): 913 914 if not jobid in self.jobs_processed: 915 self.jobs_processed.append( jobid ) 916 891 debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_processed))+" jobs" ) 892 893 if self.heartbeat == 0: 894 return None 895 896 for jobid, jobinfo in self.jobAttrs.items(): 897 898 if jobinfo['reported'] != self.heartbeat: 899 900 if (jobinfo['status'] != 'R'): 901 debug_msg( 1, 'job %s report time %s does not match current heartbeat %s : ignoring job' %(jobid, jobinfo['reported'], self.heartbeat ) ) 902 del self.jobAttrs[ jobid ] 903 904 if jobid in self.jobs_to_store: 905 del self.jobs_to_store[ jobid ] 906 907 continue 908 909 elif jobid not in self.jobs_processed: 910 # Was running previous heartbeat but not anymore: must be finished 917 911 self.jobAttrs[ jobid ]['status'] = 'F' 918 912 self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat ) 913 debug_msg( 1, 'job %s appears to have finished' %jobid ) 919 914 920 915 if not jobid in self.jobs_to_store: 921 916 self.jobs_to_store.append( jobid ) 922 917 923 debug_msg( 1, 'torque_xml_thread(): Storing..' ) 924 925 for jobid in self.jobs_to_store: 926 if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]: 927 928 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] ) 929 930 if self.jobAttrs[ jobid ]['status'] == 'F': 931 del self.jobAttrs[ jobid ] 932 933 debug_msg( 1, 'torque_xml_thread(): Done storing.' ) 934 935 self.jobs_processed = [ ] 936 self.jobs_to_store = [ ] 918 continue 919 920 elif self.jobAttrsSaved.has_key( jobid ): 921 922 if self.jobinfoChanged( jobid, jobinfo ): 923 924 self.jobAttrs[ jobid ]['stop_timestamp'] = '' 925 self.jobAttrs[ jobid ] = self.setJobAttrs( self.jobAttrs[ jobid ], jobinfo ) 926 927 if not jobid in self.jobs_to_store: 928 929 self.jobs_to_store.append( jobid ) 930 931 debug_msg( 10, 'jobinfo for job %s has changed' %jobid ) 932 else: 933 debug_msg( 1, 'new job %s' %jobid ) 934 935 if not jobid in self.jobs_to_store: 936 937 self.jobs_to_store.append( jobid ) 938 939 debug_msg( 1, 'job_xml_thread(): Found %s updated jobs.' %len(self.jobs_to_store) ) 940 941 if len( self.jobs_to_store ) > 0: 942 943 debug_msg( 1, 'job_xml_thread(): Storing jobs to database..' ) 944 945 while len( self.jobs_to_store ) > 0: 946 947 jobid = self.jobs_to_store.pop( 0 ) 948 949 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] ) 950 951 self.jobAttrsSaved[ jobid ] = self.jobAttrs[ jobid ] 952 953 if self.jobAttrs[ jobid ]['status'] == 'F': 954 955 del self.jobAttrs[ jobid ] 956 957 debug_msg( 1, 'job_xml_thread(): Done storing.' ) 958 959 else: 960 debug_msg( 1, 'job_xml_thread(): No jobs to store.' ) 961 962 self.jobs_processed = [ ] 937 963 938 964 def setJobAttrs( self, old, new ): … … 948 974 949 975 950 def jobinfoChanged( self, job attrs, jobid, jobinfo ):976 def jobinfoChanged( self, jobid, jobinfo ): 951 977 """ 952 978 Check if jobinfo has changed from jobattrs[jobid] … … 957 983 ignore_changes = [ 'reported' ] 958 984 959 if jobattrs.has_key( jobid ):985 if self.jobAttrsSaved.has_key( jobid ): 960 986 961 987 for valname, value in jobinfo.items(): … … 963 989 if valname not in ignore_changes: 964 990 965 if jobattrs[ jobid ].has_key( valname ): 966 967 if value != jobattrs[ jobid ][ valname ]: 968 969 if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat: 991 if self.jobAttrsSaved[ jobid ].has_key( valname ): 992 993 if value != self.jobAttrsSaved[ jobid ][ valname ]: 994 995 if jobinfo['reported'] > self.jobAttrsSaved[ jobid ][ 'reported' ]: 996 997 debug_msg( 1, "job %s field '%s' changed since saved from: %s to: %s" %( jobid, valname, value, self.jobAttrsSaved[ jobid ][ valname ] ) ) 998 970 999 return True 971 1000 972 1001 else: 1002 debug_msg( 1, "job %s did not have field '%s'" %( jobid, valname ) ) 973 1003 return True 974 1004 … … 1036 1066 if name == 'GANGLIA_XML': 1037 1067 1038 self.XMLSource = str( attrs.get( 'SOURCE',"" ) )1039 self.gangliaVersion 1068 self.XMLSource = str( attrs.get( 'SOURCE', "" ) ) 1069 self.gangliaVersion = str( attrs.get( 'VERSION', "" ) ) 1040 1070 1041 1071 debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) ) … … 1050 1080 elif name == 'CLUSTER': 1051 1081 1052 self.clusterName = str( attrs.get( 'NAME',"" ) )1082 self.clusterName = str( attrs.get( 'NAME', "" ) ) 1053 1083 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1054 1084 … … 1061 1091 elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES: 1062 1092 1063 self.hostName = str( attrs.get( 'NAME',"" ) )1064 self.hostIp = str( attrs.get( 'IP',"" ) )1065 self.hostReported 1093 self.hostName = str( attrs.get( 'NAME', "" ) ) 1094 self.hostIp = str( attrs.get( 'IP', "" ) ) 1095 self.hostReported = str( attrs.get( 'REPORTED', "" ) ) 1066 1096 1067 1097 debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) ) … … 1087 1117 if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric: 1088 1118 1089 myMetric = { }1090 myMetric['name'] 1091 myMetric['val'] = str( attrs.get( 'VAL',"" ) )1092 myMetric['time'] 1119 myMetric = { } 1120 myMetric['name'] = str( attrs.get( 'NAME', "" ) ) 1121 myMetric['val'] = str( attrs.get( 'VAL', "" ) ) 1122 myMetric['time'] = self.hostReported 1093 1123 1094 1124 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric ) … … 1159 1189 self.host = host 1160 1190 self.port = port 1161 self.slot 1191 self.slot = threading.Lock() 1162 1192 1163 1193 self.retrieveData() … … 1290 1320 """Setup initial XML connection and handlers""" 1291 1321 1292 self.config = GangliaConfigParser( GMETAD_CONF ) 1293 1294 #self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 1295 #self.myXMLSource = self.myXMLGatherer.getFileObject() 1296 self.myXMLSource = XMLSource 1297 self.ds = DataStore 1298 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds ) 1299 self.myXMLError = XMLErrorHandler() 1322 self.config = GangliaConfigParser( GMETAD_CONF ) 1323 self.myXMLSource = XMLSource 1324 self.ds = DataStore 1325 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds ) 1326 self.myXMLError = XMLErrorHandler() 1300 1327 1301 1328 def run( self ): 1302 1329 """Main XML processing; start a xml and storethread""" 1303 1330 1304 xml_thread = threading.Thread( None, self.processXML,'xmlthread' )1331 xml_thread = threading.Thread( None, self.processXML, 'xmlthread' ) 1305 1332 store_thread = threading.Thread( None, self.storeMetrics, 'storethread' ) 1306 1333 … … 1343 1370 if DEBUG_LEVEL >= 1: 1344 1371 STORE_INTERVAL = 60 1345 #STORE_INTERVAL = random.randint( 360, 640 )1346 1372 else: 1347 1373 STORE_INTERVAL = random.randint( 360, 640 ) … … 1362 1388 if store_metric_thread.isAlive(): 1363 1389 1364 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to terminate..' )1390 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' ) 1365 1391 store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 1366 debug_msg( 1, 'ganglia_store_thread(): Done waiting: terminated storemetricthread()' ) 1392 1393 if store_metric_thread.isAlive(): 1394 1395 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' ) 1396 else: 1397 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' ) 1367 1398 1368 1399 debug_msg( 1, 'ganglia_store_thread(): finished.' ) … … 1375 1406 debug_msg( 1, 'ganglia_store_metric_thread(): started.' ) 1376 1407 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' ) 1408 1377 1409 ret = self.myXMLHandler.storeMetrics() 1378 1410 if ret > 0: 1379 1411 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) ) 1412 1380 1413 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' ) 1381 1414 debug_msg( 1, 'ganglia_store_metric_thread(): finished.' ) … … 1385 1418 def processXML( self ): 1386 1419 """Process XML""" 1387 1388 debug_msg( 5, "Entering processXML()")1389 1420 1390 1421 try: … … 1403 1434 if parsethread.isAlive(): 1404 1435 1405 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to terminate..' %PARSE_TIMEOUT )1436 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT ) 1406 1437 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 1407 debug_msg( 1, 'ganglia_xml_thread(): Done waiting. parsethread() terminated' ) 1438 1439 if parsethread.isAlive(): 1440 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' ) 1441 else: 1442 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' ) 1408 1443 1409 1444 debug_msg( 1, 'ganglia_xml_thread(): finished.' ) 1410 1411 debug_msg( 5, "Leaving processXML()")1412 1445 1413 1446 return 0 … … 1420 1453 1421 1454 my_data = self.myXMLSource.getData() 1422 debug_msg( 1, 'ganglia_parse_thread(): data size %d.' %len(my_data) ) 1423 1424 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' ) 1455 1456 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) ) 1425 1457 1426 1458 if my_data: … … 1692 1724 debug_msg( 5, "Entering storeMetrics()") 1693 1725 1694 count_values 1695 count_metrics 1726 count_values = 0 1727 count_metrics = 0 1696 1728 count_bits = 0 1697 1729 … … 1706 1738 count_values += 1 1707 1739 1708 count_bits 1709 count_bits 1740 count_bits += len( dmetric['time'] ) 1741 count_bits += len( dmetric['val'] ) 1710 1742 1711 1743 count_bytes = count_bits / 8 … … 1975 2007 """Threading start""" 1976 2008 1977 config = GangliaConfigParser( GMETAD_CONF )1978 s_timeout = int( config.getLowestInterval() - 1 )2009 config = GangliaConfigParser( GMETAD_CONF ) 2010 s_timeout = int( config.getLowestInterval() - 1 ) 1979 2011 1980 2012 socket.setdefaulttimeout( s_timeout ) … … 1983 2015 myDataStore = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] ) 1984 2016 1985 my TorqueProcessor = TorqueXMLProcessor( myXMLSource, myDataStore )1986 myGangliaProcessor 2017 myJobProcessor = JobXMLProcessor( myXMLSource, myDataStore ) 2018 myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore ) 1987 2019 1988 2020 try: 1989 torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )1990 ganglia_xml_thread 1991 1992 torque_xml_thread.start()2021 job_xml_thread = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' ) 2022 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 2023 2024 job_xml_thread.start() 1993 2025 ganglia_xml_thread.start() 1994 2026
Note: See TracChangeset
for help on using the changeset viewer.