source: trunk/plugin/togap.py @ 166

Last change on this file since 166 was 166, checked in by bastiaans, 19 years ago

plugin/togap.py:

  • Multicast port configurable
File size: 10.3 KB
RevLine 
[23]1#!/usr/bin/env python
2
[26]3# Specify debugging level here;
4#
[101]5# 10 = gemtric cmd's
[125]6DEBUG_LEVEL = 0
[26]7
8# Wether or not to run as a daemon in background
9#
[125]10DAEMONIZE = 1
[26]11
[166]12# Which multicast port (and gmond) to transmit to
13#
14GANGLIA_MULTICAST_PORT = 8649
15
[165]16# Which Torque server to monitor
17#
18TORQUE_SERVER = 'localhost'
19
[64]20# How many seconds interval for polling of jobs
[61]21#
[64]22# this will effect directly how accurate the
23# end time of a job can be determined
24#
25TORQUE_POLL_INTERVAL = 10
[61]26
[68]27# Alternate location of gmond.conf
28#
29# Default: /etc/gmond.conf
30#
31#GMOND_CONF = '/etc/gmond.conf'
32
[23]33from PBSQuery import PBSQuery
[26]34import sys
35import time
[65]36import os
[67]37import socket
[65]38import string
[23]39
[61]40class DataProcessor:
[68]41        """Class for processing of data"""
[61]42
43        binary = '/usr/bin/gmetric'
44
45        def __init__( self, binary=None ):
[68]46                """Remember alternate binary location if supplied"""
[61]47
48                if binary:
49                        self.binary = binary
50
[80]51                # Timeout for XML
52                #
53                # From ganglia's documentation:
54                #
55                # 'A metric will be deleted DMAX seconds after it is received, and
56                # DMAX=0 means eternal life.'
[61]57
[88]58                self.dmax = str( int( TORQUE_POLL_INTERVAL ) )
[80]59
[68]60                try:
61                        gmond_file = GMOND_CONF
62
63                except NameError:
64                        gmond_file = '/etc/gmond.conf'
65
66                if not os.path.exists( gmond_file ):
67                        debug_msg( 0, gmond_file + ' does not exist' )
68                        sys.exit( 1 )
69
[69]70                incompatible = self.checkGmetricVersion()
[61]71
[65]72                if incompatible:
73                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
74                        sys.exit( 1 )
75
76        def checkGmetricVersion( self ):
[68]77                """
78                Check version of gmetric is at least 3.0.1
79                for the syntax we use
80                """
[65]81
82                for line in os.popen( self.binary + ' --version' ).readlines():
83
84                        line = line.split( ' ' )
85
[69]86                        if len( line ) == 2 and str(line).find( 'gmetric' ) != -1:
[65]87                       
[69]88                                gmetric_version = line[1].split( '\n' )[0]
[65]89
[69]90                                version_major = int( gmetric_version.split( '.' )[0] )
91                                version_minor = int( gmetric_version.split( '.' )[1] )
92                                version_patch = int( gmetric_version.split( '.' )[2] )
[65]93
94                                incompatible = 0
95
96                                if version_major < 3:
97
98                                        incompatible = 1
99                               
100                                elif version_major == 3:
101
102                                        if version_minor == 0:
103
104                                                if version_patch < 1:
105                                               
[91]106                                                        incompatible = 1
[65]107
108                return incompatible
109
[75]110        def multicastGmetric( self, metricname, metricval, valtype='string' ):
[68]111                """Call gmetric binary and multicast"""
[65]112
113                cmd = self.binary
114
[61]115                try:
116                        cmd = cmd + ' -c' + GMOND_CONF
117                except NameError:
[64]118                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
[61]119
[80]120                cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[61]121
[101]122                debug_msg( 10, printTime() + ' ' + cmd )
[69]123                os.system( cmd )
[61]124
[23]125class PBSDataGatherer:
126
[61]127        jobs = { }
128
[23]129        def __init__( self ):
[68]130                """Setup appropriate variables"""
[23]131
[26]132                self.jobs = { }
[61]133                self.dp = DataProcessor()
[91]134                self.initPbsQuery()
[23]135
[91]136        def initPbsQuery( self ):
137
138                self.pq = None
[165]139                if( TORQUE_SERVER ):
140                        self.pq = PBSQuery( TORQUE_SERVER )
141                else
142                        self.pq = PBSQuery()
[91]143
[26]144        def getAttr( self, attrs, name ):
[68]145                """Return certain attribute from dictionary, if exists"""
[26]146
147                if attrs.has_key( name ):
148                        return attrs[name]
149                else:
150                        return ''
151
152        def jobDataChanged( self, jobs, job_id, attrs ):
[68]153                """Check if job with attrs and job_id in jobs has changed"""
[26]154
155                if jobs.has_key( job_id ):
156                        oldData = jobs[ job_id ]       
157                else:
158                        return 1
159
160                for name, val in attrs.items():
161
162                        if oldData.has_key( name ):
163
164                                if oldData[ name ] != attrs[ name ]:
165
166                                        return 1
167
168                        else:
169                                return 1
170
171                return 0
172
[65]173        def getJobData( self, known_jobs ):
[68]174                """Gather all data on current jobs in Torque"""
[26]175
[65]176                if len( known_jobs ) > 0:
177                        jobs = known_jobs
178                else:
179                        jobs = { }
[26]180
[101]181                #self.initPbsQuery()
[125]182       
183                #print self.pq.getnodes()
184       
[26]185                joblist = self.pq.getjobs()
186
[69]187                self.cur_time = time.time()
[68]188
[26]189                jobs_processed = [ ]
190
[125]191                #self.printJobs( joblist )
192
[26]193                for name, attrs in joblist.items():
194
195                        job_id = name.split( '.' )[0]
196
197                        jobs_processed.append( job_id )
[61]198
[26]199                        name = self.getAttr( attrs, 'Job_Name' )
200                        queue = self.getAttr( attrs, 'queue' )
201                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
202                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
203                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
[95]204
[26]205                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
[95]206
[26]207                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
208                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
209                        else:
210                                ppn = ''
[95]211
[26]212                        status = self.getAttr( attrs, 'job_state' )
[25]213
[95]214                        if status == 'R':
215                                start_timestamp = self.getAttr( attrs, 'mtime' )
216                                nodes = self.getAttr( attrs, 'exec_host' ).split( '+' )
[133]217
218                                nodeslist = [ ]
219
220                                for node in nodes:
221                                        host = node.split( '/' )[0]
222
223                                        if nodeslist.count( host ) == 0:
224                                                nodeslist.append( host )
225
226                        elif status == 'Q':
[95]227                                start_timestamp = ''
[133]228                                count_mynodes = 0
229                                numeric_node = 1
[95]230
[133]231                                for node in mynoderequest.split( '+' ):
[67]232
[133]233                                        nodepart = node.split( ':' )[0]
[67]234
[133]235                                        for letter in nodepart:
[67]236
[133]237                                                if letter not in string.digits:
238
239                                                        numeric_node = 0
240
241                                        if not numeric_node:
242                                                count_mynodes = count_mynodes + 1
243                                        else:
244                                                count_mynodes = count_mynodes + int( nodepart )
245                                               
[134]246                                nodeslist = count_mynodes
[133]247
[26]248                        myAttrs = { }
249                        myAttrs['name'] = name
250                        myAttrs['queue'] = queue
251                        myAttrs['owner'] = owner
252                        myAttrs['requested_time'] = requested_time
253                        myAttrs['requested_memory'] = requested_memory
254                        myAttrs['ppn'] = ppn
255                        myAttrs['status'] = status
256                        myAttrs['start_timestamp'] = start_timestamp
[75]257                        myAttrs['reported'] = str( int( self.cur_time ) )
[67]258                        myAttrs['nodes'] = nodeslist
259                        myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' )
[80]260                        myAttrs['poll_interval'] = TORQUE_POLL_INTERVAL
[26]261
262                        if self.jobDataChanged( jobs, job_id, myAttrs ):
263                                jobs[ job_id ] = myAttrs
[61]264
[101]265                                #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
[26]266
[76]267                for id, attrs in jobs.items():
268
269                        if id not in jobs_processed:
270
271                                # This one isn't there anymore; toedeledoki!
272                                #
273                                del jobs[ id ]
274
[65]275                return jobs
276
277        def submitJobData( self, jobs ):
278                """Submit job info list"""
279
[69]280                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( int( self.cur_time ) ) )
281
[61]282                # Now let's spread the knowledge
283                #
284                for jobid, jobattrs in jobs.items():
285
[95]286                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
[61]287
[95]288                        for val in gmetric_val:
289                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
[61]290
[67]291        def makeNodeString( self, nodelist ):
[68]292                """Make one big string of all hosts"""
[67]293
294                node_str = None
295
296                for node in nodelist:
297                        if not node_str:
298                                node_str = node
299                        else:
300                                node_str = node_str + ';' + node
301
302                return node_str
303
[65]304        def compileGmetricVal( self, jobid, jobattrs ):
305                """Create a val string for gmetric of jobinfo"""
[61]306
[80]307                appendList = [ ]
308                appendList.append( 'name=' + jobattrs['name'] )
309                appendList.append( 'queue=' + jobattrs['queue'] )
310                appendList.append( 'owner=' + jobattrs['owner'] )
311                appendList.append( 'requested_time=' + jobattrs['requested_time'] )
[95]312
313                if jobattrs['requested_memory'] != '':
314                        appendList.append( 'requested_memory=' + jobattrs['requested_memory'] )
315
316                if jobattrs['ppn'] != '':
317                        appendList.append( 'ppn=' + jobattrs['ppn'] )
318
[80]319                appendList.append( 'status=' + jobattrs['status'] )
[95]320
321                if jobattrs['start_timestamp'] != '':
322                        appendList.append( 'start_timestamp=' + jobattrs['start_timestamp'] )
323
[80]324                appendList.append( 'reported=' + jobattrs['reported'] )
[85]325                appendList.append( 'poll_interval=' + str( jobattrs['poll_interval'] ) )
[80]326                appendList.append( 'domain=' + jobattrs['domain'] )
[26]327
[134]328                if jobattrs['status'] == 'R':
329                        if len( jobattrs['nodes'] ) > 0:
330                                appendList.append( 'nodes=' + self.makeNodeString( jobattrs['nodes'] ) )
331                elif jobattrs['status'] == 'Q':
332                        appendList.append( 'nodes=' + str(jobattrs['nodes']) )
[95]333
[65]334                return self.makeAppendLists( appendList )
335
336        def makeAppendLists( self, append_list ):
[68]337                """
338                Divide all values from append_list over strings with a maximum
339                size of 1400
340                """
[65]341
342                app_lists = [ ]
343
344                mystr = None
345
346                for val in append_list:
347
348                        if not mystr:
349                                mystr = val
350                        else:
351                                if not self.checkValAppendMaxSize( mystr, val ):
352                                        mystr = mystr + ' ' + val
353                                else:
354                                        # Too big, new appenlist
355                                        app_lists.append( mystr )
356                                        mystr = val
357
358                app_lists.append( mystr )
359
360                return app_lists
361
362        def checkValAppendMaxSize( self, val, text ):
363                """Check if val + text size is not above 1400 (max msg size)"""
364
[69]365                # Max frame size of a udp datagram is 1500 bytes
366                # removing misc header and gmetric stuff leaves about 1400 bytes
367                #
[65]368                if len( val + text ) > 1400:
369                        return 1
370                else:
371                        return 0
372
[61]373        def printJobs( self, jobs ):
[65]374                """Print a jobinfo overview"""
375
[26]376                for name, attrs in self.jobs.items():
377
378                        print 'job %s' %(name)
379
380                        for name, val in attrs.items():
381
382                                print '\t%s = %s' %( name, val )
383
[61]384        def printJob( self, jobs, job_id ):
[65]385                """Print job with job_id from jobs"""
[26]386
387                print 'job %s' %(job_id)
388
[65]389                for name, val in jobs[ job_id ].items():
[26]390
391                        print '\t%s = %s' %( name, val )
392
393        def daemon( self ):
[65]394                """Run as daemon forever"""
[26]395
396                # Fork the first child
397                #
398                pid = os.fork()
399                if pid > 0:
400                        sys.exit(0)  # end parrent
401
402                # creates a session and sets the process group ID
403                #
404                os.setsid()
405
406                # Fork the second child
407                #
408                pid = os.fork()
409                if pid > 0:
410                        sys.exit(0)  # end parrent
411
412                # Go to the root directory and set the umask
413                #
414                os.chdir('/')
415                os.umask(0)
416
417                sys.stdin.close()
418                sys.stdout.close()
419                sys.stderr.close()
420
421                os.open('/dev/null', 0)
422                os.dup(0)
423                os.dup(0)
424
425                self.run()
426
427        def run( self ):
[65]428                """Main thread"""
[26]429
430                while ( 1 ):
431               
[65]432                        self.jobs = self.getJobData( self.jobs )
433                        self.submitJobData( self.jobs )
[64]434                        time.sleep( TORQUE_POLL_INTERVAL )     
[26]435
436def printTime( ):
[65]437        """Print current time/date in human readable format for log/debug"""
[26]438
439        return time.strftime("%a, %d %b %Y %H:%M:%S")
440
441def debug_msg( level, msg ):
[65]442        """Print msg if at or above current debug level"""
[26]443
444        if (DEBUG_LEVEL >= level):
445                        sys.stderr.write( msg + '\n' )
446
[23]447def main():
[65]448        """Application start"""
[23]449
450        gather = PBSDataGatherer()
[26]451        if DAEMONIZE:
452                gather.daemon()
453        else:
454                gather.run()
[23]455
[65]456# w00t someone started me
457#
[23]458if __name__ == '__main__':
459        main()
Note: See TracBrowser for help on using the repository browser.