source: trunk/plugin/togap.py @ 65

Last change on this file since 65 was 65, checked in by bastiaans, 19 years ago

plugin/togap.py:

  • Added version check of gmetric since we only understand 3.0.1 syntax atm
  • Disabled stop_timestamp transmitting should be determined at toga _server_ and not here
  • Will check if gmetrics to be send > 1400 bytes and chop it up in pieces if needed


File size: 7.4 KB
RevLine 
[23]1#!/usr/bin/env python
2
[26]3# Specify debugging level here;
4#
5DEBUG_LEVEL = 10
6
7# Wether or not to run as a daemon in background
8#
9DAEMONIZE = 0
10
[64]11# How many seconds interval for polling of jobs
[61]12#
[64]13# this will effect directly how accurate the
14# end time of a job can be determined
15#
16TORQUE_POLL_INTERVAL = 10
[61]17
[23]18from PBSQuery import PBSQuery
[26]19import sys
20import time
[65]21import os
22import string
[23]23
[61]24class DataProcessor:
25
26        binary = '/usr/bin/gmetric'
27
28        def __init__( self, binary=None ):
29
30                if binary:
31                        self.binary = binary
32
[65]33                self.dmax = TORQUE_POLL_INTERVAL
[61]34
[65]35                #incompatible = self.checkGmetricVersion
36                incompatible = 0
[61]37
[65]38                if incompatible:
39                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
40                        sys.exit( 1 )
41
42        def checkGmetricVersion( self ):
43
44                for line in os.popen( self.binary + ' --version' ).readlines():
45
46                        line = line.split( ' ' )
47
48                        if len( line ) == 2 and line.find( 'gmetric' ) != -1:
49                       
50                                gmetric_version = line[1]
51
52                                version_major = int( gemtric_version.split( '.' )[0] )
53                                version_minor = int( gemtric_version.split( '.' )[1] )
54                                version_patch = int( gemtric_version.split( '.' )[2] )
55
56                                incompatible = 0
57
58                                if version_major < 3:
59
60                                        incompatible = 1
61                               
62                                elif version_major == 3:
63
64                                        if version_minor == 0:
65
66                                                if version_patch < 1:
67                                               
68                                                        incompatbiel = 1
69
70                return incompatible
71
72        def multicastGmetric( self, metricname, metricval, tmax='15' ):
73
74                cmd = self.binary
75
[61]76                try:
77                        cmd = cmd + ' -c' + GMOND_CONF
78                except NameError:
[64]79                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
[61]80
[65]81                cmd = cmd + ' -n' + metricname + ' -v"' + metricval + '" -t' + tmax + ' -d' + str( self.dmax )
[61]82
83                print cmd
84                #os.system( cmd )
85
[23]86class PBSDataGatherer:
87
[61]88        jobs = { }
89
[23]90        def __init__( self ):
91
92                self.pq = PBSQuery()
[26]93                self.jobs = { }
[61]94                self.dp = DataProcessor()
[23]95
[26]96        def getAttr( self, attrs, name ):
97
98                if attrs.has_key( name ):
99                        return attrs[name]
100                else:
101                        return ''
102
103        def jobDataChanged( self, jobs, job_id, attrs ):
104
105                if jobs.has_key( job_id ):
106                        oldData = jobs[ job_id ]       
107                else:
108                        return 1
109
110                for name, val in attrs.items():
111
112                        if oldData.has_key( name ):
113
114                                if oldData[ name ] != attrs[ name ]:
115
116                                        return 1
117
118                        else:
119                                return 1
120
121                return 0
122
[65]123        def getJobData( self, known_jobs ):
[26]124
[65]125                if len( known_jobs ) > 0:
126                        jobs = known_jobs
127                else:
128                        jobs = { }
[26]129
130                joblist = self.pq.getjobs()
131
132                jobs_processed = [ ]
133
134                for name, attrs in joblist.items():
135
136                        job_id = name.split( '.' )[0]
137
138                        jobs_processed.append( job_id )
[61]139
[26]140                        name = self.getAttr( attrs, 'Job_Name' )
141                        queue = self.getAttr( attrs, 'queue' )
142                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
143                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
144                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
145                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
146                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
147                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
148                        else:
149                                ppn = ''
150                        status = self.getAttr( attrs, 'job_state' )
151                        start_timestamp = self.getAttr( attrs, 'mtime' )
[65]152                        #stop_timestamp = ''
[25]153
[26]154                        myAttrs = { }
155                        myAttrs['name'] = name
156                        myAttrs['queue'] = queue
157                        myAttrs['owner'] = owner
158                        myAttrs['requested_time'] = requested_time
159                        myAttrs['requested_memory'] = requested_memory
160                        myAttrs['ppn'] = ppn
161                        myAttrs['status'] = status
162                        myAttrs['start_timestamp'] = start_timestamp
[65]163                        #myAttrs['stop_timestamp'] = stop_timestamp
[26]164
165                        if self.jobDataChanged( jobs, job_id, myAttrs ):
166                                jobs[ job_id ] = myAttrs
[61]167
168                                self.printJob( jobs, job_id )
169
[26]170                                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
171
[65]172                #for id, attrs in jobs.items():
[26]173
[65]174                #       # This job was there in the last run, and not anymore
175                #       # it must have finished
[26]176
[65]177                #       if id not in jobs_processed and attrs['stop_timestamp'] == '':
[26]178
[65]179                #               jobs[ id ]['status'] = 'F'
180                #               jobs[ id ]['stop_timestamp'] = time.time()
181                #               debug_msg( 10, printTime() + ' job %s finished' %(id) )
182                #               self.printJob( jobs, id )
[26]183
[65]184                return jobs
185
186        def submitJobData( self, jobs ):
187                """Submit job info list"""
188
189                time_now = time.time()
190
191                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( time_now ) )
192
[61]193                # Now let's spread the knowledge
194                #
195                for jobid, jobattrs in jobs.items():
196
[65]197                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
[61]198
[65]199                        for val in gmetric_val:
200                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
[61]201
[65]202        def compileGmetricVal( self, jobid, jobattrs ):
203                """Create a val string for gmetric of jobinfo"""
[61]204
[65]205                name_str = 'name=' + jobattrs['name']
206                queue_str = 'queue=' + jobattrs['queue']
207                owner_str = 'owner=' + jobattrs['owner']
208                rtime_str = 'rtime=' + jobattrs['requested_time']
209                rmem_str = 'rmem=' + jobattrs['requested_memory']
210                ppn_str = 'ppn=' + jobattrs['ppn']
211                status_str = 'status=' + jobattrs['status']
212                stime_str = 'stime=' + jobattrs['start_timestamp']
[26]213
[65]214                appendList = [ name_str, queue_str, owner_str, rtime_str, rmem_str, ppn_str, status_str, stime_str ]
215
216                return self.makeAppendLists( appendList )
217
218        def makeAppendLists( self, append_list ):
219
220                app_lists = [ ]
221
222                mystr = None
223
224                for val in append_list:
225
226                        if not mystr:
227                                mystr = val
228                        else:
229                                if not self.checkValAppendMaxSize( mystr, val ):
230                                        mystr = mystr + ' ' + val
231                                else:
232                                        # Too big, new appenlist
233                                        app_lists.append( mystr )
234                                        mystr = val
235
236                app_lists.append( mystr )
237
238                return app_lists
239
240        def checkValAppendMaxSize( self, val, text ):
241                """Check if val + text size is not above 1400 (max msg size)"""
242
243                if len( val + text ) > 1400:
244                        return 1
245                else:
246                        return 0
247
[61]248        def printJobs( self, jobs ):
[65]249                """Print a jobinfo overview"""
250
[26]251                for name, attrs in self.jobs.items():
252
253                        print 'job %s' %(name)
254
255                        for name, val in attrs.items():
256
257                                print '\t%s = %s' %( name, val )
258
[61]259        def printJob( self, jobs, job_id ):
[65]260                """Print job with job_id from jobs"""
[26]261
262                print 'job %s' %(job_id)
263
[65]264                for name, val in jobs[ job_id ].items():
[26]265
266                        print '\t%s = %s' %( name, val )
267
268        def daemon( self ):
[65]269                """Run as daemon forever"""
[26]270
271                # Fork the first child
272                #
273                pid = os.fork()
274                if pid > 0:
275                        sys.exit(0)  # end parrent
276
277                # creates a session and sets the process group ID
278                #
279                os.setsid()
280
281                # Fork the second child
282                #
283                pid = os.fork()
284                if pid > 0:
285                        sys.exit(0)  # end parrent
286
287                # Go to the root directory and set the umask
288                #
289                os.chdir('/')
290                os.umask(0)
291
292                sys.stdin.close()
293                sys.stdout.close()
294                sys.stderr.close()
295
296                os.open('/dev/null', 0)
297                os.dup(0)
298                os.dup(0)
299
300                self.run()
301
302        def run( self ):
[65]303                """Main thread"""
[26]304
305                while ( 1 ):
306               
[65]307                        self.jobs = self.getJobData( self.jobs )
308                        self.submitJobData( self.jobs )
[64]309                        time.sleep( TORQUE_POLL_INTERVAL )     
[26]310
311def printTime( ):
[65]312        """Print current time/date in human readable format for log/debug"""
[26]313
314        return time.strftime("%a, %d %b %Y %H:%M:%S")
315
316def debug_msg( level, msg ):
[65]317        """Print msg if at or above current debug level"""
[26]318
319        if (DEBUG_LEVEL >= level):
320                        sys.stderr.write( msg + '\n' )
321
[23]322def main():
[65]323        """Application start"""
[23]324
325        gather = PBSDataGatherer()
[26]326        if DAEMONIZE:
327                gather.daemon()
328        else:
329                gather.run()
[23]330
[65]331# w00t someone started me
332#
[23]333if __name__ == '__main__':
334        main()
Note: See TracBrowser for help on using the repository browser.