Changeset 26 for trunk/plugin


Ignore:
Timestamp:
03/31/05 16:15:11 (19 years ago)
Author:
bastiaans
Message:

plugin/togap.py:

  • Setup daemon style
  • Will compile a ever expanding list of jobs
    • Poll PBS every second
  • Will check for changes since last poll
    • Update internal joblist when info changed
  • Will check for jobs that disappear from PBS' joblist
    • Stamp those jobs with end time when they are gone and update status


File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/plugin/togap.py

    r25 r26  
    11#!/usr/bin/env python
    22
     3# Specify debugging level here;
     4#
     5DEBUG_LEVEL = 10
     6
     7# Wether or not to run as a daemon in background
     8#
     9DAEMONIZE = 0
     10
    311from PBSQuery import PBSQuery
     12import sys
     13import time
    414
    515class PBSDataGatherer:
     
    818
    919                self.pq = PBSQuery()
     20                self.jobs = { }
     21
     22        def getAttr( self, attrs, name ):
     23
     24                if attrs.has_key( name ):
     25                        return attrs[name]
     26                else:
     27                        return ''
     28
     29        def jobDataChanged( self, jobs, job_id, attrs ):
     30
     31                if jobs.has_key( job_id ):
     32                        oldData = jobs[ job_id ]       
     33                else:
     34                        return 1
     35
     36                for name, val in attrs.items():
     37
     38                        if oldData.has_key( name ):
     39
     40                                if oldData[ name ] != attrs[ name ]:
     41
     42                                        return 1
     43
     44                        else:
     45                                return 1
     46
     47                return 0
     48
     49        def getJobData( self ):
     50
     51                jobs = self.jobs
     52
     53                joblist = self.pq.getjobs()
     54
     55                jobs_processed = [ ]
     56
     57                for name, attrs in joblist.items():
     58
     59                        job_id = name.split( '.' )[0]
     60
     61                        jobs_processed.append( job_id )
     62                       
     63                        name = self.getAttr( attrs, 'Job_Name' )
     64                        queue = self.getAttr( attrs, 'queue' )
     65                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
     66                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
     67                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
     68                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
     69                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
     70                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
     71                        else:
     72                                ppn = ''
     73                        status = self.getAttr( attrs, 'job_state' )
     74                        start_timestamp = self.getAttr( attrs, 'mtime' )
     75                        stop_timestamp = ''
     76
     77                        myAttrs = { }
     78                        myAttrs['name'] = name
     79                        myAttrs['queue'] = queue
     80                        myAttrs['owner'] = owner
     81                        myAttrs['requested_time'] = requested_time
     82                        myAttrs['requested_memory'] = requested_memory
     83                        myAttrs['ppn'] = ppn
     84                        myAttrs['status'] = status
     85                        myAttrs['start_timestamp'] = start_timestamp
     86                        myAttrs['stop_timestamp'] = stop_timestamp
     87
     88                        if self.jobDataChanged( jobs, job_id, myAttrs ):
     89                                jobs[ job_id ] = myAttrs
     90                                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
     91                                self.printJob( job_id )
     92
     93                for id, attrs in jobs.items():
     94
     95                        # This job was there in the last run, and not anymore
     96                        # it must have finished
     97
     98                        if id not in jobs_processed and attrs['stop_timestamp'] == '':
     99
     100                                jobs[ id ]['status'] = 'D'
     101                                jobs[ id ]['stop_timestamp'] = time.time()
     102                                debug_msg( 10, printTime() + ' job %s finished' %(id) )
     103                                self.printJob( id )
     104
     105                self.jobs = jobs
     106
     107        def printJobs( self ):
     108       
     109                for name, attrs in self.jobs.items():
     110
     111                        print 'job %s' %(name)
     112
     113                        for name, val in attrs.items():
     114
     115                                print '\t%s = %s' %( name, val )
     116
     117        def printJob( self, job_id ):
     118
     119                print 'job %s' %(job_id)
     120
     121                for name, val in self.jobs[ job_id ].items():
     122
     123                        print '\t%s = %s' %( name, val )
     124
     125        def daemon( self ):
     126                "Run as daemon forever"
     127
     128                self.DAEMON = 1
     129
     130                # Fork the first child
     131                #
     132                pid = os.fork()
     133                if pid > 0:
     134                        sys.exit(0)  # end parrent
     135
     136                # creates a session and sets the process group ID
     137                #
     138                os.setsid()
     139
     140                # Fork the second child
     141                #
     142                pid = os.fork()
     143                if pid > 0:
     144                        sys.exit(0)  # end parrent
     145
     146                # Go to the root directory and set the umask
     147                #
     148                os.chdir('/')
     149                os.umask(0)
     150
     151                sys.stdin.close()
     152                sys.stdout.close()
     153                sys.stderr.close()
     154
     155                os.open('/dev/null', 0)
     156                os.dup(0)
     157                os.dup(0)
     158
     159                self.run()
     160
     161        def run( self ):
     162                "Main thread"
     163
     164                while ( 1 ):
    10165               
    11         def getJobList( self ):
     166                        self.getJobData()
     167                        time.sleep( 1 )
    12168
    13                 joblist = self.pq.getjobs().items
    14                 #for name, job in joblist:
    15                        
     169def printTime( ):
     170
     171        return time.strftime("%a, %d %b %Y %H:%M:%S")
     172
     173def debug_msg( level, msg ):
     174
     175        if (DEBUG_LEVEL >= level):
     176                        sys.stderr.write( msg + '\n' )
    16177
    17178def main():
    18179
    19180        gather = PBSDataGatherer()
    20 
    21         print 'blaat'
     181        if DAEMONIZE:
     182                gather.daemon()
     183        else:
     184                gather.run()
    22185
    23186if __name__ == '__main__':
Note: See TracChangeset for help on using the changeset viewer.