Changeset 837 for branches/1.0/jobmond


Ignore:
Timestamp:
04/24/13 18:39:26 (11 years ago)
Author:
ramonb
Message:
  • first attempt at SLURM support
  • need to test more: not sure if node reporting works for multinode jobs
  • see #162
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/1.0/jobmond/jobmond.py

    r829 r837  
    2626
    2727import sys, getopt, ConfigParser, time, os, socket, string, re
    28 import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path
     28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
    2929from xml.sax.handler import feature_namespaces
    3030from collections import deque
     
    12501250    return nodeslist
    12511251
     1252class SLURMDataGatherer( DataGatherer ):
     1253
     1254    global pyslurm
     1255
     1256    """This is the DataGatherer for SLURM"""
     1257
     1258    def __init__( self ):
     1259
     1260        """Setup appropriate variables"""
     1261
     1262        self.jobs       = { }
     1263        self.timeoffset = 0
     1264        self.dp         = DataProcessor()
     1265
     1266    def getNodeData( self ):
     1267
     1268        slurm_type = pyslurm.node()
     1269
     1270        nodedict = slurm_type.get()
     1271
     1272        return nodedict
     1273
     1274    def getJobData( self ):
     1275
     1276        """Gather all data on current jobs"""
     1277
     1278        joblist            = {}
     1279
     1280        self.cur_time  = time.time()
     1281
     1282        slurm_type = pyslurm.job()
     1283        joblist    = slurm_type.get()
     1284
     1285        jobs_processed    = [ ]
     1286
     1287        for name, attrs in joblist.items():
     1288            display_queue = 1
     1289            job_id        = name
     1290
     1291            name          = self.getAttr( attrs, 'name' )
     1292            queue         = self.getAttr( attrs, 'partition' )
     1293
     1294            if QUEUE:
     1295                for q in QUEUE:
     1296                    if q == queue:
     1297                        display_queue = 1
     1298                        break
     1299                    else:
     1300                        display_queue = 0
     1301                        continue
     1302            if display_queue == 0:
     1303                continue
     1304
     1305            owner_uid        = attrs[ 'user_id' ]
     1306            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
     1307
     1308            requested_time   = self.getAttr( attrs, 'time_limit' )
     1309            requested_memory = self.getAttr( attrs, 'pn_min_memory' )
     1310
     1311            ppn = self.getAttr( attrs, 'ntasks_per_node' )
     1312
     1313            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
     1314
     1315            status = 'Q'
     1316
     1317            if status_long == 'RUNNING':
     1318
     1319                status = 'R'
     1320
     1321            elif status_long == 'COMPLETED':
     1322
     1323                continue
     1324
     1325            jobs_processed.append( job_id )
     1326
     1327            queued_timestamp = self.getAttr( attrs, 'submit_time' )
     1328
     1329            start_timestamp = ''
     1330            nodeslist       = ''
     1331
     1332            if status == 'R':
     1333
     1334                start_timestamp = self.getAttr( attrs, 'start_time' )
     1335                nodes           = attrs[ 'alloc_node' ].split(',')
     1336
     1337                nodeslist       = do_nodelist( nodes )
     1338
     1339                if DETECT_TIME_DIFFS:
     1340
     1341                    # If a job start if later than our current date,
     1342                    # that must mean the Torque server's time is later
     1343                    # than our local time.
     1344               
     1345                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
     1346
     1347                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
     1348
     1349            elif status == 'Q':
     1350
     1351                nodeslist       = str( attrs[ 'num_nodes' ] )
     1352
     1353            else:
     1354                start_timestamp = ''
     1355                nodeslist       = ''
     1356
     1357            myAttrs                = { }
     1358
     1359            myAttrs[ 'name' ]             = str( name )
     1360            myAttrs[ 'queue' ]            = str( queue )
     1361            myAttrs[ 'owner' ]            = str( owner )
     1362            myAttrs[ 'requested_time' ]   = str( requested_time )
     1363            myAttrs[ 'requested_memory' ] = str( requested_memory )
     1364            myAttrs[ 'ppn' ]              = str( ppn )
     1365            myAttrs[ 'status' ]           = str( status )
     1366            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
     1367            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
     1368            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
     1369            myAttrs[ 'nodes' ]            = nodeslist
     1370            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
     1371            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
     1372
     1373            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
     1374
     1375                self.jobs[ job_id ] = myAttrs
     1376
     1377        for id, attrs in self.jobs.items():
     1378
     1379            if id not in jobs_processed:
     1380
     1381                # This one isn't there anymore; toedeledoki!
     1382                #
     1383                del self.jobs[ id ]
     1384
    12521385class SgeDataGatherer(DataGatherer):
    12531386
     
    19052038    """Application start"""
    19062039
    1907     global PBSQuery, PBSError, lsfObject
     2040    global PBSQuery, PBSError, lsfObject, pyslurm
    19082041    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
    19092042
     
    19442077        gather = LsfDataGatherer()
    19452078
     2079    elif BATCH_API == 'slurm':
     2080
     2081        try:
     2082            import pyslurm
     2083        except:
     2084            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
     2085            sys.exit( 1)
     2086
     2087        gather = SLURMDataGatherer()
     2088
    19462089    else:
    19472090        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
Note: See TracChangeset for help on using the changeset viewer.