Changeset 318


Ignore:
Timestamp:
04/18/07 16:54:23 (14 years ago)
Author:
bastiaans
Message:

jobmond/jobmond.py:

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobmond/jobmond.py

    r317 r318  
    2424import sys, getopt, ConfigParser
    2525
     26import xml, xml.sax
     27from xml.sax import saxutils, make_parser
     28from xml.sax import make_parser
     29from xml.sax.handler import feature_namespaces
     30
    2631def usage():
    2732
     
    159164        try:
    160165
    161                 QUEUE = cfg.getlist( 'DEFAULT', 'QUEUE' )
     166                QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
    162167
    163168        except ConfigParser.NoOptionError, detail:
     
    267272                os.system( cmd )
    268273
    269 class SgeDataGatherer:
    270 
    271         """Placeholder for Babu Sundaram's SGE implementation"""
     274class DataGatherer:
     275
     276        """Skeleton class for batch system DataGatherer"""
     277
     278        def printJobs( self, jobs ):
     279                """Print a jobinfo overview"""
     280
     281                for name, attrs in self.jobs.items():
     282
     283                        print 'job %s' %(name)
     284
     285                        for name, val in attrs.items():
     286
     287                                print '\t%s = %s' %( name, val )
     288
     289        def printJob( self, jobs, job_id ):
     290                """Print job with job_id from jobs"""
     291
     292                print 'job %s' %(job_id)
     293
     294                for name, val in jobs[ job_id ].items():
     295
     296                        print '\t%s = %s' %( name, val )
    272297
    273298        def daemon( self ):
    274 
    275                 pass
     299                """Run as daemon forever"""
     300
     301                # Fork the first child
     302                #
     303                pid = os.fork()
     304                if pid > 0:
     305                        sys.exit(0)  # end parent
     306
     307                # creates a session and sets the process group ID
     308                #
     309                os.setsid()
     310
     311                # Fork the second child
     312                #
     313                pid = os.fork()
     314                if pid > 0:
     315                        sys.exit(0)  # end parent
     316
     317                write_pidfile()
     318
     319                # Go to the root directory and set the umask
     320                #
     321                os.chdir('/')
     322                os.umask(0)
     323
     324                sys.stdin.close()
     325                sys.stdout.close()
     326                sys.stderr.close()
     327
     328                os.open('/dev/null', os.O_RDWR)
     329                os.dup2(0, 1)
     330                os.dup2(0, 2)
     331
     332                self.run()
    276333
    277334        def run( self ):
    278 
    279                 pass
    280 
    281 class PbsDataGatherer:
     335                """Main thread"""
     336
     337                while ( 1 ):
     338               
     339                        self.jobs = self.getJobData( self.jobs )
     340                        self.submitJobData( self.jobs )
     341                        time.sleep( BATCH_POLL_INTERVAL )       
     342
     343class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
     344
     345        """Babu Sundaram's experimental SGE qstat XML parser"""
     346
     347        def __init__(self, qstatinxml):
     348
     349                self.qstatfile = qstatinxml
     350                self.attribs = {}
     351                self.value = ''
     352                self.jobID = ''
     353                self.currentJobInfo = ''
     354                self.job_list = []
     355                self.EOFFlag = 0
     356                self.jobinfoCount = 0
     357
     358
     359        def startElement(self, name, attrs):
     360
     361                if name == 'job_list':
     362                        self.currentJobInfo = 'Status=' + attrs.get('state', None) + ' '
     363                elif name == 'job_info':
     364                        self.job_list = []
     365                        self.jobinfoCount += 1
     366
     367        def characters(self, ch):
     368
     369                self.value = self.value + ch
     370
     371        def endElement(self, name):
     372
     373                if len(self.value.strip()) > 0 :
     374
     375                        self.currentJobInfo += name + '=' + self.value.strip() + ' '         
     376                elif name != 'job_list':
     377
     378                        self.currentJobInfo += name + '=Unknown '
     379
     380                if name == 'JB_job_number':
     381
     382                        self.jobID = self.value.strip()
     383                        self.job_list.append(self.jobID)         
     384
     385                if name == 'job_list':
     386
     387                        if self.attribs.has_key(self.jobID) == False:
     388                                self.attribs[self.jobID] = self.currentJobInfo
     389                        elif self.attribs.has_key(self.jobID) and self.attribs[self.jobID] != self.currentJobInfo:
     390                                self.attribs[self.jobID] = self.currentJobInfo
     391                        self.currentJobInfo = ''
     392                        self.jobID = ''
     393
     394                elif name == 'job_info' and self.jobinfoCount == 2:
     395
     396                        deljobs = []
     397                        for id in self.attribs:
     398                                try:
     399                                        self.job_list.index(str(id))
     400                                except ValueError:
     401                                        deljobs.append(id)
     402                        for i in deljobs:
     403                                del self.attribs[i]
     404                        deljobs = []
     405                        self.jobinfoCount = 0
     406
     407                self.value = ''
     408
     409class SgeDataGatherer(DataGatherer):
     410
     411        jobs = { }
     412
     413        def __init__( self ):
     414                """Setup appropriate variables"""
     415
     416                self.jobs = { }
     417                self.timeoffset = 0
     418                self.dp = DataProcessor()
     419                self.initSgeJobInfo()
     420
     421        def initSgeJobInfo( self ):
     422                """This is outside the scope of DRMAA; Get the current jobs in SGE"""
     423                """This is a hack because we cant get info about jobs beyond"""
     424                """those in the current DRMAA session"""
     425
     426                self.qstatparser = SgeQstatXMLParser( SGE_QSTAT_XML_FILE )
     427
     428                # Obtain the qstat information from SGE in XML format
     429                # This would change to DRMAA-specific calls from 6.0u9
     430
     431        def getJobData(self):
     432                """Gather all data on current jobs in SGE"""
     433
     434                # Get the information about the current jobs in the SGE queue
     435                info = os.popen("qstat -ext -xml").readlines()
     436                f = open(SGE_QSTAT_XML_FILE,'w')
     437                for lines in info:
     438                        f.write(lines)
     439                f.close()
     440
     441                # Parse the input
     442                f = open(self.qstatparser.qstatfile, 'r')
     443                xml.sax.parse(f, self.qstatparser)
     444                f.close()
     445
     446                self.cur_time = time.time()
     447
     448                return self.qstatparser.attribs
     449
     450        def submitJobData(self):
     451                """Submit job info list"""
     452
     453                self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
     454                # Now let's spread the knowledge
     455                #
     456                metric_increment = 0
     457                for jobid, jobattrs in self.qstatparser.attribs.items():
     458
     459                        self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), jobattrs)
     460
     461class PbsDataGatherer(DataGatherer):
    282462
    283463        """This is the DataGatherer for PBS and Torque"""
    284 
    285         jobs = { }
    286464
    287465        global PBSQuery
     
    566744                return str_list
    567745
    568         def printJobs( self, jobs ):
    569                 """Print a jobinfo overview"""
    570 
    571                 for name, attrs in self.jobs.items():
    572 
    573                         print 'job %s' %(name)
    574 
    575                         for name, val in attrs.items():
    576 
    577                                 print '\t%s = %s' %( name, val )
    578 
    579         def printJob( self, jobs, job_id ):
    580                 """Print job with job_id from jobs"""
    581 
    582                 print 'job %s' %(job_id)
    583 
    584                 for name, val in jobs[ job_id ].items():
    585 
    586                         print '\t%s = %s' %( name, val )
    587 
    588         def daemon( self ):
    589                 """Run as daemon forever"""
    590 
    591                 # Fork the first child
    592                 #
    593                 pid = os.fork()
    594                 if pid > 0:
    595                         sys.exit(0)  # end parent
    596 
    597                 # creates a session and sets the process group ID
    598                 #
    599                 os.setsid()
    600 
    601                 # Fork the second child
    602                 #
    603                 pid = os.fork()
    604                 if pid > 0:
    605                         sys.exit(0)  # end parent
    606 
    607                 write_pidfile()
    608 
    609                 # Go to the root directory and set the umask
    610                 #
    611                 os.chdir('/')
    612                 os.umask(0)
    613 
    614                 sys.stdin.close()
    615                 sys.stdout.close()
    616                 sys.stderr.close()
    617 
    618                 os.open('/dev/null', os.O_RDWR)
    619                 os.dup2(0, 1)
    620                 os.dup2(0, 2)
    621 
    622                 self.run()
    623 
    624         def run( self ):
    625                 """Main thread"""
    626 
    627                 while ( 1 ):
    628                
    629                         self.jobs = self.getJobData( self.jobs )
    630                         self.submitJobData( self.jobs )
    631                         time.sleep( BATCH_POLL_INTERVAL )       
    632 
    633746def printTime( ):
    634747        """Print current time/date in human readable format for log/debug"""
Note: See TracChangeset for help on using the changeset viewer.