source: trunk/plugin/togap.py @ 134

Last change on this file since 134 was 134, checked in by bastiaans, 19 years ago

plugin/togap.py:

  • Node reporting of queued jobs now really working
File size: 10.1 KB
RevLine 
[23]1#!/usr/bin/env python
2
[26]3# Specify debugging level here;
4#
[101]5# 10 = gemtric cmd's
[125]6DEBUG_LEVEL = 0
[26]7
8# Wether or not to run as a daemon in background
9#
[125]10DAEMONIZE = 1
[26]11
[64]12# How many seconds interval for polling of jobs
[61]13#
[64]14# this will effect directly how accurate the
15# end time of a job can be determined
16#
17TORQUE_POLL_INTERVAL = 10
[61]18
[68]19# Alternate location of gmond.conf
20#
21# Default: /etc/gmond.conf
22#
23#GMOND_CONF = '/etc/gmond.conf'
24
[23]25from PBSQuery import PBSQuery
[26]26import sys
27import time
[65]28import os
[67]29import socket
[65]30import string
[23]31
[61]32class DataProcessor:
[68]33        """Class for processing of data"""
[61]34
35        binary = '/usr/bin/gmetric'
36
37        def __init__( self, binary=None ):
[68]38                """Remember alternate binary location if supplied"""
[61]39
40                if binary:
41                        self.binary = binary
42
[80]43                # Timeout for XML
44                #
45                # From ganglia's documentation:
46                #
47                # 'A metric will be deleted DMAX seconds after it is received, and
48                # DMAX=0 means eternal life.'
[61]49
[88]50                self.dmax = str( int( TORQUE_POLL_INTERVAL ) )
[80]51
[68]52                try:
53                        gmond_file = GMOND_CONF
54
55                except NameError:
56                        gmond_file = '/etc/gmond.conf'
57
58                if not os.path.exists( gmond_file ):
59                        debug_msg( 0, gmond_file + ' does not exist' )
60                        sys.exit( 1 )
61
[69]62                incompatible = self.checkGmetricVersion()
[61]63
[65]64                if incompatible:
65                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
66                        sys.exit( 1 )
67
68        def checkGmetricVersion( self ):
[68]69                """
70                Check version of gmetric is at least 3.0.1
71                for the syntax we use
72                """
[65]73
74                for line in os.popen( self.binary + ' --version' ).readlines():
75
76                        line = line.split( ' ' )
77
[69]78                        if len( line ) == 2 and str(line).find( 'gmetric' ) != -1:
[65]79                       
[69]80                                gmetric_version = line[1].split( '\n' )[0]
[65]81
[69]82                                version_major = int( gmetric_version.split( '.' )[0] )
83                                version_minor = int( gmetric_version.split( '.' )[1] )
84                                version_patch = int( gmetric_version.split( '.' )[2] )
[65]85
86                                incompatible = 0
87
88                                if version_major < 3:
89
90                                        incompatible = 1
91                               
92                                elif version_major == 3:
93
94                                        if version_minor == 0:
95
96                                                if version_patch < 1:
97                                               
[91]98                                                        incompatible = 1
[65]99
100                return incompatible
101
[75]102        def multicastGmetric( self, metricname, metricval, valtype='string' ):
[68]103                """Call gmetric binary and multicast"""
[65]104
105                cmd = self.binary
106
[61]107                try:
108                        cmd = cmd + ' -c' + GMOND_CONF
109                except NameError:
[64]110                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
[61]111
[80]112                cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[61]113
[101]114                debug_msg( 10, printTime() + ' ' + cmd )
[69]115                os.system( cmd )
[61]116
[23]117class PBSDataGatherer:
118
[61]119        jobs = { }
120
[23]121        def __init__( self ):
[68]122                """Setup appropriate variables"""
[23]123
[26]124                self.jobs = { }
[61]125                self.dp = DataProcessor()
[91]126                self.initPbsQuery()
[23]127
[91]128        def initPbsQuery( self ):
129
130                self.pq = None
[125]131                self.pq = PBSQuery( 'login.irc.sara.nl' )
[91]132
[26]133        def getAttr( self, attrs, name ):
[68]134                """Return certain attribute from dictionary, if exists"""
[26]135
136                if attrs.has_key( name ):
137                        return attrs[name]
138                else:
139                        return ''
140
141        def jobDataChanged( self, jobs, job_id, attrs ):
[68]142                """Check if job with attrs and job_id in jobs has changed"""
[26]143
144                if jobs.has_key( job_id ):
145                        oldData = jobs[ job_id ]       
146                else:
147                        return 1
148
149                for name, val in attrs.items():
150
151                        if oldData.has_key( name ):
152
153                                if oldData[ name ] != attrs[ name ]:
154
155                                        return 1
156
157                        else:
158                                return 1
159
160                return 0
161
[65]162        def getJobData( self, known_jobs ):
[68]163                """Gather all data on current jobs in Torque"""
[26]164
[65]165                if len( known_jobs ) > 0:
166                        jobs = known_jobs
167                else:
168                        jobs = { }
[26]169
[101]170                #self.initPbsQuery()
[125]171       
172                #print self.pq.getnodes()
173       
[26]174                joblist = self.pq.getjobs()
175
[69]176                self.cur_time = time.time()
[68]177
[26]178                jobs_processed = [ ]
179
[125]180                #self.printJobs( joblist )
181
[26]182                for name, attrs in joblist.items():
183
184                        job_id = name.split( '.' )[0]
185
186                        jobs_processed.append( job_id )
[61]187
[26]188                        name = self.getAttr( attrs, 'Job_Name' )
189                        queue = self.getAttr( attrs, 'queue' )
190                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
191                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
192                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
[95]193
[26]194                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
[95]195
[26]196                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
197                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
198                        else:
199                                ppn = ''
[95]200
[26]201                        status = self.getAttr( attrs, 'job_state' )
[25]202
[95]203                        if status == 'R':
204                                start_timestamp = self.getAttr( attrs, 'mtime' )
205                                nodes = self.getAttr( attrs, 'exec_host' ).split( '+' )
[133]206
207                                nodeslist = [ ]
208
209                                for node in nodes:
210                                        host = node.split( '/' )[0]
211
212                                        if nodeslist.count( host ) == 0:
213                                                nodeslist.append( host )
214
215                        elif status == 'Q':
[95]216                                start_timestamp = ''
[133]217                                count_mynodes = 0
218                                numeric_node = 1
[95]219
[133]220                                for node in mynoderequest.split( '+' ):
[67]221
[133]222                                        nodepart = node.split( ':' )[0]
[67]223
[133]224                                        for letter in nodepart:
[67]225
[133]226                                                if letter not in string.digits:
227
228                                                        numeric_node = 0
229
230                                        if not numeric_node:
231                                                count_mynodes = count_mynodes + 1
232                                        else:
233                                                count_mynodes = count_mynodes + int( nodepart )
234                                               
[134]235                                nodeslist = count_mynodes
[133]236
[26]237                        myAttrs = { }
238                        myAttrs['name'] = name
239                        myAttrs['queue'] = queue
240                        myAttrs['owner'] = owner
241                        myAttrs['requested_time'] = requested_time
242                        myAttrs['requested_memory'] = requested_memory
243                        myAttrs['ppn'] = ppn
244                        myAttrs['status'] = status
245                        myAttrs['start_timestamp'] = start_timestamp
[75]246                        myAttrs['reported'] = str( int( self.cur_time ) )
[67]247                        myAttrs['nodes'] = nodeslist
248                        myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' )
[80]249                        myAttrs['poll_interval'] = TORQUE_POLL_INTERVAL
[26]250
251                        if self.jobDataChanged( jobs, job_id, myAttrs ):
252                                jobs[ job_id ] = myAttrs
[61]253
[101]254                                #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
[26]255
[76]256                for id, attrs in jobs.items():
257
258                        if id not in jobs_processed:
259
260                                # This one isn't there anymore; toedeledoki!
261                                #
262                                del jobs[ id ]
263
[65]264                return jobs
265
266        def submitJobData( self, jobs ):
267                """Submit job info list"""
268
[69]269                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( int( self.cur_time ) ) )
270
[61]271                # Now let's spread the knowledge
272                #
273                for jobid, jobattrs in jobs.items():
274
[95]275                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
[61]276
[95]277                        for val in gmetric_val:
278                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
[61]279
[67]280        def makeNodeString( self, nodelist ):
[68]281                """Make one big string of all hosts"""
[67]282
283                node_str = None
284
285                for node in nodelist:
286                        if not node_str:
287                                node_str = node
288                        else:
289                                node_str = node_str + ';' + node
290
291                return node_str
292
[65]293        def compileGmetricVal( self, jobid, jobattrs ):
294                """Create a val string for gmetric of jobinfo"""
[61]295
[80]296                appendList = [ ]
297                appendList.append( 'name=' + jobattrs['name'] )
298                appendList.append( 'queue=' + jobattrs['queue'] )
299                appendList.append( 'owner=' + jobattrs['owner'] )
300                appendList.append( 'requested_time=' + jobattrs['requested_time'] )
[95]301
302                if jobattrs['requested_memory'] != '':
303                        appendList.append( 'requested_memory=' + jobattrs['requested_memory'] )
304
305                if jobattrs['ppn'] != '':
306                        appendList.append( 'ppn=' + jobattrs['ppn'] )
307
[80]308                appendList.append( 'status=' + jobattrs['status'] )
[95]309
310                if jobattrs['start_timestamp'] != '':
311                        appendList.append( 'start_timestamp=' + jobattrs['start_timestamp'] )
312
[80]313                appendList.append( 'reported=' + jobattrs['reported'] )
[85]314                appendList.append( 'poll_interval=' + str( jobattrs['poll_interval'] ) )
[80]315                appendList.append( 'domain=' + jobattrs['domain'] )
[26]316
[134]317                if jobattrs['status'] == 'R':
318                        if len( jobattrs['nodes'] ) > 0:
319                                appendList.append( 'nodes=' + self.makeNodeString( jobattrs['nodes'] ) )
320                elif jobattrs['status'] == 'Q':
321                        appendList.append( 'nodes=' + str(jobattrs['nodes']) )
[95]322
[65]323                return self.makeAppendLists( appendList )
324
325        def makeAppendLists( self, append_list ):
[68]326                """
327                Divide all values from append_list over strings with a maximum
328                size of 1400
329                """
[65]330
331                app_lists = [ ]
332
333                mystr = None
334
335                for val in append_list:
336
337                        if not mystr:
338                                mystr = val
339                        else:
340                                if not self.checkValAppendMaxSize( mystr, val ):
341                                        mystr = mystr + ' ' + val
342                                else:
343                                        # Too big, new appenlist
344                                        app_lists.append( mystr )
345                                        mystr = val
346
347                app_lists.append( mystr )
348
349                return app_lists
350
351        def checkValAppendMaxSize( self, val, text ):
352                """Check if val + text size is not above 1400 (max msg size)"""
353
[69]354                # Max frame size of a udp datagram is 1500 bytes
355                # removing misc header and gmetric stuff leaves about 1400 bytes
356                #
[65]357                if len( val + text ) > 1400:
358                        return 1
359                else:
360                        return 0
361
[61]362        def printJobs( self, jobs ):
[65]363                """Print a jobinfo overview"""
364
[26]365                for name, attrs in self.jobs.items():
366
367                        print 'job %s' %(name)
368
369                        for name, val in attrs.items():
370
371                                print '\t%s = %s' %( name, val )
372
[61]373        def printJob( self, jobs, job_id ):
[65]374                """Print job with job_id from jobs"""
[26]375
376                print 'job %s' %(job_id)
377
[65]378                for name, val in jobs[ job_id ].items():
[26]379
380                        print '\t%s = %s' %( name, val )
381
382        def daemon( self ):
[65]383                """Run as daemon forever"""
[26]384
385                # Fork the first child
386                #
387                pid = os.fork()
388                if pid > 0:
389                        sys.exit(0)  # end parrent
390
391                # creates a session and sets the process group ID
392                #
393                os.setsid()
394
395                # Fork the second child
396                #
397                pid = os.fork()
398                if pid > 0:
399                        sys.exit(0)  # end parrent
400
401                # Go to the root directory and set the umask
402                #
403                os.chdir('/')
404                os.umask(0)
405
406                sys.stdin.close()
407                sys.stdout.close()
408                sys.stderr.close()
409
410                os.open('/dev/null', 0)
411                os.dup(0)
412                os.dup(0)
413
414                self.run()
415
416        def run( self ):
[65]417                """Main thread"""
[26]418
419                while ( 1 ):
420               
[65]421                        self.jobs = self.getJobData( self.jobs )
422                        self.submitJobData( self.jobs )
[64]423                        time.sleep( TORQUE_POLL_INTERVAL )     
[26]424
425def printTime( ):
[65]426        """Print current time/date in human readable format for log/debug"""
[26]427
428        return time.strftime("%a, %d %b %Y %H:%M:%S")
429
430def debug_msg( level, msg ):
[65]431        """Print msg if at or above current debug level"""
[26]432
433        if (DEBUG_LEVEL >= level):
434                        sys.stderr.write( msg + '\n' )
435
[23]436def main():
[65]437        """Application start"""
[23]438
439        gather = PBSDataGatherer()
[26]440        if DAEMONIZE:
441                gather.daemon()
442        else:
443                gather.run()
[23]444
[65]445# w00t someone started me
446#
[23]447if __name__ == '__main__':
448        main()
Note: See TracBrowser for help on using the repository browser.