Changeset 663


Ignore:
Timestamp:
09/04/12 12:24:10 (9 years ago)
Author:
ramonb
Message:
  • remove SGE, LSF: no feedback from users/authors and not compatible anymore
  • removed egroup metric
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobmond/jobmond.py

    r662 r663  
    33# This file is part of Jobmonarch
    44#
    5 # Copyright (C) 2006-2007  Ramon Bastiaans
    6 # Copyright (C) 2007, 2009  Dave Love  (SGE code)
     5# Copyright (C) 2006-2012  Ramon Bastiaans
    76#
    87# Jobmonarch is free software; you can redistribute it and/or modify
     
    2524import sys, getopt, ConfigParser, time, os, socket, string, re
    2625import xdrlib, socket, syslog, xml, xml.sax
    27 from xml.sax.handler import feature_namespaces
    28 from collections import deque
    2926from types import *
    3027
    31 VERSION='0.3.1'
     28VERSION='TRUNK+SVN'
    3229
    3330def usage( ver ):
     
    719716            time.sleep( BATCH_POLL_INTERVAL )   
    720717
    721 # SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
    722 # work with SGE 6.1 (else should be easily fixable), but definitely doesn't
    723 # with 6.2.  See also the fixmes.
    724 
    725 class NoJobs (Exception):
    726     """Exception raised by empty job list in qstat output."""
    727     pass
    728 
    729 class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
    730     """SAX handler for XML output from Sun Grid Engine's `qstat'."""
    731 
    732     def __init__(self):
    733         self.value = ""
    734         self.joblist = []
    735         self.job = {}
    736         self.queue = ""
    737         self.in_joblist = False
    738         self.lrequest = False
    739         self.eltq = deque()
    740         xml.sax.handler.ContentHandler.__init__(self)
    741 
    742     # The structure of the output is as follows (for SGE 6.0).  It's
    743     # similar for 6.1, but radically different for 6.2, and is
    744     # undocumented generally.  Unfortunately it's voluminous, and probably
    745     # doesn't scale to large clusters/queues.
    746 
    747     # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    748     #   <djob_info>
    749     #     <qmaster_response>  <!-- job -->
    750     #       ...
    751     #       <JB_ja_template> 
    752     #         <ulong_sublist>
    753     #         ...             <!-- start_time, state ... -->
    754     #         </ulong_sublist>
    755     #       </JB_ja_template> 
    756     #       <JB_ja_tasks>
    757     #         <ulong_sublist>
    758     #           ...           <!-- task info
    759     #         </ulong_sublist>
    760     #         ...
    761     #       </JB_ja_tasks>
    762     #       ...
    763     #     </qmaster_response>
    764     #   </djob_info>
    765     #   <messages>
    766     #   ...
    767 
    768     # NB.  We might treat each task as a separate job, like
    769     # straight qstat output, but the web interface expects jobs to
    770     # be identified by integers, not, say, <job number>.<task>.
    771 
    772     # So, I lied.  If the job list is empty, we get invalid XML
    773     # like this, which we need to defend against:
    774 
    775     # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    776     #   <>
    777     #     <ST_name>*</ST_name>
    778     #   </>
    779     # </unknown_jobs>
    780 
    781     def startElement(self, name, attrs):
    782         self.value = ""
    783         if name == "djob_info": # job list
    784             self.in_joblist = True
    785         # The job container is "qmaster_response" in SGE 6.0
    786         # and 6.1, but "element" in 6.2.  This is only the very
    787         # start of what's necessary for 6.2, though (sigh).
    788         elif (name == "qmaster_response" or name == "element") \
    789                 and self.eltq[-1] == "djob_info": # job
    790             self.job = {"job_state": "U", "slots": 0,
    791                     "nodes": [], "queued_timestamp": "",
    792                     "queued_timestamp": "", "queue": "",
    793                     "ppn": "0", "RN_max": 0,
    794                     # fixme in endElement
    795                     "requested_memory": 0, "requested_time": 0
    796                     }
    797             self.joblist.append(self.job)
    798         elif name == "qstat_l_requests": # resource request
    799             self.lrequest = True
    800         elif name == "unknown_jobs":
    801             raise NoJobs
    802         self.eltq.append (name)
    803 
    804     def characters(self, ch):
    805         self.value += ch
    806 
    807     def endElement(self, name):
    808         """Snarf job elements contents into job dictionary.
    809            Translate keys if appropriate."""
    810 
    811         name_trans = {
    812           "JB_job_number": "number",
    813           "JB_job_name": "name", "JB_owner": "owner",
    814           "queue_name": "queue", "JAT_start_time": "start_timestamp",
    815           "JB_submission_time": "queued_timestamp"
    816           }
    817         value = self.value
    818         self.eltq.pop ()
    819 
    820         if name == "djob_info":
    821             self.in_joblist = False
    822             self.job = {}
    823         elif name == "JAT_master_queue":
    824             self.job["queue"] = value.split("@")[0]
    825         elif name == "JG_qhostname":
    826             if not (value in self.job["nodes"]):
    827                 self.job["nodes"].append(value)
    828         elif name == "JG_slots": # slots in use
    829             self.job["slots"] += int(value)
    830         elif name == "RN_max": # requested slots (tasks or parallel)
    831             self.job["RN_max"] = max (self.job["RN_max"],
    832                           int(value))
    833         elif name == "JAT_state": # job state (bitwise or)
    834             value = int (value)
    835             # Status values from sge_jobL.h
    836             #define JIDLE                   0x00000000
    837             #define JHELD                   0x00000010
    838             #define JMIGRATING              0x00000020
    839             #define JQUEUED                 0x00000040
    840             #define JRUNNING                0x00000080
    841             #define JSUSPENDED              0x00000100
    842             #define JTRANSFERING            0x00000200
    843             #define JDELETED                0x00000400
    844             #define JWAITING                0x00000800
    845             #define JEXITING                0x00001000
    846             #define JWRITTEN                0x00002000
    847             #define JSUSPENDED_ON_THRESHOLD 0x00010000
    848             #define JFINISHED               0x00010000
    849             if value & 0x80:
    850                 self.job["status"] = "R"
    851             elif value & 0x40:
    852                 self.job["status"] = "Q"
    853             else:
    854                 self.job["status"] = "O" # `other'
    855         elif name == "CE_name" and self.lrequest and self.value in \
    856                 ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
    857             # We're in a container for an interesting resource
    858             # request; record which type.
    859             self.lrequest = self.value
    860         elif name == "CE_doubleval" and self.lrequest:
    861             # if we're in a container for an interesting
    862             # resource request, use the maxmimum of the hard
    863             # and soft requests to record the requested CPU
    864             # or core.  Fixme:  I'm not sure if this logic is
    865             # right.
    866             if self.lrequest in ("h_core", "s_core"):
    867                 self.job["requested_memory"] = \
    868                     max (float (value),
    869                      self.job["requested_memory"])
    870             # Fixme:  Check what cpu means, c.f [hs]_cpu.
    871             elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
    872                 self.job["requested_time"] = \
    873                     max (float (value),
    874                      self.job["requested_time"])
    875         elif name == "qstat_l_requests":
    876             self.lrequest = False
    877         elif self.job and self.in_joblist:
    878             if name in name_trans:
    879                 name = name_trans[name]
    880                 self.job[name] = value
    881 
    882718# Abstracted from PBS original.
    883719#
     
    914750    return nodeslist
    915751
    916 class SgeDataGatherer(DataGatherer):
    917 
    918     jobs = {}
    919 
    920     def __init__( self ):
    921         self.jobs = {}
    922         self.timeoffset = 0
    923         self.dp = DataProcessor()
    924 
    925     def getJobData( self ):
    926         """Gather all data on current jobs in SGE"""
    927 
    928         import popen2
    929 
    930         self.cur_time = 0
    931         queues = ""
    932         if QUEUE:   # only for specific queues
    933             # Fixme:  assumes queue names don't contain single
    934             # quote or comma.  Don't know what the SGE rules are.
    935             queues = " -q '" + string.join (QUEUE, ",") + "'"
    936         # Note the comment in SgeQstatXMLParser about scaling with
    937         # this method of getting data.  I haven't found better one.
    938         # Output with args `-xml -ext -f -r' is easier to parse
    939         # in some ways, harder in others, but it doesn't provide
    940         # the submission time (at least SGE 6.0).  The pipeline
    941         # into sed corrects bogus XML observed with a configuration
    942         # of SGE 6.0u8, which otherwise causes the parsing to hang.
    943         piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
    944 sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
    945                            + queues, True)
    946         qstatparser = SgeQstatXMLParser()
    947         parse_err = 0
    948         try:
    949             xml.sax.parse(piping.fromchild, qstatparser)
    950         except NoJobs:
    951             pass
    952         except:
    953             parse_err = 1
    954             if piping.wait():
    955                 debug_msg(10,
    956                   "qstat error, skipping until next polling interval: "
    957                   + piping.childerr.readline())
    958                 return None
    959             elif parse_err:
    960                 debug_msg(10, "Bad XML output from qstat"())
    961                 exit (1)
    962         for f in piping.fromchild, piping.tochild, piping.childerr:
    963             f.close()
    964         self.cur_time = time.time()
    965         jobs_processed = []
    966         for job in qstatparser.joblist:
    967             job_id = job["number"]
    968             if job["status"] in [ 'Q', 'R' ]:
    969                 jobs_processed.append(job_id)
    970             if job["status"] == "R":
    971                 job["nodes"] = do_nodelist (job["nodes"])
    972                 # Fixme: why is job["nodes"] sometimes null?
    973                 try:
    974                     # Fixme: Is this sensible?  The
    975                     # PBS-type PPN isn't something you use
    976                     # with SGE.
    977                     job["ppn"] = float(job["slots"]) / \
    978                         len(job["nodes"])
    979                 except:
    980                     job["ppn"] = 0
    981                 if DETECT_TIME_DIFFS:
    982                     # If a job start is later than our
    983                     # current date, that must mean
    984                     # the SGE server's time is later
    985                     # than our local time.
    986                     start_timestamp = \
    987                         int (job["start_timestamp"])
    988                     if start_timestamp > \
    989                             int(self.cur_time) + \
    990                             int(self.timeoffset):
    991 
    992                         self.timeoffset = \
    993                             start_timestamp - \
    994                             int(self.cur_time)
    995             else:
    996                 # fixme: Note sure what this should be:
    997                 job["ppn"] = job["RN_max"]
    998                 job["nodes"] = "1"
    999 
    1000             myAttrs = {}
    1001             for attr in ["name", "queue", "owner",
    1002                      "requested_time", "status",
    1003                      "requested_memory", "ppn",
    1004                      "start_timestamp", "queued_timestamp"]:
    1005                 myAttrs[attr] = str(job[attr])
    1006             myAttrs["nodes"] = job["nodes"]
    1007             myAttrs["reported"] = str(int(self.cur_time) + \
    1008                           int(self.timeoffset))
    1009             myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
    1010             myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
    1011 
    1012             if self.jobDataChanged(self.jobs, job_id, myAttrs) \
    1013                     and myAttrs["status"] in ["R", "Q"]:
    1014                 self.jobs[job_id] = myAttrs
    1015         for id, attrs in self.jobs.items():
    1016             if id not in jobs_processed:
    1017                 del self.jobs[id]
    1018 
    1019 # LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
    1020 # Requres LSFObject http://sourceforge.net/projects/lsfobject
    1021 #
    1022 class LsfDataGatherer(DataGatherer):
    1023 
    1024         """This is the DataGatherer for LSf"""
    1025 
    1026         global lsfObject
    1027 
    1028         def __init__( self ):
    1029 
    1030                 self.jobs = { }
    1031                 self.timeoffset = 0
    1032                 self.dp = DataProcessor()
    1033                 self.initLsfQuery()
    1034 
    1035         def _countDuplicatesInList( self, dupedList ):
    1036 
    1037             countDupes  = { }
    1038 
    1039             for item in dupedList:
    1040 
    1041                 if not countDupes.has_key( item ):
    1042 
    1043                     countDupes[ item ]  = 1
    1044                 else:
    1045                     countDupes[ item ]  = countDupes[ item ] + 1
    1046 
    1047             dupeCountList   = [ ]
    1048 
    1049             for item, count in countDupes.items():
    1050 
    1051                 dupeCountList.append( ( item, count ) )
    1052 
    1053             return dupeCountList
    1054 #
    1055 #lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
    1056 #print _countDuplicatesInList(lst)
    1057 #[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
    1058 ########################
    1059 
    1060         def initLsfQuery( self ):
    1061                 self.pq = None
    1062                 self.pq = lsfObject.jobInfoEntObject()
    1063 
    1064         def getJobData( self, known_jobs="" ):
    1065                 """Gather all data on current jobs in LSF"""
    1066                 if len( known_jobs ) > 0:
    1067                         jobs = known_jobs
    1068                 else:
    1069                         jobs = { }
    1070                 joblist = {}
    1071                 joblist = self.pq.getJobInfo()
    1072                 nodelist = ''
    1073 
    1074                 self.cur_time = time.time()
    1075 
    1076                 jobs_processed = [ ]
    1077 
    1078                 for name, attrs in joblist.items():
    1079                         job_id = str(name)
    1080                         jobs_processed.append( job_id )
    1081                         name = self.getAttr( attrs, 'jobName' )
    1082                         queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
    1083                         owner = self.getAttr( attrs, 'user' )
    1084 
    1085 ### THIS IS THE rLimit List index values
    1086 #define LSF_RLIMIT_CPU      0            /* cpu time in milliseconds */
    1087 #define LSF_RLIMIT_FSIZE    1            /* maximum file size */
    1088 #define LSF_RLIMIT_DATA     2            /* data size */
    1089 #define LSF_RLIMIT_STACK    3            /* stack size */
    1090 #define LSF_RLIMIT_CORE     4            /* core file size */
    1091 #define LSF_RLIMIT_RSS      5            /* resident set size */
    1092 #define LSF_RLIMIT_NOFILE   6            /* open files */
    1093 #define LSF_RLIMIT_OPEN_MAX 7            /* (from HP-UX) */
    1094 #define LSF_RLIMIT_VMEM     8            /* maximum swap mem */
    1095 #define LSF_RLIMIT_SWAP     8
    1096 #define LSF_RLIMIT_RUN      9            /* max wall-clock time limit */
    1097 #define LSF_RLIMIT_PROCESS  10           /* process number limit */
    1098 #define LSF_RLIMIT_THREAD   11           /* thread number limit (introduced in LSF6.0) */
    1099 #define LSF_RLIM_NLIMITS    12           /* number of resource limits */
    1100 
    1101                         requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
    1102                         if requested_time == -1:
    1103                                 requested_time = ""
    1104                         requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
    1105                         if requested_memory == -1:
    1106                                 requested_memory = ""
    1107 # This tries to get proc per node. We don't support this right now
    1108                         ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
    1109                         requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
    1110                         if requested_cpus == None or requested_cpus == "":
    1111                                 requested_cpus = 1
    1112 
    1113                         if QUEUE:
    1114                             for q in QUEUE:
    1115                                 if q == queue:
    1116                                     display_queue = 1
    1117                                     break
    1118                                 else:
    1119                                     display_queue = 0
    1120                                     continue
    1121                         if display_queue == 0:
    1122                             continue
    1123 
    1124                         runState = self.getAttr( attrs, 'status' )
    1125                         if runState == 4:
    1126                                 status = 'R'
    1127                         else:
    1128                                 status = 'Q'
    1129                         queued_timestamp = self.getAttr( attrs, 'submitTime' )
    1130 
    1131                         if status == 'R':
    1132                                 start_timestamp = self.getAttr( attrs, 'startTime' )
    1133                                 nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
    1134                                 nodelist = nodesCpu.keys()
    1135 
    1136                                 if DETECT_TIME_DIFFS:
    1137 
    1138                                         # If a job start if later than our current date,
    1139                                         # that must mean the Torque server's time is later
    1140                                         # than our local time.
    1141 
    1142                                         if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
    1143 
    1144                                                 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
    1145 
    1146                         elif status == 'Q':
    1147                                 start_timestamp = ''
    1148                                 count_mynodes = 0
    1149                                 numeric_node = 1
    1150                                 nodelist = ''
    1151 
    1152                         myAttrs = { }
    1153                         if name == "":
    1154                                 myAttrs['name'] = "none"
    1155                         else:
    1156                                 myAttrs['name'] = name
    1157 
    1158                         myAttrs[ 'owner' ]      = owner
    1159                         myAttrs[ 'requested_time' ] = str(requested_time)
    1160                         myAttrs[ 'requested_memory' ]   = str(requested_memory)
    1161                         myAttrs[ 'requested_cpus' ] = str(requested_cpus)
    1162                         myAttrs[ 'ppn' ]        = str( ppn )
    1163                         myAttrs[ 'status' ]     = status
    1164                         myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
    1165                         myAttrs[ 'queue' ]      = str(queue)
    1166                         myAttrs[ 'queued_timestamp' ]   = str(queued_timestamp)
    1167                         myAttrs[ 'reported' ]       = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
    1168                         myAttrs[ 'nodes' ]      = do_nodelist( nodelist )
    1169                         myAttrs[ 'domain' ]     = fqdn_parts( socket.getfqdn() )[1]
    1170                         myAttrs[ 'poll_interval' ]  = str(BATCH_POLL_INTERVAL)
    1171 
    1172                         if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
    1173                                 jobs[ job_id ] = myAttrs
    1174 
    1175                                 debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
    1176 
    1177                 for id, attrs in jobs.items():
    1178                         if id not in jobs_processed:
    1179                                 # This one isn't there anymore
    1180                                 #
    1181                                 del jobs[ id ]
    1182 
    1183                 self.jobs = jobs
    1184 
    1185752class PbsDataGatherer( DataGatherer ):
    1186753
     
    1235802            queue           = self.getAttr( attrs, 'queue' )
    1236803            nodect          = self.getAttr( attrs['Resource_List'], 'nodect' )
    1237             exec_group      = self.getAttr( attrs, 'egroup' )
    1238804
    1239805            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
     
    1374940            myAttrs[ 'owner' ]             = str( owner )
    1375941            myAttrs[ 'nodect' ]            = str( nodect )
    1376             myAttrs[ 'exec.group' ]        = str( exec_group )
    1377942            myAttrs[ 'exec.hostnames' ]    = str( running_nodes )
    1378943            myAttrs[ 'exec.nodestr' ]      = str( exec_nodestr )
Note: See TracChangeset for help on using the changeset viewer.