source: trunk/plugin/togap.py @ 166

Last change on this file since 166 was 166, checked in by bastiaans, 17 years ago

plugin/togap.py:

  • Multicast port configurable
File size: 10.3 KB
Line 
1#!/usr/bin/env python
2
3# Specify debugging level here;
4#
5# 10 = gemtric cmd's
6DEBUG_LEVEL = 0
7
8# Wether or not to run as a daemon in background
9#
10DAEMONIZE = 1
11
12# Which multicast port (and gmond) to transmit to
13#
14GANGLIA_MULTICAST_PORT = 8649
15
16# Which Torque server to monitor
17#
18TORQUE_SERVER = 'localhost'
19
20# How many seconds interval for polling of jobs
21#
22# this will effect directly how accurate the
23# end time of a job can be determined
24#
25TORQUE_POLL_INTERVAL = 10
26
27# Alternate location of gmond.conf
28#
29# Default: /etc/gmond.conf
30#
31#GMOND_CONF = '/etc/gmond.conf'
32
33from PBSQuery import PBSQuery
34import sys
35import time
36import os
37import socket
38import string
39
40class DataProcessor:
41        """Class for processing of data"""
42
43        binary = '/usr/bin/gmetric'
44
45        def __init__( self, binary=None ):
46                """Remember alternate binary location if supplied"""
47
48                if binary:
49                        self.binary = binary
50
51                # Timeout for XML
52                #
53                # From ganglia's documentation:
54                #
55                # 'A metric will be deleted DMAX seconds after it is received, and
56                # DMAX=0 means eternal life.'
57
58                self.dmax = str( int( TORQUE_POLL_INTERVAL ) )
59
60                try:
61                        gmond_file = GMOND_CONF
62
63                except NameError:
64                        gmond_file = '/etc/gmond.conf'
65
66                if not os.path.exists( gmond_file ):
67                        debug_msg( 0, gmond_file + ' does not exist' )
68                        sys.exit( 1 )
69
70                incompatible = self.checkGmetricVersion()
71
72                if incompatible:
73                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
74                        sys.exit( 1 )
75
76        def checkGmetricVersion( self ):
77                """
78                Check version of gmetric is at least 3.0.1
79                for the syntax we use
80                """
81
82                for line in os.popen( self.binary + ' --version' ).readlines():
83
84                        line = line.split( ' ' )
85
86                        if len( line ) == 2 and str(line).find( 'gmetric' ) != -1:
87                       
88                                gmetric_version = line[1].split( '\n' )[0]
89
90                                version_major = int( gmetric_version.split( '.' )[0] )
91                                version_minor = int( gmetric_version.split( '.' )[1] )
92                                version_patch = int( gmetric_version.split( '.' )[2] )
93
94                                incompatible = 0
95
96                                if version_major < 3:
97
98                                        incompatible = 1
99                               
100                                elif version_major == 3:
101
102                                        if version_minor == 0:
103
104                                                if version_patch < 1:
105                                               
106                                                        incompatible = 1
107
108                return incompatible
109
110        def multicastGmetric( self, metricname, metricval, valtype='string' ):
111                """Call gmetric binary and multicast"""
112
113                cmd = self.binary
114
115                try:
116                        cmd = cmd + ' -c' + GMOND_CONF
117                except NameError:
118                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
119
120                cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
121
122                debug_msg( 10, printTime() + ' ' + cmd )
123                os.system( cmd )
124
125class PBSDataGatherer:
126
127        jobs = { }
128
129        def __init__( self ):
130                """Setup appropriate variables"""
131
132                self.jobs = { }
133                self.dp = DataProcessor()
134                self.initPbsQuery()
135
136        def initPbsQuery( self ):
137
138                self.pq = None
139                if( TORQUE_SERVER ):
140                        self.pq = PBSQuery( TORQUE_SERVER )
141                else
142                        self.pq = PBSQuery()
143
144        def getAttr( self, attrs, name ):
145                """Return certain attribute from dictionary, if exists"""
146
147                if attrs.has_key( name ):
148                        return attrs[name]
149                else:
150                        return ''
151
152        def jobDataChanged( self, jobs, job_id, attrs ):
153                """Check if job with attrs and job_id in jobs has changed"""
154
155                if jobs.has_key( job_id ):
156                        oldData = jobs[ job_id ]       
157                else:
158                        return 1
159
160                for name, val in attrs.items():
161
162                        if oldData.has_key( name ):
163
164                                if oldData[ name ] != attrs[ name ]:
165
166                                        return 1
167
168                        else:
169                                return 1
170
171                return 0
172
173        def getJobData( self, known_jobs ):
174                """Gather all data on current jobs in Torque"""
175
176                if len( known_jobs ) > 0:
177                        jobs = known_jobs
178                else:
179                        jobs = { }
180
181                #self.initPbsQuery()
182       
183                #print self.pq.getnodes()
184       
185                joblist = self.pq.getjobs()
186
187                self.cur_time = time.time()
188
189                jobs_processed = [ ]
190
191                #self.printJobs( joblist )
192
193                for name, attrs in joblist.items():
194
195                        job_id = name.split( '.' )[0]
196
197                        jobs_processed.append( job_id )
198
199                        name = self.getAttr( attrs, 'Job_Name' )
200                        queue = self.getAttr( attrs, 'queue' )
201                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
202                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
203                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
204
205                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
206
207                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
208                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
209                        else:
210                                ppn = ''
211
212                        status = self.getAttr( attrs, 'job_state' )
213
214                        if status == 'R':
215                                start_timestamp = self.getAttr( attrs, 'mtime' )
216                                nodes = self.getAttr( attrs, 'exec_host' ).split( '+' )
217
218                                nodeslist = [ ]
219
220                                for node in nodes:
221                                        host = node.split( '/' )[0]
222
223                                        if nodeslist.count( host ) == 0:
224                                                nodeslist.append( host )
225
226                        elif status == 'Q':
227                                start_timestamp = ''
228                                count_mynodes = 0
229                                numeric_node = 1
230
231                                for node in mynoderequest.split( '+' ):
232
233                                        nodepart = node.split( ':' )[0]
234
235                                        for letter in nodepart:
236
237                                                if letter not in string.digits:
238
239                                                        numeric_node = 0
240
241                                        if not numeric_node:
242                                                count_mynodes = count_mynodes + 1
243                                        else:
244                                                count_mynodes = count_mynodes + int( nodepart )
245                                               
246                                nodeslist = count_mynodes
247
248                        myAttrs = { }
249                        myAttrs['name'] = name
250                        myAttrs['queue'] = queue
251                        myAttrs['owner'] = owner
252                        myAttrs['requested_time'] = requested_time
253                        myAttrs['requested_memory'] = requested_memory
254                        myAttrs['ppn'] = ppn
255                        myAttrs['status'] = status
256                        myAttrs['start_timestamp'] = start_timestamp
257                        myAttrs['reported'] = str( int( self.cur_time ) )
258                        myAttrs['nodes'] = nodeslist
259                        myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' )
260                        myAttrs['poll_interval'] = TORQUE_POLL_INTERVAL
261
262                        if self.jobDataChanged( jobs, job_id, myAttrs ):
263                                jobs[ job_id ] = myAttrs
264
265                                #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
266
267                for id, attrs in jobs.items():
268
269                        if id not in jobs_processed:
270
271                                # This one isn't there anymore; toedeledoki!
272                                #
273                                del jobs[ id ]
274
275                return jobs
276
277        def submitJobData( self, jobs ):
278                """Submit job info list"""
279
280                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( int( self.cur_time ) ) )
281
282                # Now let's spread the knowledge
283                #
284                for jobid, jobattrs in jobs.items():
285
286                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
287
288                        for val in gmetric_val:
289                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
290
291        def makeNodeString( self, nodelist ):
292                """Make one big string of all hosts"""
293
294                node_str = None
295
296                for node in nodelist:
297                        if not node_str:
298                                node_str = node
299                        else:
300                                node_str = node_str + ';' + node
301
302                return node_str
303
304        def compileGmetricVal( self, jobid, jobattrs ):
305                """Create a val string for gmetric of jobinfo"""
306
307                appendList = [ ]
308                appendList.append( 'name=' + jobattrs['name'] )
309                appendList.append( 'queue=' + jobattrs['queue'] )
310                appendList.append( 'owner=' + jobattrs['owner'] )
311                appendList.append( 'requested_time=' + jobattrs['requested_time'] )
312
313                if jobattrs['requested_memory'] != '':
314                        appendList.append( 'requested_memory=' + jobattrs['requested_memory'] )
315
316                if jobattrs['ppn'] != '':
317                        appendList.append( 'ppn=' + jobattrs['ppn'] )
318
319                appendList.append( 'status=' + jobattrs['status'] )
320
321                if jobattrs['start_timestamp'] != '':
322                        appendList.append( 'start_timestamp=' + jobattrs['start_timestamp'] )
323
324                appendList.append( 'reported=' + jobattrs['reported'] )
325                appendList.append( 'poll_interval=' + str( jobattrs['poll_interval'] ) )
326                appendList.append( 'domain=' + jobattrs['domain'] )
327
328                if jobattrs['status'] == 'R':
329                        if len( jobattrs['nodes'] ) > 0:
330                                appendList.append( 'nodes=' + self.makeNodeString( jobattrs['nodes'] ) )
331                elif jobattrs['status'] == 'Q':
332                        appendList.append( 'nodes=' + str(jobattrs['nodes']) )
333
334                return self.makeAppendLists( appendList )
335
336        def makeAppendLists( self, append_list ):
337                """
338                Divide all values from append_list over strings with a maximum
339                size of 1400
340                """
341
342                app_lists = [ ]
343
344                mystr = None
345
346                for val in append_list:
347
348                        if not mystr:
349                                mystr = val
350                        else:
351                                if not self.checkValAppendMaxSize( mystr, val ):
352                                        mystr = mystr + ' ' + val
353                                else:
354                                        # Too big, new appenlist
355                                        app_lists.append( mystr )
356                                        mystr = val
357
358                app_lists.append( mystr )
359
360                return app_lists
361
362        def checkValAppendMaxSize( self, val, text ):
363                """Check if val + text size is not above 1400 (max msg size)"""
364
365                # Max frame size of a udp datagram is 1500 bytes
366                # removing misc header and gmetric stuff leaves about 1400 bytes
367                #
368                if len( val + text ) > 1400:
369                        return 1
370                else:
371                        return 0
372
373        def printJobs( self, jobs ):
374                """Print a jobinfo overview"""
375
376                for name, attrs in self.jobs.items():
377
378                        print 'job %s' %(name)
379
380                        for name, val in attrs.items():
381
382                                print '\t%s = %s' %( name, val )
383
384        def printJob( self, jobs, job_id ):
385                """Print job with job_id from jobs"""
386
387                print 'job %s' %(job_id)
388
389                for name, val in jobs[ job_id ].items():
390
391                        print '\t%s = %s' %( name, val )
392
393        def daemon( self ):
394                """Run as daemon forever"""
395
396                # Fork the first child
397                #
398                pid = os.fork()
399                if pid > 0:
400                        sys.exit(0)  # end parrent
401
402                # creates a session and sets the process group ID
403                #
404                os.setsid()
405
406                # Fork the second child
407                #
408                pid = os.fork()
409                if pid > 0:
410                        sys.exit(0)  # end parrent
411
412                # Go to the root directory and set the umask
413                #
414                os.chdir('/')
415                os.umask(0)
416
417                sys.stdin.close()
418                sys.stdout.close()
419                sys.stderr.close()
420
421                os.open('/dev/null', 0)
422                os.dup(0)
423                os.dup(0)
424
425                self.run()
426
427        def run( self ):
428                """Main thread"""
429
430                while ( 1 ):
431               
432                        self.jobs = self.getJobData( self.jobs )
433                        self.submitJobData( self.jobs )
434                        time.sleep( TORQUE_POLL_INTERVAL )     
435
436def printTime( ):
437        """Print current time/date in human readable format for log/debug"""
438
439        return time.strftime("%a, %d %b %Y %H:%M:%S")
440
441def debug_msg( level, msg ):
442        """Print msg if at or above current debug level"""
443
444        if (DEBUG_LEVEL >= level):
445                        sys.stderr.write( msg + '\n' )
446
447def main():
448        """Application start"""
449
450        gather = PBSDataGatherer()
451        if DAEMONIZE:
452                gather.daemon()
453        else:
454                gather.run()
455
456# w00t someone started me
457#
458if __name__ == '__main__':
459        main()
Note: See TracBrowser for help on using the repository browser.