Changeset 837
- Timestamp:
- 04/24/13 18:39:26 (10 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/1.0/jobmond/jobmond.py
r829 r837 26 26 27 27 import sys, getopt, ConfigParser, time, os, socket, string, re 28 import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path 28 import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd 29 29 from xml.sax.handler import feature_namespaces 30 30 from collections import deque … … 1250 1250 return nodeslist 1251 1251 1252 class SLURMDataGatherer( DataGatherer ): 1253 1254 global pyslurm 1255 1256 """This is the DataGatherer for SLURM""" 1257 1258 def __init__( self ): 1259 1260 """Setup appropriate variables""" 1261 1262 self.jobs = { } 1263 self.timeoffset = 0 1264 self.dp = DataProcessor() 1265 1266 def getNodeData( self ): 1267 1268 slurm_type = pyslurm.node() 1269 1270 nodedict = slurm_type.get() 1271 1272 return nodedict 1273 1274 def getJobData( self ): 1275 1276 """Gather all data on current jobs""" 1277 1278 joblist = {} 1279 1280 self.cur_time = time.time() 1281 1282 slurm_type = pyslurm.job() 1283 joblist = slurm_type.get() 1284 1285 jobs_processed = [ ] 1286 1287 for name, attrs in joblist.items(): 1288 display_queue = 1 1289 job_id = name 1290 1291 name = self.getAttr( attrs, 'name' ) 1292 queue = self.getAttr( attrs, 'partition' ) 1293 1294 if QUEUE: 1295 for q in QUEUE: 1296 if q == queue: 1297 display_queue = 1 1298 break 1299 else: 1300 display_queue = 0 1301 continue 1302 if display_queue == 0: 1303 continue 1304 1305 owner_uid = attrs[ 'user_id' ] 1306 ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid ) 1307 1308 requested_time = self.getAttr( attrs, 'time_limit' ) 1309 requested_memory = self.getAttr( attrs, 'pn_min_memory' ) 1310 1311 ppn = self.getAttr( attrs, 'ntasks_per_node' ) 1312 1313 ( something, status_long ) = self.getAttr( attrs, 'job_state' ) 1314 1315 status = 'Q' 1316 1317 if status_long == 'RUNNING': 1318 1319 status = 'R' 1320 1321 elif status_long == 'COMPLETED': 1322 1323 continue 1324 1325 jobs_processed.append( job_id ) 1326 1327 queued_timestamp = self.getAttr( attrs, 'submit_time' ) 1328 1329 start_timestamp = '' 1330 nodeslist = '' 1331 1332 if status == 'R': 1333 1334 start_timestamp = self.getAttr( attrs, 'start_time' ) 1335 nodes = attrs[ 'alloc_node' ].split(',') 1336 1337 nodeslist = do_nodelist( nodes ) 1338 1339 if DETECT_TIME_DIFFS: 1340 1341 # If a job start if later than our current date, 1342 # that must mean the Torque server's time is later 1343 # than our local time. 1344 1345 if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ): 1346 1347 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) ) 1348 1349 elif status == 'Q': 1350 1351 nodeslist = str( attrs[ 'num_nodes' ] ) 1352 1353 else: 1354 start_timestamp = '' 1355 nodeslist = '' 1356 1357 myAttrs = { } 1358 1359 myAttrs[ 'name' ] = str( name ) 1360 myAttrs[ 'queue' ] = str( queue ) 1361 myAttrs[ 'owner' ] = str( owner ) 1362 myAttrs[ 'requested_time' ] = str( requested_time ) 1363 myAttrs[ 'requested_memory' ] = str( requested_memory ) 1364 myAttrs[ 'ppn' ] = str( ppn ) 1365 myAttrs[ 'status' ] = str( status ) 1366 myAttrs[ 'start_timestamp' ] = str( start_timestamp ) 1367 myAttrs[ 'queued_timestamp' ] = str( queued_timestamp ) 1368 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 1369 myAttrs[ 'nodes' ] = nodeslist 1370 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1] 1371 myAttrs[ 'poll_interval' ] = str( BATCH_POLL_INTERVAL ) 1372 1373 if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]: 1374 1375 self.jobs[ job_id ] = myAttrs 1376 1377 for id, attrs in self.jobs.items(): 1378 1379 if id not in jobs_processed: 1380 1381 # This one isn't there anymore; toedeledoki! 1382 # 1383 del self.jobs[ id ] 1384 1252 1385 class SgeDataGatherer(DataGatherer): 1253 1386 … … 1905 2038 """Application start""" 1906 2039 1907 global PBSQuery, PBSError, lsfObject 2040 global PBSQuery, PBSError, lsfObject, pyslurm 1908 2041 global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE 1909 2042 … … 1944 2077 gather = LsfDataGatherer() 1945 2078 2079 elif BATCH_API == 'slurm': 2080 2081 try: 2082 import pyslurm 2083 except: 2084 print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed" 2085 sys.exit( 1) 2086 2087 gather = SLURMDataGatherer() 2088 1946 2089 else: 1947 2090 print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
Note: See TracChangeset
for help on using the changeset viewer.