source: trunk/plugin/togap.py @ 162

Last change on this file since 162 was 162, checked in by bastiaans, 17 years ago

plugin/togap.py:

  • Query local PBS server again
File size: 10.1 KB
Line 
1#!/usr/bin/env python
2
3# Specify debugging level here;
4#
5# 10 = gemtric cmd's
6DEBUG_LEVEL = 0
7
8# Wether or not to run as a daemon in background
9#
10DAEMONIZE = 1
11
12# How many seconds interval for polling of jobs
13#
14# this will effect directly how accurate the
15# end time of a job can be determined
16#
17TORQUE_POLL_INTERVAL = 10
18
19# Alternate location of gmond.conf
20#
21# Default: /etc/gmond.conf
22#
23#GMOND_CONF = '/etc/gmond.conf'
24
25from PBSQuery import PBSQuery
26import sys
27import time
28import os
29import socket
30import string
31
32class DataProcessor:
33        """Class for processing of data"""
34
35        binary = '/usr/bin/gmetric'
36
37        def __init__( self, binary=None ):
38                """Remember alternate binary location if supplied"""
39
40                if binary:
41                        self.binary = binary
42
43                # Timeout for XML
44                #
45                # From ganglia's documentation:
46                #
47                # 'A metric will be deleted DMAX seconds after it is received, and
48                # DMAX=0 means eternal life.'
49
50                self.dmax = str( int( TORQUE_POLL_INTERVAL ) )
51
52                try:
53                        gmond_file = GMOND_CONF
54
55                except NameError:
56                        gmond_file = '/etc/gmond.conf'
57
58                if not os.path.exists( gmond_file ):
59                        debug_msg( 0, gmond_file + ' does not exist' )
60                        sys.exit( 1 )
61
62                incompatible = self.checkGmetricVersion()
63
64                if incompatible:
65                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
66                        sys.exit( 1 )
67
68        def checkGmetricVersion( self ):
69                """
70                Check version of gmetric is at least 3.0.1
71                for the syntax we use
72                """
73
74                for line in os.popen( self.binary + ' --version' ).readlines():
75
76                        line = line.split( ' ' )
77
78                        if len( line ) == 2 and str(line).find( 'gmetric' ) != -1:
79                       
80                                gmetric_version = line[1].split( '\n' )[0]
81
82                                version_major = int( gmetric_version.split( '.' )[0] )
83                                version_minor = int( gmetric_version.split( '.' )[1] )
84                                version_patch = int( gmetric_version.split( '.' )[2] )
85
86                                incompatible = 0
87
88                                if version_major < 3:
89
90                                        incompatible = 1
91                               
92                                elif version_major == 3:
93
94                                        if version_minor == 0:
95
96                                                if version_patch < 1:
97                                               
98                                                        incompatible = 1
99
100                return incompatible
101
102        def multicastGmetric( self, metricname, metricval, valtype='string' ):
103                """Call gmetric binary and multicast"""
104
105                cmd = self.binary
106
107                try:
108                        cmd = cmd + ' -c' + GMOND_CONF
109                except NameError:
110                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
111
112                cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
113
114                debug_msg( 10, printTime() + ' ' + cmd )
115                os.system( cmd )
116
117class PBSDataGatherer:
118
119        jobs = { }
120
121        def __init__( self ):
122                """Setup appropriate variables"""
123
124                self.jobs = { }
125                self.dp = DataProcessor()
126                self.initPbsQuery()
127
128        def initPbsQuery( self ):
129
130                self.pq = None
131                self.pq = PBSQuery()
132
133        def getAttr( self, attrs, name ):
134                """Return certain attribute from dictionary, if exists"""
135
136                if attrs.has_key( name ):
137                        return attrs[name]
138                else:
139                        return ''
140
141        def jobDataChanged( self, jobs, job_id, attrs ):
142                """Check if job with attrs and job_id in jobs has changed"""
143
144                if jobs.has_key( job_id ):
145                        oldData = jobs[ job_id ]       
146                else:
147                        return 1
148
149                for name, val in attrs.items():
150
151                        if oldData.has_key( name ):
152
153                                if oldData[ name ] != attrs[ name ]:
154
155                                        return 1
156
157                        else:
158                                return 1
159
160                return 0
161
162        def getJobData( self, known_jobs ):
163                """Gather all data on current jobs in Torque"""
164
165                if len( known_jobs ) > 0:
166                        jobs = known_jobs
167                else:
168                        jobs = { }
169
170                #self.initPbsQuery()
171       
172                #print self.pq.getnodes()
173       
174                joblist = self.pq.getjobs()
175
176                self.cur_time = time.time()
177
178                jobs_processed = [ ]
179
180                #self.printJobs( joblist )
181
182                for name, attrs in joblist.items():
183
184                        job_id = name.split( '.' )[0]
185
186                        jobs_processed.append( job_id )
187
188                        name = self.getAttr( attrs, 'Job_Name' )
189                        queue = self.getAttr( attrs, 'queue' )
190                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
191                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
192                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
193
194                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
195
196                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
197                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
198                        else:
199                                ppn = ''
200
201                        status = self.getAttr( attrs, 'job_state' )
202
203                        if status == 'R':
204                                start_timestamp = self.getAttr( attrs, 'mtime' )
205                                nodes = self.getAttr( attrs, 'exec_host' ).split( '+' )
206
207                                nodeslist = [ ]
208
209                                for node in nodes:
210                                        host = node.split( '/' )[0]
211
212                                        if nodeslist.count( host ) == 0:
213                                                nodeslist.append( host )
214
215                        elif status == 'Q':
216                                start_timestamp = ''
217                                count_mynodes = 0
218                                numeric_node = 1
219
220                                for node in mynoderequest.split( '+' ):
221
222                                        nodepart = node.split( ':' )[0]
223
224                                        for letter in nodepart:
225
226                                                if letter not in string.digits:
227
228                                                        numeric_node = 0
229
230                                        if not numeric_node:
231                                                count_mynodes = count_mynodes + 1
232                                        else:
233                                                count_mynodes = count_mynodes + int( nodepart )
234                                               
235                                nodeslist = count_mynodes
236
237                        myAttrs = { }
238                        myAttrs['name'] = name
239                        myAttrs['queue'] = queue
240                        myAttrs['owner'] = owner
241                        myAttrs['requested_time'] = requested_time
242                        myAttrs['requested_memory'] = requested_memory
243                        myAttrs['ppn'] = ppn
244                        myAttrs['status'] = status
245                        myAttrs['start_timestamp'] = start_timestamp
246                        myAttrs['reported'] = str( int( self.cur_time ) )
247                        myAttrs['nodes'] = nodeslist
248                        myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' )
249                        myAttrs['poll_interval'] = TORQUE_POLL_INTERVAL
250
251                        if self.jobDataChanged( jobs, job_id, myAttrs ):
252                                jobs[ job_id ] = myAttrs
253
254                                #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
255
256                for id, attrs in jobs.items():
257
258                        if id not in jobs_processed:
259
260                                # This one isn't there anymore; toedeledoki!
261                                #
262                                del jobs[ id ]
263
264                return jobs
265
266        def submitJobData( self, jobs ):
267                """Submit job info list"""
268
269                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( int( self.cur_time ) ) )
270
271                # Now let's spread the knowledge
272                #
273                for jobid, jobattrs in jobs.items():
274
275                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
276
277                        for val in gmetric_val:
278                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
279
280        def makeNodeString( self, nodelist ):
281                """Make one big string of all hosts"""
282
283                node_str = None
284
285                for node in nodelist:
286                        if not node_str:
287                                node_str = node
288                        else:
289                                node_str = node_str + ';' + node
290
291                return node_str
292
293        def compileGmetricVal( self, jobid, jobattrs ):
294                """Create a val string for gmetric of jobinfo"""
295
296                appendList = [ ]
297                appendList.append( 'name=' + jobattrs['name'] )
298                appendList.append( 'queue=' + jobattrs['queue'] )
299                appendList.append( 'owner=' + jobattrs['owner'] )
300                appendList.append( 'requested_time=' + jobattrs['requested_time'] )
301
302                if jobattrs['requested_memory'] != '':
303                        appendList.append( 'requested_memory=' + jobattrs['requested_memory'] )
304
305                if jobattrs['ppn'] != '':
306                        appendList.append( 'ppn=' + jobattrs['ppn'] )
307
308                appendList.append( 'status=' + jobattrs['status'] )
309
310                if jobattrs['start_timestamp'] != '':
311                        appendList.append( 'start_timestamp=' + jobattrs['start_timestamp'] )
312
313                appendList.append( 'reported=' + jobattrs['reported'] )
314                appendList.append( 'poll_interval=' + str( jobattrs['poll_interval'] ) )
315                appendList.append( 'domain=' + jobattrs['domain'] )
316
317                if jobattrs['status'] == 'R':
318                        if len( jobattrs['nodes'] ) > 0:
319                                appendList.append( 'nodes=' + self.makeNodeString( jobattrs['nodes'] ) )
320                elif jobattrs['status'] == 'Q':
321                        appendList.append( 'nodes=' + str(jobattrs['nodes']) )
322
323                return self.makeAppendLists( appendList )
324
325        def makeAppendLists( self, append_list ):
326                """
327                Divide all values from append_list over strings with a maximum
328                size of 1400
329                """
330
331                app_lists = [ ]
332
333                mystr = None
334
335                for val in append_list:
336
337                        if not mystr:
338                                mystr = val
339                        else:
340                                if not self.checkValAppendMaxSize( mystr, val ):
341                                        mystr = mystr + ' ' + val
342                                else:
343                                        # Too big, new appenlist
344                                        app_lists.append( mystr )
345                                        mystr = val
346
347                app_lists.append( mystr )
348
349                return app_lists
350
351        def checkValAppendMaxSize( self, val, text ):
352                """Check if val + text size is not above 1400 (max msg size)"""
353
354                # Max frame size of a udp datagram is 1500 bytes
355                # removing misc header and gmetric stuff leaves about 1400 bytes
356                #
357                if len( val + text ) > 1400:
358                        return 1
359                else:
360                        return 0
361
362        def printJobs( self, jobs ):
363                """Print a jobinfo overview"""
364
365                for name, attrs in self.jobs.items():
366
367                        print 'job %s' %(name)
368
369                        for name, val in attrs.items():
370
371                                print '\t%s = %s' %( name, val )
372
373        def printJob( self, jobs, job_id ):
374                """Print job with job_id from jobs"""
375
376                print 'job %s' %(job_id)
377
378                for name, val in jobs[ job_id ].items():
379
380                        print '\t%s = %s' %( name, val )
381
382        def daemon( self ):
383                """Run as daemon forever"""
384
385                # Fork the first child
386                #
387                pid = os.fork()
388                if pid > 0:
389                        sys.exit(0)  # end parrent
390
391                # creates a session and sets the process group ID
392                #
393                os.setsid()
394
395                # Fork the second child
396                #
397                pid = os.fork()
398                if pid > 0:
399                        sys.exit(0)  # end parrent
400
401                # Go to the root directory and set the umask
402                #
403                os.chdir('/')
404                os.umask(0)
405
406                sys.stdin.close()
407                sys.stdout.close()
408                sys.stderr.close()
409
410                os.open('/dev/null', 0)
411                os.dup(0)
412                os.dup(0)
413
414                self.run()
415
416        def run( self ):
417                """Main thread"""
418
419                while ( 1 ):
420               
421                        self.jobs = self.getJobData( self.jobs )
422                        self.submitJobData( self.jobs )
423                        time.sleep( TORQUE_POLL_INTERVAL )     
424
425def printTime( ):
426        """Print current time/date in human readable format for log/debug"""
427
428        return time.strftime("%a, %d %b %Y %H:%M:%S")
429
430def debug_msg( level, msg ):
431        """Print msg if at or above current debug level"""
432
433        if (DEBUG_LEVEL >= level):
434                        sys.stderr.write( msg + '\n' )
435
436def main():
437        """Application start"""
438
439        gather = PBSDataGatherer()
440        if DAEMONIZE:
441                gather.daemon()
442        else:
443                gather.run()
444
445# w00t someone started me
446#
447if __name__ == '__main__':
448        main()
Note: See TracBrowser for help on using the repository browser.