Changeset 663 for trunk/jobmond/jobmond.py
- Timestamp:
- 09/04/12 12:24:10 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r662 r663 3 3 # This file is part of Jobmonarch 4 4 # 5 # Copyright (C) 2006-2007 Ramon Bastiaans 6 # Copyright (C) 2007, 2009 Dave Love (SGE code) 5 # Copyright (C) 2006-2012 Ramon Bastiaans 7 6 # 8 7 # Jobmonarch is free software; you can redistribute it and/or modify … … 25 24 import sys, getopt, ConfigParser, time, os, socket, string, re 26 25 import xdrlib, socket, syslog, xml, xml.sax 27 from xml.sax.handler import feature_namespaces28 from collections import deque29 26 from types import * 30 27 31 VERSION=' 0.3.1'28 VERSION='TRUNK+SVN' 32 29 33 30 def usage( ver ): … … 719 716 time.sleep( BATCH_POLL_INTERVAL ) 720 717 721 # SGE code by Dave Love <fx@gnu.org>. Tested with SGE 6.0u8 and 6.0u11. May722 # work with SGE 6.1 (else should be easily fixable), but definitely doesn't723 # with 6.2. See also the fixmes.724 725 class NoJobs (Exception):726 """Exception raised by empty job list in qstat output."""727 pass728 729 class SgeQstatXMLParser(xml.sax.handler.ContentHandler):730 """SAX handler for XML output from Sun Grid Engine's `qstat'."""731 732 def __init__(self):733 self.value = ""734 self.joblist = []735 self.job = {}736 self.queue = ""737 self.in_joblist = False738 self.lrequest = False739 self.eltq = deque()740 xml.sax.handler.ContentHandler.__init__(self)741 742 # The structure of the output is as follows (for SGE 6.0). It's743 # similar for 6.1, but radically different for 6.2, and is744 # undocumented generally. Unfortunately it's voluminous, and probably745 # doesn't scale to large clusters/queues.746 747 # <detailed_job_info xmlns:xsd="http://www.w3.org/2001/XMLSchema">748 # <djob_info>749 # <qmaster_response> <!-- job -->750 # ...751 # <JB_ja_template>752 # <ulong_sublist>753 # ... <!-- start_time, state ... -->754 # </ulong_sublist>755 # </JB_ja_template>756 # <JB_ja_tasks>757 # <ulong_sublist>758 # ... <!-- task info759 # </ulong_sublist>760 # ...761 # </JB_ja_tasks>762 # ...763 # </qmaster_response>764 # </djob_info>765 # <messages>766 # ...767 768 # NB. We might treat each task as a separate job, like769 # straight qstat output, but the web interface expects jobs to770 # be identified by integers, not, say, <job number>.<task>.771 772 # So, I lied. If the job list is empty, we get invalid XML773 # like this, which we need to defend against:774 775 # <unknown_jobs xmlns:xsd="http://www.w3.org/2001/XMLSchema">776 # <>777 # <ST_name>*</ST_name>778 # </>779 # </unknown_jobs>780 781 def startElement(self, name, attrs):782 self.value = ""783 if name == "djob_info": # job list784 self.in_joblist = True785 # The job container is "qmaster_response" in SGE 6.0786 # and 6.1, but "element" in 6.2. This is only the very787 # start of what's necessary for 6.2, though (sigh).788 elif (name == "qmaster_response" or name == "element") \789 and self.eltq[-1] == "djob_info": # job790 self.job = {"job_state": "U", "slots": 0,791 "nodes": [], "queued_timestamp": "",792 "queued_timestamp": "", "queue": "",793 "ppn": "0", "RN_max": 0,794 # fixme in endElement795 "requested_memory": 0, "requested_time": 0796 }797 self.joblist.append(self.job)798 elif name == "qstat_l_requests": # resource request799 self.lrequest = True800 elif name == "unknown_jobs":801 raise NoJobs802 self.eltq.append (name)803 804 def characters(self, ch):805 self.value += ch806 807 def endElement(self, name):808 """Snarf job elements contents into job dictionary.809 Translate keys if appropriate."""810 811 name_trans = {812 "JB_job_number": "number",813 "JB_job_name": "name", "JB_owner": "owner",814 "queue_name": "queue", "JAT_start_time": "start_timestamp",815 "JB_submission_time": "queued_timestamp"816 }817 value = self.value818 self.eltq.pop ()819 820 if name == "djob_info":821 self.in_joblist = False822 self.job = {}823 elif name == "JAT_master_queue":824 self.job["queue"] = value.split("@")[0]825 elif name == "JG_qhostname":826 if not (value in self.job["nodes"]):827 self.job["nodes"].append(value)828 elif name == "JG_slots": # slots in use829 self.job["slots"] += int(value)830 elif name == "RN_max": # requested slots (tasks or parallel)831 self.job["RN_max"] = max (self.job["RN_max"],832 int(value))833 elif name == "JAT_state": # job state (bitwise or)834 value = int (value)835 # Status values from sge_jobL.h836 #define JIDLE 0x00000000837 #define JHELD 0x00000010838 #define JMIGRATING 0x00000020839 #define JQUEUED 0x00000040840 #define JRUNNING 0x00000080841 #define JSUSPENDED 0x00000100842 #define JTRANSFERING 0x00000200843 #define JDELETED 0x00000400844 #define JWAITING 0x00000800845 #define JEXITING 0x00001000846 #define JWRITTEN 0x00002000847 #define JSUSPENDED_ON_THRESHOLD 0x00010000848 #define JFINISHED 0x00010000849 if value & 0x80:850 self.job["status"] = "R"851 elif value & 0x40:852 self.job["status"] = "Q"853 else:854 self.job["status"] = "O" # `other'855 elif name == "CE_name" and self.lrequest and self.value in \856 ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):857 # We're in a container for an interesting resource858 # request; record which type.859 self.lrequest = self.value860 elif name == "CE_doubleval" and self.lrequest:861 # if we're in a container for an interesting862 # resource request, use the maxmimum of the hard863 # and soft requests to record the requested CPU864 # or core. Fixme: I'm not sure if this logic is865 # right.866 if self.lrequest in ("h_core", "s_core"):867 self.job["requested_memory"] = \868 max (float (value),869 self.job["requested_memory"])870 # Fixme: Check what cpu means, c.f [hs]_cpu.871 elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):872 self.job["requested_time"] = \873 max (float (value),874 self.job["requested_time"])875 elif name == "qstat_l_requests":876 self.lrequest = False877 elif self.job and self.in_joblist:878 if name in name_trans:879 name = name_trans[name]880 self.job[name] = value881 882 718 # Abstracted from PBS original. 883 719 # … … 914 750 return nodeslist 915 751 916 class SgeDataGatherer(DataGatherer):917 918 jobs = {}919 920 def __init__( self ):921 self.jobs = {}922 self.timeoffset = 0923 self.dp = DataProcessor()924 925 def getJobData( self ):926 """Gather all data on current jobs in SGE"""927 928 import popen2929 930 self.cur_time = 0931 queues = ""932 if QUEUE: # only for specific queues933 # Fixme: assumes queue names don't contain single934 # quote or comma. Don't know what the SGE rules are.935 queues = " -q '" + string.join (QUEUE, ",") + "'"936 # Note the comment in SgeQstatXMLParser about scaling with937 # this method of getting data. I haven't found better one.938 # Output with args `-xml -ext -f -r' is easier to parse939 # in some ways, harder in others, but it doesn't provide940 # the submission time (at least SGE 6.0). The pipeline941 # into sed corrects bogus XML observed with a configuration942 # of SGE 6.0u8, which otherwise causes the parsing to hang.943 piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \944 sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \945 + queues, True)946 qstatparser = SgeQstatXMLParser()947 parse_err = 0948 try:949 xml.sax.parse(piping.fromchild, qstatparser)950 except NoJobs:951 pass952 except:953 parse_err = 1954 if piping.wait():955 debug_msg(10,956 "qstat error, skipping until next polling interval: "957 + piping.childerr.readline())958 return None959 elif parse_err:960 debug_msg(10, "Bad XML output from qstat"())961 exit (1)962 for f in piping.fromchild, piping.tochild, piping.childerr:963 f.close()964 self.cur_time = time.time()965 jobs_processed = []966 for job in qstatparser.joblist:967 job_id = job["number"]968 if job["status"] in [ 'Q', 'R' ]:969 jobs_processed.append(job_id)970 if job["status"] == "R":971 job["nodes"] = do_nodelist (job["nodes"])972 # Fixme: why is job["nodes"] sometimes null?973 try:974 # Fixme: Is this sensible? The975 # PBS-type PPN isn't something you use976 # with SGE.977 job["ppn"] = float(job["slots"]) / \978 len(job["nodes"])979 except:980 job["ppn"] = 0981 if DETECT_TIME_DIFFS:982 # If a job start is later than our983 # current date, that must mean984 # the SGE server's time is later985 # than our local time.986 start_timestamp = \987 int (job["start_timestamp"])988 if start_timestamp > \989 int(self.cur_time) + \990 int(self.timeoffset):991 992 self.timeoffset = \993 start_timestamp - \994 int(self.cur_time)995 else:996 # fixme: Note sure what this should be:997 job["ppn"] = job["RN_max"]998 job["nodes"] = "1"999 1000 myAttrs = {}1001 for attr in ["name", "queue", "owner",1002 "requested_time", "status",1003 "requested_memory", "ppn",1004 "start_timestamp", "queued_timestamp"]:1005 myAttrs[attr] = str(job[attr])1006 myAttrs["nodes"] = job["nodes"]1007 myAttrs["reported"] = str(int(self.cur_time) + \1008 int(self.timeoffset))1009 myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]1010 myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)1011 1012 if self.jobDataChanged(self.jobs, job_id, myAttrs) \1013 and myAttrs["status"] in ["R", "Q"]:1014 self.jobs[job_id] = myAttrs1015 for id, attrs in self.jobs.items():1016 if id not in jobs_processed:1017 del self.jobs[id]1018 1019 # LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>1020 # Requres LSFObject http://sourceforge.net/projects/lsfobject1021 #1022 class LsfDataGatherer(DataGatherer):1023 1024 """This is the DataGatherer for LSf"""1025 1026 global lsfObject1027 1028 def __init__( self ):1029 1030 self.jobs = { }1031 self.timeoffset = 01032 self.dp = DataProcessor()1033 self.initLsfQuery()1034 1035 def _countDuplicatesInList( self, dupedList ):1036 1037 countDupes = { }1038 1039 for item in dupedList:1040 1041 if not countDupes.has_key( item ):1042 1043 countDupes[ item ] = 11044 else:1045 countDupes[ item ] = countDupes[ item ] + 11046 1047 dupeCountList = [ ]1048 1049 for item, count in countDupes.items():1050 1051 dupeCountList.append( ( item, count ) )1052 1053 return dupeCountList1054 #1055 #lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']1056 #print _countDuplicatesInList(lst)1057 #[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]1058 ########################1059 1060 def initLsfQuery( self ):1061 self.pq = None1062 self.pq = lsfObject.jobInfoEntObject()1063 1064 def getJobData( self, known_jobs="" ):1065 """Gather all data on current jobs in LSF"""1066 if len( known_jobs ) > 0:1067 jobs = known_jobs1068 else:1069 jobs = { }1070 joblist = {}1071 joblist = self.pq.getJobInfo()1072 nodelist = ''1073 1074 self.cur_time = time.time()1075 1076 jobs_processed = [ ]1077 1078 for name, attrs in joblist.items():1079 job_id = str(name)1080 jobs_processed.append( job_id )1081 name = self.getAttr( attrs, 'jobName' )1082 queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )1083 owner = self.getAttr( attrs, 'user' )1084 1085 ### THIS IS THE rLimit List index values1086 #define LSF_RLIMIT_CPU 0 /* cpu time in milliseconds */1087 #define LSF_RLIMIT_FSIZE 1 /* maximum file size */1088 #define LSF_RLIMIT_DATA 2 /* data size */1089 #define LSF_RLIMIT_STACK 3 /* stack size */1090 #define LSF_RLIMIT_CORE 4 /* core file size */1091 #define LSF_RLIMIT_RSS 5 /* resident set size */1092 #define LSF_RLIMIT_NOFILE 6 /* open files */1093 #define LSF_RLIMIT_OPEN_MAX 7 /* (from HP-UX) */1094 #define LSF_RLIMIT_VMEM 8 /* maximum swap mem */1095 #define LSF_RLIMIT_SWAP 81096 #define LSF_RLIMIT_RUN 9 /* max wall-clock time limit */1097 #define LSF_RLIMIT_PROCESS 10 /* process number limit */1098 #define LSF_RLIMIT_THREAD 11 /* thread number limit (introduced in LSF6.0) */1099 #define LSF_RLIM_NLIMITS 12 /* number of resource limits */1100 1101 requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]1102 if requested_time == -1:1103 requested_time = ""1104 requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]1105 if requested_memory == -1:1106 requested_memory = ""1107 # This tries to get proc per node. We don't support this right now1108 ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )1109 requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )1110 if requested_cpus == None or requested_cpus == "":1111 requested_cpus = 11112 1113 if QUEUE:1114 for q in QUEUE:1115 if q == queue:1116 display_queue = 11117 break1118 else:1119 display_queue = 01120 continue1121 if display_queue == 0:1122 continue1123 1124 runState = self.getAttr( attrs, 'status' )1125 if runState == 4:1126 status = 'R'1127 else:1128 status = 'Q'1129 queued_timestamp = self.getAttr( attrs, 'submitTime' )1130 1131 if status == 'R':1132 start_timestamp = self.getAttr( attrs, 'startTime' )1133 nodesCpu = dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))1134 nodelist = nodesCpu.keys()1135 1136 if DETECT_TIME_DIFFS:1137 1138 # If a job start if later than our current date,1139 # that must mean the Torque server's time is later1140 # than our local time.1141 1142 if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):1143 1144 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )1145 1146 elif status == 'Q':1147 start_timestamp = ''1148 count_mynodes = 01149 numeric_node = 11150 nodelist = ''1151 1152 myAttrs = { }1153 if name == "":1154 myAttrs['name'] = "none"1155 else:1156 myAttrs['name'] = name1157 1158 myAttrs[ 'owner' ] = owner1159 myAttrs[ 'requested_time' ] = str(requested_time)1160 myAttrs[ 'requested_memory' ] = str(requested_memory)1161 myAttrs[ 'requested_cpus' ] = str(requested_cpus)1162 myAttrs[ 'ppn' ] = str( ppn )1163 myAttrs[ 'status' ] = status1164 myAttrs[ 'start_timestamp' ] = str(start_timestamp)1165 myAttrs[ 'queue' ] = str(queue)1166 myAttrs[ 'queued_timestamp' ] = str(queued_timestamp)1167 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )1168 myAttrs[ 'nodes' ] = do_nodelist( nodelist )1169 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1]1170 myAttrs[ 'poll_interval' ] = str(BATCH_POLL_INTERVAL)1171 1172 if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:1173 jobs[ job_id ] = myAttrs1174 1175 debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )1176 1177 for id, attrs in jobs.items():1178 if id not in jobs_processed:1179 # This one isn't there anymore1180 #1181 del jobs[ id ]1182 1183 self.jobs = jobs1184 1185 752 class PbsDataGatherer( DataGatherer ): 1186 753 … … 1235 802 queue = self.getAttr( attrs, 'queue' ) 1236 803 nodect = self.getAttr( attrs['Resource_List'], 'nodect' ) 1237 exec_group = self.getAttr( attrs, 'egroup' )1238 804 1239 805 requested_time = self.getAttr( attrs['Resource_List'], 'walltime' ) … … 1374 940 myAttrs[ 'owner' ] = str( owner ) 1375 941 myAttrs[ 'nodect' ] = str( nodect ) 1376 myAttrs[ 'exec.group' ] = str( exec_group )1377 942 myAttrs[ 'exec.hostnames' ] = str( running_nodes ) 1378 943 myAttrs[ 'exec.nodestr' ] = str( exec_nodestr )
Note: See TracChangeset
for help on using the changeset viewer.