source: trunk/plugin/togap.py @ 64

Last change on this file since 64 was 64, checked in by bastiaans, 18 years ago

plugin/togap.py:

Misc. code cleanup

File size: 4.9 KB
Line 
1#!/usr/bin/env python
2
3# Specify debugging level here;
4#
5DEBUG_LEVEL = 10
6
7# Wether or not to run as a daemon in background
8#
9DAEMONIZE = 0
10
11# How many seconds interval for polling of jobs
12#
13# this will effect directly how accurate the
14# end time of a job can be determined
15#
16TORQUE_POLL_INTERVAL = 10
17
18from PBSQuery import PBSQuery
19import sys
20import time
21
22class DataProcessor:
23
24        binary = '/usr/bin/gmetric'
25
26        def __init__( self, binary=None ):
27
28                if binary:
29                        self.binary = binary
30
31        def multicastGmetric( self, metricname, metricval, tmax ):
32
33                cmd = binary
34
35                try:
36                        cmd = cmd + ' -c' + GMOND_CONF
37                except NameError:
38                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
39
40                cmd = cmd + ' -n' + metricname + ' -v' + metricval + ' -t' + tmax
41
42                print cmd
43                #os.system( cmd )
44
45class PBSDataGatherer:
46
47        jobs = { }
48
49        def __init__( self ):
50
51                self.pq = PBSQuery()
52                self.jobs = { }
53                self.dp = DataProcessor()
54
55        def getAttr( self, attrs, name ):
56
57                if attrs.has_key( name ):
58                        return attrs[name]
59                else:
60                        return ''
61
62        def jobDataChanged( self, jobs, job_id, attrs ):
63
64                if jobs.has_key( job_id ):
65                        oldData = jobs[ job_id ]       
66                else:
67                        return 1
68
69                for name, val in attrs.items():
70
71                        if oldData.has_key( name ):
72
73                                if oldData[ name ] != attrs[ name ]:
74
75                                        return 1
76
77                        else:
78                                return 1
79
80                return 0
81
82        def getJobData( self ):
83
84                jobs = self.jobs[:]
85
86                joblist = self.pq.getjobs()
87
88                jobs_processed = [ ]
89
90                for name, attrs in joblist.items():
91
92                        job_id = name.split( '.' )[0]
93
94                        jobs_processed.append( job_id )
95
96                        name = self.getAttr( attrs, 'Job_Name' )
97                        queue = self.getAttr( attrs, 'queue' )
98                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
99                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
100                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
101                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
102                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
103                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
104                        else:
105                                ppn = ''
106                        status = self.getAttr( attrs, 'job_state' )
107                        start_timestamp = self.getAttr( attrs, 'mtime' )
108                        stop_timestamp = ''
109
110                        myAttrs = { }
111                        myAttrs['name'] = name
112                        myAttrs['queue'] = queue
113                        myAttrs['owner'] = owner
114                        myAttrs['requested_time'] = requested_time
115                        myAttrs['requested_memory'] = requested_memory
116                        myAttrs['ppn'] = ppn
117                        myAttrs['status'] = status
118                        myAttrs['start_timestamp'] = start_timestamp
119                        myAttrs['stop_timestamp'] = stop_timestamp
120
121                        if self.jobDataChanged( jobs, job_id, myAttrs ):
122                                jobs[ job_id ] = myAttrs
123
124                                self.printJob( jobs, job_id )
125
126                                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
127
128                for id, attrs in jobs.items():
129
130                        # This job was there in the last run, and not anymore
131                        # it must have finished
132
133                        if id not in jobs_processed and attrs['stop_timestamp'] == '':
134
135                                jobs[ id ]['status'] = 'F'
136                                jobs[ id ]['stop_timestamp'] = time.time()
137                                debug_msg( 10, printTime() + ' job %s finished' %(id) )
138                                self.printJob( jobs, id )
139
140                # Now let's spread the knowledge
141                #
142                for jobid, jobattrs in jobs.items():
143
144                        if ARCHIVE_MODE:
145
146                                if self.jobDataChanged( self.jobs, jobid, jobattrs ):
147
148                                        self.dp.togaSubmitJob( jobid, jobattrs )
149
150                        self.dp.multicastGmetric( jobid, jobattrs )
151                                       
152                self.jobs = jobs
153
154        def printJobs( self, jobs ):
155       
156                for name, attrs in self.jobs.items():
157
158                        print 'job %s' %(name)
159
160                        for name, val in attrs.items():
161
162                                print '\t%s = %s' %( name, val )
163
164        def printJob( self, jobs, job_id ):
165
166                print 'job %s' %(job_id)
167
168                for name, val in self.jobs[ job_id ].items():
169
170                        print '\t%s = %s' %( name, val )
171
172        def daemon( self ):
173                "Run as daemon forever"
174
175                self.DAEMON = 1
176
177                # Fork the first child
178                #
179                pid = os.fork()
180                if pid > 0:
181                        sys.exit(0)  # end parrent
182
183                # creates a session and sets the process group ID
184                #
185                os.setsid()
186
187                # Fork the second child
188                #
189                pid = os.fork()
190                if pid > 0:
191                        sys.exit(0)  # end parrent
192
193                # Go to the root directory and set the umask
194                #
195                os.chdir('/')
196                os.umask(0)
197
198                sys.stdin.close()
199                sys.stdout.close()
200                sys.stderr.close()
201
202                os.open('/dev/null', 0)
203                os.dup(0)
204                os.dup(0)
205
206                self.run()
207
208        def run( self ):
209                "Main thread"
210
211                while ( 1 ):
212               
213                        self.getJobData()
214                        time.sleep( TORQUE_POLL_INTERVAL )     
215
216def printTime( ):
217
218        return time.strftime("%a, %d %b %Y %H:%M:%S")
219
220def debug_msg( level, msg ):
221
222        if (DEBUG_LEVEL >= level):
223                        sys.stderr.write( msg + '\n' )
224
225def main():
226
227        gather = PBSDataGatherer()
228        if DAEMONIZE:
229                gather.daemon()
230        else:
231                gather.run()
232
233if __name__ == '__main__':
234        main()
Note: See TracBrowser for help on using the repository browser.