source: trunk/plugin/togap.py @ 65

Last change on this file since 65 was 65, checked in by bastiaans, 18 years ago

plugin/togap.py:

  • Added version check of gmetric since we only understand 3.0.1 syntax atm
  • Disabled stop_timestamp transmitting should be determined at toga _server_ and not here
  • Will check if gmetrics to be send > 1400 bytes and chop it up in pieces if needed


File size: 7.4 KB
Line 
1#!/usr/bin/env python
2
3# Specify debugging level here;
4#
5DEBUG_LEVEL = 10
6
7# Wether or not to run as a daemon in background
8#
9DAEMONIZE = 0
10
11# How many seconds interval for polling of jobs
12#
13# this will effect directly how accurate the
14# end time of a job can be determined
15#
16TORQUE_POLL_INTERVAL = 10
17
18from PBSQuery import PBSQuery
19import sys
20import time
21import os
22import string
23
24class DataProcessor:
25
26        binary = '/usr/bin/gmetric'
27
28        def __init__( self, binary=None ):
29
30                if binary:
31                        self.binary = binary
32
33                self.dmax = TORQUE_POLL_INTERVAL
34
35                #incompatible = self.checkGmetricVersion
36                incompatible = 0
37
38                if incompatible:
39                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
40                        sys.exit( 1 )
41
42        def checkGmetricVersion( self ):
43
44                for line in os.popen( self.binary + ' --version' ).readlines():
45
46                        line = line.split( ' ' )
47
48                        if len( line ) == 2 and line.find( 'gmetric' ) != -1:
49                       
50                                gmetric_version = line[1]
51
52                                version_major = int( gemtric_version.split( '.' )[0] )
53                                version_minor = int( gemtric_version.split( '.' )[1] )
54                                version_patch = int( gemtric_version.split( '.' )[2] )
55
56                                incompatible = 0
57
58                                if version_major < 3:
59
60                                        incompatible = 1
61                               
62                                elif version_major == 3:
63
64                                        if version_minor == 0:
65
66                                                if version_patch < 1:
67                                               
68                                                        incompatbiel = 1
69
70                return incompatible
71
72        def multicastGmetric( self, metricname, metricval, tmax='15' ):
73
74                cmd = self.binary
75
76                try:
77                        cmd = cmd + ' -c' + GMOND_CONF
78                except NameError:
79                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
80
81                cmd = cmd + ' -n' + metricname + ' -v"' + metricval + '" -t' + tmax + ' -d' + str( self.dmax )
82
83                print cmd
84                #os.system( cmd )
85
86class PBSDataGatherer:
87
88        jobs = { }
89
90        def __init__( self ):
91
92                self.pq = PBSQuery()
93                self.jobs = { }
94                self.dp = DataProcessor()
95
96        def getAttr( self, attrs, name ):
97
98                if attrs.has_key( name ):
99                        return attrs[name]
100                else:
101                        return ''
102
103        def jobDataChanged( self, jobs, job_id, attrs ):
104
105                if jobs.has_key( job_id ):
106                        oldData = jobs[ job_id ]       
107                else:
108                        return 1
109
110                for name, val in attrs.items():
111
112                        if oldData.has_key( name ):
113
114                                if oldData[ name ] != attrs[ name ]:
115
116                                        return 1
117
118                        else:
119                                return 1
120
121                return 0
122
123        def getJobData( self, known_jobs ):
124
125                if len( known_jobs ) > 0:
126                        jobs = known_jobs
127                else:
128                        jobs = { }
129
130                joblist = self.pq.getjobs()
131
132                jobs_processed = [ ]
133
134                for name, attrs in joblist.items():
135
136                        job_id = name.split( '.' )[0]
137
138                        jobs_processed.append( job_id )
139
140                        name = self.getAttr( attrs, 'Job_Name' )
141                        queue = self.getAttr( attrs, 'queue' )
142                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
143                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
144                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
145                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
146                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
147                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
148                        else:
149                                ppn = ''
150                        status = self.getAttr( attrs, 'job_state' )
151                        start_timestamp = self.getAttr( attrs, 'mtime' )
152                        #stop_timestamp = ''
153
154                        myAttrs = { }
155                        myAttrs['name'] = name
156                        myAttrs['queue'] = queue
157                        myAttrs['owner'] = owner
158                        myAttrs['requested_time'] = requested_time
159                        myAttrs['requested_memory'] = requested_memory
160                        myAttrs['ppn'] = ppn
161                        myAttrs['status'] = status
162                        myAttrs['start_timestamp'] = start_timestamp
163                        #myAttrs['stop_timestamp'] = stop_timestamp
164
165                        if self.jobDataChanged( jobs, job_id, myAttrs ):
166                                jobs[ job_id ] = myAttrs
167
168                                self.printJob( jobs, job_id )
169
170                                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
171
172                #for id, attrs in jobs.items():
173
174                #       # This job was there in the last run, and not anymore
175                #       # it must have finished
176
177                #       if id not in jobs_processed and attrs['stop_timestamp'] == '':
178
179                #               jobs[ id ]['status'] = 'F'
180                #               jobs[ id ]['stop_timestamp'] = time.time()
181                #               debug_msg( 10, printTime() + ' job %s finished' %(id) )
182                #               self.printJob( jobs, id )
183
184                return jobs
185
186        def submitJobData( self, jobs ):
187                """Submit job info list"""
188
189                time_now = time.time()
190
191                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( time_now ) )
192
193                # Now let's spread the knowledge
194                #
195                for jobid, jobattrs in jobs.items():
196
197                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
198
199                        for val in gmetric_val:
200                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
201
202        def compileGmetricVal( self, jobid, jobattrs ):
203                """Create a val string for gmetric of jobinfo"""
204
205                name_str = 'name=' + jobattrs['name']
206                queue_str = 'queue=' + jobattrs['queue']
207                owner_str = 'owner=' + jobattrs['owner']
208                rtime_str = 'rtime=' + jobattrs['requested_time']
209                rmem_str = 'rmem=' + jobattrs['requested_memory']
210                ppn_str = 'ppn=' + jobattrs['ppn']
211                status_str = 'status=' + jobattrs['status']
212                stime_str = 'stime=' + jobattrs['start_timestamp']
213
214                appendList = [ name_str, queue_str, owner_str, rtime_str, rmem_str, ppn_str, status_str, stime_str ]
215
216                return self.makeAppendLists( appendList )
217
218        def makeAppendLists( self, append_list ):
219
220                app_lists = [ ]
221
222                mystr = None
223
224                for val in append_list:
225
226                        if not mystr:
227                                mystr = val
228                        else:
229                                if not self.checkValAppendMaxSize( mystr, val ):
230                                        mystr = mystr + ' ' + val
231                                else:
232                                        # Too big, new appenlist
233                                        app_lists.append( mystr )
234                                        mystr = val
235
236                app_lists.append( mystr )
237
238                return app_lists
239
240        def checkValAppendMaxSize( self, val, text ):
241                """Check if val + text size is not above 1400 (max msg size)"""
242
243                if len( val + text ) > 1400:
244                        return 1
245                else:
246                        return 0
247
248        def printJobs( self, jobs ):
249                """Print a jobinfo overview"""
250
251                for name, attrs in self.jobs.items():
252
253                        print 'job %s' %(name)
254
255                        for name, val in attrs.items():
256
257                                print '\t%s = %s' %( name, val )
258
259        def printJob( self, jobs, job_id ):
260                """Print job with job_id from jobs"""
261
262                print 'job %s' %(job_id)
263
264                for name, val in jobs[ job_id ].items():
265
266                        print '\t%s = %s' %( name, val )
267
268        def daemon( self ):
269                """Run as daemon forever"""
270
271                # Fork the first child
272                #
273                pid = os.fork()
274                if pid > 0:
275                        sys.exit(0)  # end parrent
276
277                # creates a session and sets the process group ID
278                #
279                os.setsid()
280
281                # Fork the second child
282                #
283                pid = os.fork()
284                if pid > 0:
285                        sys.exit(0)  # end parrent
286
287                # Go to the root directory and set the umask
288                #
289                os.chdir('/')
290                os.umask(0)
291
292                sys.stdin.close()
293                sys.stdout.close()
294                sys.stderr.close()
295
296                os.open('/dev/null', 0)
297                os.dup(0)
298                os.dup(0)
299
300                self.run()
301
302        def run( self ):
303                """Main thread"""
304
305                while ( 1 ):
306               
307                        self.jobs = self.getJobData( self.jobs )
308                        self.submitJobData( self.jobs )
309                        time.sleep( TORQUE_POLL_INTERVAL )     
310
311def printTime( ):
312        """Print current time/date in human readable format for log/debug"""
313
314        return time.strftime("%a, %d %b %Y %H:%M:%S")
315
316def debug_msg( level, msg ):
317        """Print msg if at or above current debug level"""
318
319        if (DEBUG_LEVEL >= level):
320                        sys.stderr.write( msg + '\n' )
321
322def main():
323        """Application start"""
324
325        gather = PBSDataGatherer()
326        if DAEMONIZE:
327                gather.daemon()
328        else:
329                gather.run()
330
331# w00t someone started me
332#
333if __name__ == '__main__':
334        main()
Note: See TracBrowser for help on using the repository browser.