Changeset 524


Ignore:
Timestamp:
03/13/08 17:23:30 (15 years ago)
Author:
bastiaans
Message:

jobmond/jobmond.py:

  • added LSF support by Mahmoud Hanafi
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobmond/jobmond.py

    r520 r524  
    300300                gmetric_dest_ip         = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
    301301
    302         gmetric_dest_port       = gcp.getStr( 'udp_send_channel', 'port' )
     302        gmetric_dest_port       = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
    303303
    304304        if gmetric_dest_ip and gmetric_dest_port:
     
    10541054                                del self.jobs[id]
    10551055
     1056# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
     1057# Requres LSFObject http://sourceforge.net/projects/lsfobject
     1058#
     1059class LsfDataGatherer(DataGatherer):
     1060        """This is the DataGatherer for LSf"""
     1061
     1062        global lsfObject
     1063
     1064        def __init__( self ):
     1065                self.jobs = { }
     1066                self.timeoffset = 0
     1067                self.dp = DataProcessor()
     1068                self.initLsfQuery()
     1069
     1070########################
     1071## THIS IS TAKEN FROM
     1072## http://bigbadcode.com/2007/04/04/count-the-duplicates-in-a-python-list/
     1073        from sets import Set
     1074#
     1075        def _countDuplicatesInList(self,dupedList):
     1076                uniqueSet = self.Set(item for item in dupedList)
     1077                return [(item, dupedList.count(item)) for item in uniqueSet]
     1078#
     1079#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
     1080#print _countDuplicatesInList(lst)
     1081#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
     1082########################
     1083
     1084        def initLsfQuery( self ):
     1085                self.pq = None
     1086                self.pq = lsfObject.jobInfoEntObject()
     1087
     1088        def getAttr( self, attrs, name ):
     1089                """Return certain attribute from dictionary, if exists"""
     1090                if attrs.has_key( name ):
     1091                        return attrs[name]
     1092                else:
     1093                        return ''
     1094
     1095        def getJobData( self, known_jobs="" ):
     1096                """Gather all data on current jobs in LSF"""
     1097                if len( known_jobs ) > 0:
     1098                        jobs = known_jobs
     1099                else:
     1100                        jobs = { }
     1101                joblist = {}
     1102                joblist = self.pq.getJobInfo()
     1103                nodelist = ''
     1104
     1105                self.cur_time = time.time()
     1106
     1107                jobs_processed = [ ]
     1108
     1109                for name, attrs in joblist.items():
     1110                        job_id = str(name)
     1111                        jobs_processed.append( job_id )
     1112                        name = self.getAttr( attrs, 'jobName' )
     1113                        queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
     1114                        owner = self.getAttr( attrs, 'user' )
     1115
     1116### THIS IS THE rLimit List index values
     1117#define LSF_RLIMIT_CPU      0            /* cpu time in milliseconds */
     1118#define LSF_RLIMIT_FSIZE    1            /* maximum file size */
     1119#define LSF_RLIMIT_DATA     2            /* data size */
     1120#define LSF_RLIMIT_STACK    3            /* stack size */
     1121#define LSF_RLIMIT_CORE     4            /* core file size */
     1122#define LSF_RLIMIT_RSS      5            /* resident set size */
     1123#define LSF_RLIMIT_NOFILE   6            /* open files */
     1124#define LSF_RLIMIT_OPEN_MAX 7            /* (from HP-UX) */
     1125#define LSF_RLIMIT_VMEM     8            /* maximum swap mem */
     1126#define LSF_RLIMIT_SWAP     8
     1127#define LSF_RLIMIT_RUN      9            /* max wall-clock time limit */
     1128#define LSF_RLIMIT_PROCESS  10           /* process number limit */
     1129#define LSF_RLIMIT_THREAD   11           /* thread number limit (introduced in LSF6.0) */
     1130#define LSF_RLIM_NLIMITS    12           /* number of resource limits */
     1131
     1132                        requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
     1133                        if requested_time == -1:
     1134                                requested_time = ""
     1135                        requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
     1136                        if requested_memory == -1:
     1137                                requested_memory = ""
     1138# This tries to get proc per node. We don't support this right now
     1139                        ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
     1140                        requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
     1141                        if requested_cpus == None or requested_cpus == "":
     1142                                requested_cpus = 1
     1143
     1144                        if QUEUE:
     1145                                for q in QUEUE:
     1146                                        if q == queue:
     1147                                                display_queue = 1
     1148                                                break
     1149                                        else:
     1150                                                display_queue = 0
     1151                                                continue
     1152                        if display_queue == 0:
     1153                                continue
     1154
     1155                        runState = self.getAttr( attrs, 'status' )
     1156                        if runState == 4:
     1157                                status = 'R'
     1158                        else:
     1159                                status = 'Q'
     1160                        queued_timestamp = self.getAttr( attrs, 'submitTime' )
     1161
     1162                        if status == 'R':
     1163                                start_timestamp = self.getAttr( attrs, 'startTime' )
     1164                                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
     1165                                nodelist = nodesCpu.keys()
     1166
     1167                                if DETECT_TIME_DIFFS:
     1168
     1169                                        # If a job start if later than our current date,
     1170                                        # that must mean the Torque server's time is later
     1171                                        # than our local time.
     1172
     1173                                        if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
     1174
     1175                                                self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
     1176
     1177                        elif status == 'Q':
     1178                                start_timestamp = ''
     1179                                count_mynodes = 0
     1180                                numeric_node = 1
     1181                                nodelist = ''
     1182
     1183                        myAttrs = { }
     1184                        if name == "":
     1185                                myAttrs['name'] = "none"
     1186                        else:
     1187                                myAttrs['name'] = name
     1188
     1189                        myAttrs[ 'owner' ]              = owner
     1190                        myAttrs[ 'requested_time' ]     = str(requested_time)
     1191                        myAttrs[ 'requested_memory' ]   = str(requested_memory)
     1192                        myAttrs[ 'requested_cpus' ]     = str(requested_cpus)
     1193                        myAttrs[ 'ppn' ]                = str( ppn )
     1194                        myAttrs[ 'status' ]             = status
     1195                        myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
     1196                        myAttrs[ 'queue' ]              = str(queue)
     1197                        myAttrs[ 'queued_timestamp' ]   = str(queued_timestamp)
     1198                        myAttrs[ 'reported' ]           = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
     1199                        myAttrs[ 'nodes' ]              = do_nodelist( nodelist )
     1200                        myAttrs[ 'domain' ]             = fqdn_parts( socket.getfqdn() )[1]
     1201                        myAttrs[ 'poll_interval' ]      = str(BATCH_POLL_INTERVAL)
     1202
     1203                        if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
     1204                                jobs[ job_id ] = myAttrs
     1205
     1206                                #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
     1207
     1208                for id, attrs in jobs.items():
     1209                        if id not in jobs_processed:
     1210                                # This one isn't there anymore; toedeledoki!
     1211                                #
     1212                                del jobs[ id ]
     1213                self.jobs=jobs
     1214
     1215
    10561216class PbsDataGatherer( DataGatherer ):
    10571217
    10581218        """This is the DataGatherer for PBS and Torque"""
    10591219
    1060         global PBSQuery
     1220        global PBSQuery, PBSError
    10611221
    10621222        def __init__( self ):
     
    13911551        """Application start"""
    13921552
    1393         global PBSQuery, PBSError
     1553        global PBSQuery, PBSError, lsfObject
    13941554        global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
    13951555
     
    14161576
    14171577                # Tested with SGE 6.0u11.
    1418 #               debug_msg( 0, "FATAL ERROR: BATCH_API 'sge' implementation is currently broken, check future releases" )
    1419 
    1420 #               sys.exit( 1 )
    1421 
     1578                #
    14221579                gather = SgeDataGatherer()
     1580
     1581        elif BATCH_API == 'lsf':
     1582
     1583                try:
     1584                        from lsfObject import lsfObject
     1585                except:
     1586                        debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
     1587                        sys.exit( 1)
     1588
     1589                gather = LsfDataGatherer()
    14231590
    14241591        else:
     
    14301597
    14311598                syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
    1432 
    14331599
    14341600        if DAEMONIZE:
Note: See TracChangeset for help on using the changeset viewer.