Changeset 318 for trunk/jobmond
- Timestamp:
- 04/18/07 16:54:23 (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r317 r318 24 24 import sys, getopt, ConfigParser 25 25 26 import xml, xml.sax 27 from xml.sax import saxutils, make_parser 28 from xml.sax import make_parser 29 from xml.sax.handler import feature_namespaces 30 26 31 def usage(): 27 32 … … 159 164 try: 160 165 161 QUEUE = cfg.getlist( 'DEFAULT', 'QUEUE')166 QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) ) 162 167 163 168 except ConfigParser.NoOptionError, detail: … … 267 272 os.system( cmd ) 268 273 269 class SgeDataGatherer: 270 271 """Placeholder for Babu Sundaram's SGE implementation""" 274 class DataGatherer: 275 276 """Skeleton class for batch system DataGatherer""" 277 278 def printJobs( self, jobs ): 279 """Print a jobinfo overview""" 280 281 for name, attrs in self.jobs.items(): 282 283 print 'job %s' %(name) 284 285 for name, val in attrs.items(): 286 287 print '\t%s = %s' %( name, val ) 288 289 def printJob( self, jobs, job_id ): 290 """Print job with job_id from jobs""" 291 292 print 'job %s' %(job_id) 293 294 for name, val in jobs[ job_id ].items(): 295 296 print '\t%s = %s' %( name, val ) 272 297 273 298 def daemon( self ): 274 275 pass 299 """Run as daemon forever""" 300 301 # Fork the first child 302 # 303 pid = os.fork() 304 if pid > 0: 305 sys.exit(0) # end parent 306 307 # creates a session and sets the process group ID 308 # 309 os.setsid() 310 311 # Fork the second child 312 # 313 pid = os.fork() 314 if pid > 0: 315 sys.exit(0) # end parent 316 317 write_pidfile() 318 319 # Go to the root directory and set the umask 320 # 321 os.chdir('/') 322 os.umask(0) 323 324 sys.stdin.close() 325 sys.stdout.close() 326 sys.stderr.close() 327 328 os.open('/dev/null', os.O_RDWR) 329 os.dup2(0, 1) 330 os.dup2(0, 2) 331 332 self.run() 276 333 277 334 def run( self ): 278 279 pass 280 281 class PbsDataGatherer: 335 """Main thread""" 336 337 while ( 1 ): 338 339 self.jobs = self.getJobData( self.jobs ) 340 self.submitJobData( self.jobs ) 341 time.sleep( BATCH_POLL_INTERVAL ) 342 343 class SgeQstatXMLParser(xml.sax.handler.ContentHandler): 344 345 """Babu Sundaram's experimental SGE qstat XML parser""" 346 347 def __init__(self, qstatinxml): 348 349 self.qstatfile = qstatinxml 350 self.attribs = {} 351 self.value = '' 352 self.jobID = '' 353 self.currentJobInfo = '' 354 self.job_list = [] 355 self.EOFFlag = 0 356 self.jobinfoCount = 0 357 358 359 def startElement(self, name, attrs): 360 361 if name == 'job_list': 362 self.currentJobInfo = 'Status=' + attrs.get('state', None) + ' ' 363 elif name == 'job_info': 364 self.job_list = [] 365 self.jobinfoCount += 1 366 367 def characters(self, ch): 368 369 self.value = self.value + ch 370 371 def endElement(self, name): 372 373 if len(self.value.strip()) > 0 : 374 375 self.currentJobInfo += name + '=' + self.value.strip() + ' ' 376 elif name != 'job_list': 377 378 self.currentJobInfo += name + '=Unknown ' 379 380 if name == 'JB_job_number': 381 382 self.jobID = self.value.strip() 383 self.job_list.append(self.jobID) 384 385 if name == 'job_list': 386 387 if self.attribs.has_key(self.jobID) == False: 388 self.attribs[self.jobID] = self.currentJobInfo 389 elif self.attribs.has_key(self.jobID) and self.attribs[self.jobID] != self.currentJobInfo: 390 self.attribs[self.jobID] = self.currentJobInfo 391 self.currentJobInfo = '' 392 self.jobID = '' 393 394 elif name == 'job_info' and self.jobinfoCount == 2: 395 396 deljobs = [] 397 for id in self.attribs: 398 try: 399 self.job_list.index(str(id)) 400 except ValueError: 401 deljobs.append(id) 402 for i in deljobs: 403 del self.attribs[i] 404 deljobs = [] 405 self.jobinfoCount = 0 406 407 self.value = '' 408 409 class SgeDataGatherer(DataGatherer): 410 411 jobs = { } 412 413 def __init__( self ): 414 """Setup appropriate variables""" 415 416 self.jobs = { } 417 self.timeoffset = 0 418 self.dp = DataProcessor() 419 self.initSgeJobInfo() 420 421 def initSgeJobInfo( self ): 422 """This is outside the scope of DRMAA; Get the current jobs in SGE""" 423 """This is a hack because we cant get info about jobs beyond""" 424 """those in the current DRMAA session""" 425 426 self.qstatparser = SgeQstatXMLParser( SGE_QSTAT_XML_FILE ) 427 428 # Obtain the qstat information from SGE in XML format 429 # This would change to DRMAA-specific calls from 6.0u9 430 431 def getJobData(self): 432 """Gather all data on current jobs in SGE""" 433 434 # Get the information about the current jobs in the SGE queue 435 info = os.popen("qstat -ext -xml").readlines() 436 f = open(SGE_QSTAT_XML_FILE,'w') 437 for lines in info: 438 f.write(lines) 439 f.close() 440 441 # Parse the input 442 f = open(self.qstatparser.qstatfile, 'r') 443 xml.sax.parse(f, self.qstatparser) 444 f.close() 445 446 self.cur_time = time.time() 447 448 return self.qstatparser.attribs 449 450 def submitJobData(self): 451 """Submit job info list""" 452 453 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 454 # Now let's spread the knowledge 455 # 456 metric_increment = 0 457 for jobid, jobattrs in self.qstatparser.attribs.items(): 458 459 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), jobattrs) 460 461 class PbsDataGatherer(DataGatherer): 282 462 283 463 """This is the DataGatherer for PBS and Torque""" 284 285 jobs = { }286 464 287 465 global PBSQuery … … 566 744 return str_list 567 745 568 def printJobs( self, jobs ):569 """Print a jobinfo overview"""570 571 for name, attrs in self.jobs.items():572 573 print 'job %s' %(name)574 575 for name, val in attrs.items():576 577 print '\t%s = %s' %( name, val )578 579 def printJob( self, jobs, job_id ):580 """Print job with job_id from jobs"""581 582 print 'job %s' %(job_id)583 584 for name, val in jobs[ job_id ].items():585 586 print '\t%s = %s' %( name, val )587 588 def daemon( self ):589 """Run as daemon forever"""590 591 # Fork the first child592 #593 pid = os.fork()594 if pid > 0:595 sys.exit(0) # end parent596 597 # creates a session and sets the process group ID598 #599 os.setsid()600 601 # Fork the second child602 #603 pid = os.fork()604 if pid > 0:605 sys.exit(0) # end parent606 607 write_pidfile()608 609 # Go to the root directory and set the umask610 #611 os.chdir('/')612 os.umask(0)613 614 sys.stdin.close()615 sys.stdout.close()616 sys.stderr.close()617 618 os.open('/dev/null', os.O_RDWR)619 os.dup2(0, 1)620 os.dup2(0, 2)621 622 self.run()623 624 def run( self ):625 """Main thread"""626 627 while ( 1 ):628 629 self.jobs = self.getJobData( self.jobs )630 self.submitJobData( self.jobs )631 time.sleep( BATCH_POLL_INTERVAL )632 633 746 def printTime( ): 634 747 """Print current time/date in human readable format for log/debug"""
Note: See TracChangeset
for help on using the changeset viewer.