source: trunk/plugin/togap.py @ 26

Last change on this file since 26 was 26, checked in by bastiaans, 19 years ago

plugin/togap.py:

  • Setup daemon style
  • Will compile a ever expanding list of jobs
    • Poll PBS every second
  • Will check for changes since last poll
    • Update internal joblist when info changed
  • Will check for jobs that disappear from PBS' joblist
    • Stamp those jobs with end time when they are gone and update status


File size: 4.0 KB
Line 
1#!/usr/bin/env python
2
3# Specify debugging level here;
4#
5DEBUG_LEVEL = 10
6
7# Wether or not to run as a daemon in background
8#
9DAEMONIZE = 0
10
11from PBSQuery import PBSQuery
12import sys
13import time
14
15class PBSDataGatherer:
16
17        def __init__( self ):
18
19                self.pq = PBSQuery()
20                self.jobs = { }
21
22        def getAttr( self, attrs, name ):
23
24                if attrs.has_key( name ):
25                        return attrs[name]
26                else:
27                        return ''
28
29        def jobDataChanged( self, jobs, job_id, attrs ):
30
31                if jobs.has_key( job_id ):
32                        oldData = jobs[ job_id ]       
33                else:
34                        return 1
35
36                for name, val in attrs.items():
37
38                        if oldData.has_key( name ):
39
40                                if oldData[ name ] != attrs[ name ]:
41
42                                        return 1
43
44                        else:
45                                return 1
46
47                return 0
48
49        def getJobData( self ):
50
51                jobs = self.jobs
52
53                joblist = self.pq.getjobs()
54
55                jobs_processed = [ ]
56
57                for name, attrs in joblist.items():
58
59                        job_id = name.split( '.' )[0]
60
61                        jobs_processed.append( job_id )
62                       
63                        name = self.getAttr( attrs, 'Job_Name' )
64                        queue = self.getAttr( attrs, 'queue' )
65                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
66                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
67                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
68                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
69                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
70                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
71                        else:
72                                ppn = ''
73                        status = self.getAttr( attrs, 'job_state' )
74                        start_timestamp = self.getAttr( attrs, 'mtime' )
75                        stop_timestamp = ''
76
77                        myAttrs = { }
78                        myAttrs['name'] = name
79                        myAttrs['queue'] = queue
80                        myAttrs['owner'] = owner
81                        myAttrs['requested_time'] = requested_time
82                        myAttrs['requested_memory'] = requested_memory
83                        myAttrs['ppn'] = ppn
84                        myAttrs['status'] = status
85                        myAttrs['start_timestamp'] = start_timestamp
86                        myAttrs['stop_timestamp'] = stop_timestamp
87
88                        if self.jobDataChanged( jobs, job_id, myAttrs ):
89                                jobs[ job_id ] = myAttrs
90                                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
91                                self.printJob( job_id )
92
93                for id, attrs in jobs.items():
94
95                        # This job was there in the last run, and not anymore
96                        # it must have finished
97
98                        if id not in jobs_processed and attrs['stop_timestamp'] == '':
99
100                                jobs[ id ]['status'] = 'D'
101                                jobs[ id ]['stop_timestamp'] = time.time()
102                                debug_msg( 10, printTime() + ' job %s finished' %(id) )
103                                self.printJob( id )
104
105                self.jobs = jobs
106
107        def printJobs( self ):
108       
109                for name, attrs in self.jobs.items():
110
111                        print 'job %s' %(name)
112
113                        for name, val in attrs.items():
114
115                                print '\t%s = %s' %( name, val )
116
117        def printJob( self, job_id ):
118
119                print 'job %s' %(job_id)
120
121                for name, val in self.jobs[ job_id ].items():
122
123                        print '\t%s = %s' %( name, val )
124
125        def daemon( self ):
126                "Run as daemon forever"
127
128                self.DAEMON = 1
129
130                # Fork the first child
131                #
132                pid = os.fork()
133                if pid > 0:
134                        sys.exit(0)  # end parrent
135
136                # creates a session and sets the process group ID
137                #
138                os.setsid()
139
140                # Fork the second child
141                #
142                pid = os.fork()
143                if pid > 0:
144                        sys.exit(0)  # end parrent
145
146                # Go to the root directory and set the umask
147                #
148                os.chdir('/')
149                os.umask(0)
150
151                sys.stdin.close()
152                sys.stdout.close()
153                sys.stderr.close()
154
155                os.open('/dev/null', 0)
156                os.dup(0)
157                os.dup(0)
158
159                self.run()
160
161        def run( self ):
162                "Main thread"
163
164                while ( 1 ):
165               
166                        self.getJobData()
167                        time.sleep( 1 ) 
168
169def printTime( ):
170
171        return time.strftime("%a, %d %b %Y %H:%M:%S")
172
173def debug_msg( level, msg ):
174
175        if (DEBUG_LEVEL >= level):
176                        sys.stderr.write( msg + '\n' )
177
178def main():
179
180        gather = PBSDataGatherer()
181        if DAEMONIZE:
182                gather.daemon()
183        else:
184                gather.run()
185
186if __name__ == '__main__':
187        main()
Note: See TracBrowser for help on using the repository browser.