source: trunk/jobmond/jobmond.py @ 212

Last change on this file since 212 was 212, checked in by bastiaans, 16 years ago

jobmond/jobmond.conf:

  • import of jobmond conf

jobmond/jobmond.py:

  • added config file support
File size: 11.8 KB
Line 
1#!/usr/bin/env python
2
3import sys, getopt, ConfigParser
4
5def processArgs( args ):
6
7        SHORT_L = 'c:'
8        LONG_L = 'config='
9
10        config_filename = None
11
12        try:
13
14                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
15
16        except getopt.error, detail:
17
18                print detail
19                sys.exit(1)
20
21        for opt, value in opts:
22
23                if opt in [ '--config', '-c' ]:
24               
25                        config_filename = value
26
27        if not config_filename:
28
29                config_filename = '/etc/jobmond.conf'
30
31        return loadConfig( config_filename )
32
33def loadConfig( filename ):
34
35        cfg = ConfigParser.ConfigParser()
36
37        cfg.read( filename )
38
39        global DEBUG_LEVEL, DAEMONIZE, TORQUE_SERVER, TORQUE_POLL_INTERVAL, GMOND_CONF, DETECT_TIME_DIFFS
40
41
42        # Specify debugging level here;
43        #
44        # 10 = gemtric cmd's
45        DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
46
47        # Wether or not to run as a daemon in background
48        #
49        DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
50
51        # Which Torque server to monitor
52        #
53        TORQUE_SERVER = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
54
55        # How many seconds interval for polling of jobs
56        #
57        # this will effect directly how accurate the
58        # end time of a job can be determined
59        #
60        TORQUE_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
61
62        # Alternate location of gmond.conf
63        #
64        # Default: /etc/gmond.conf
65        #
66        GMOND_CONF = cfg.get( 'DEFAULT', 'GMOND_CONF' )
67
68        # Wether or not to detect differences in
69        # time from Torque server and local time.
70        #
71        # Ideally both machines (if not the same)
72        # should have the same time (via ntp or whatever)
73        #
74        DETECT_TIME_DIFFS = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
75
76        return True
77
78from PBSQuery import PBSQuery
79
80import time, os, socket, string
81
82class DataProcessor:
83        """Class for processing of data"""
84
85        binary = '/usr/bin/gmetric'
86
87        def __init__( self, binary=None ):
88                """Remember alternate binary location if supplied"""
89
90                if binary:
91                        self.binary = binary
92
93                # Timeout for XML
94                #
95                # From ganglia's documentation:
96                #
97                # 'A metric will be deleted DMAX seconds after it is received, and
98                # DMAX=0 means eternal life.'
99
100                self.dmax = str( int( int( TORQUE_POLL_INTERVAL ) + 2 ) )
101
102                try:
103                        gmond_file = GMOND_CONF
104
105                except NameError:
106                        gmond_file = '/etc/gmond.conf'
107
108                if not os.path.exists( gmond_file ):
109                        debug_msg( 0, gmond_file + ' does not exist' )
110                        sys.exit( 1 )
111
112                incompatible = self.checkGmetricVersion()
113
114                if incompatible:
115                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
116                        sys.exit( 1 )
117
118        def checkGmetricVersion( self ):
119                """
120                Check version of gmetric is at least 3.0.1
121                for the syntax we use
122                """
123
124                for line in os.popen( self.binary + ' --version' ).readlines():
125
126                        line = line.split( ' ' )
127
128                        if len( line ) == 2 and str(line).find( 'gmetric' ) != -1:
129                       
130                                gmetric_version = line[1].split( '\n' )[0]
131
132                                version_major = int( gmetric_version.split( '.' )[0] )
133                                version_minor = int( gmetric_version.split( '.' )[1] )
134                                version_patch = int( gmetric_version.split( '.' )[2] )
135
136                                incompatible = 0
137
138                                if version_major < 3:
139
140                                        incompatible = 1
141                               
142                                elif version_major == 3:
143
144                                        if version_minor == 0:
145
146                                                if version_patch < 1:
147                                               
148                                                        incompatible = 1
149
150                return incompatible
151
152        def multicastGmetric( self, metricname, metricval, valtype='string' ):
153                """Call gmetric binary and multicast"""
154
155                cmd = self.binary
156
157                try:
158                        cmd = cmd + ' -c' + GMOND_CONF
159                except NameError:
160                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
161
162                cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
163
164                debug_msg( 10, printTime() + ' ' + cmd )
165                os.system( cmd )
166
167class DataGatherer:
168
169        jobs = { }
170
171        def __init__( self ):
172                """Setup appropriate variables"""
173
174                self.jobs = { }
175                self.timeoffset = 0
176                self.dp = DataProcessor()
177                self.initPbsQuery()
178
179        def initPbsQuery( self ):
180
181                self.pq = None
182                if( TORQUE_SERVER ):
183                        self.pq = PBSQuery( TORQUE_SERVER )
184                else:
185                        self.pq = PBSQuery()
186
187        def getAttr( self, attrs, name ):
188                """Return certain attribute from dictionary, if exists"""
189
190                if attrs.has_key( name ):
191                        return attrs[name]
192                else:
193                        return ''
194
195        def jobDataChanged( self, jobs, job_id, attrs ):
196                """Check if job with attrs and job_id in jobs has changed"""
197
198                if jobs.has_key( job_id ):
199                        oldData = jobs[ job_id ]       
200                else:
201                        return 1
202
203                for name, val in attrs.items():
204
205                        if oldData.has_key( name ):
206
207                                if oldData[ name ] != attrs[ name ]:
208
209                                        return 1
210
211                        else:
212                                return 1
213
214                return 0
215
216        def getJobData( self, known_jobs ):
217                """Gather all data on current jobs in Torque"""
218
219                if len( known_jobs ) > 0:
220                        jobs = known_jobs
221                else:
222                        jobs = { }
223
224                #self.initPbsQuery()
225       
226                #print self.pq.getnodes()
227       
228                joblist = self.pq.getjobs()
229
230                self.cur_time = time.time()
231
232                jobs_processed = [ ]
233
234                #self.printJobs( joblist )
235
236                for name, attrs in joblist.items():
237
238                        job_id = name.split( '.' )[0]
239
240                        jobs_processed.append( job_id )
241
242                        name = self.getAttr( attrs, 'Job_Name' )
243                        queue = self.getAttr( attrs, 'queue' )
244                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
245                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
246                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
247
248                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
249
250                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
251                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
252                        else:
253                                ppn = ''
254
255                        status = self.getAttr( attrs, 'job_state' )
256
257                        if status == 'R':
258                                start_timestamp = self.getAttr( attrs, 'mtime' )
259                                nodes = self.getAttr( attrs, 'exec_host' ).split( '+' )
260
261                                nodeslist = [ ]
262
263                                for node in nodes:
264                                        host = node.split( '/' )[0]
265
266                                        if nodeslist.count( host ) == 0:
267                                                nodeslist.append( host )
268
269                                if DETECT_TIME_DIFFS:
270
271                                        # If a job start if later than our current date,
272                                        # that must mean the Torque server's time is later
273                                        # than our local time.
274                               
275                                        if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
276
277                                                self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
278
279                        elif status == 'Q':
280                                start_timestamp = ''
281                                count_mynodes = 0
282                                numeric_node = 1
283
284                                for node in mynoderequest.split( '+' ):
285
286                                        nodepart = node.split( ':' )[0]
287
288                                        for letter in nodepart:
289
290                                                if letter not in string.digits:
291
292                                                        numeric_node = 0
293
294                                        if not numeric_node:
295                                                count_mynodes = count_mynodes + 1
296                                        else:
297                                                count_mynodes = count_mynodes + int( nodepart )
298                                               
299                                nodeslist = count_mynodes
300                        else:
301                                start_timestamp = ''
302                                nodeslist = ''
303
304                        myAttrs = { }
305                        myAttrs['name'] = name
306                        myAttrs['queue'] = queue
307                        myAttrs['owner'] = owner
308                        myAttrs['requested_time'] = requested_time
309                        myAttrs['requested_memory'] = requested_memory
310                        myAttrs['ppn'] = ppn
311                        myAttrs['status'] = status
312                        myAttrs['start_timestamp'] = start_timestamp
313                        myAttrs['reported'] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
314                        myAttrs['nodes'] = nodeslist
315                        myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' )
316                        myAttrs['poll_interval'] = TORQUE_POLL_INTERVAL
317
318                        if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
319                                jobs[ job_id ] = myAttrs
320
321                                #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
322
323                for id, attrs in jobs.items():
324
325                        if id not in jobs_processed:
326
327                                # This one isn't there anymore; toedeledoki!
328                                #
329                                del jobs[ id ]
330
331                return jobs
332
333        def submitJobData( self, jobs ):
334                """Submit job info list"""
335
336                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
337
338                # Now let's spread the knowledge
339                #
340                for jobid, jobattrs in jobs.items():
341
342                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
343
344                        for val in gmetric_val:
345                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
346
347        def makeNodeString( self, nodelist ):
348                """Make one big string of all hosts"""
349
350                node_str = None
351
352                for node in nodelist:
353                        if not node_str:
354                                node_str = node
355                        else:
356                                node_str = node_str + ';' + node
357
358                return node_str
359
360        def compileGmetricVal( self, jobid, jobattrs ):
361                """Create a val string for gmetric of jobinfo"""
362
363                appendList = [ ]
364                appendList.append( 'name=' + jobattrs['name'] )
365                appendList.append( 'queue=' + jobattrs['queue'] )
366                appendList.append( 'owner=' + jobattrs['owner'] )
367                appendList.append( 'requested_time=' + jobattrs['requested_time'] )
368
369                if jobattrs['requested_memory'] != '':
370                        appendList.append( 'requested_memory=' + jobattrs['requested_memory'] )
371
372                if jobattrs['ppn'] != '':
373                        appendList.append( 'ppn=' + jobattrs['ppn'] )
374
375                appendList.append( 'status=' + jobattrs['status'] )
376
377                if jobattrs['start_timestamp'] != '':
378                        appendList.append( 'start_timestamp=' + jobattrs['start_timestamp'] )
379
380                appendList.append( 'reported=' + jobattrs['reported'] )
381                appendList.append( 'poll_interval=' + str( jobattrs['poll_interval'] ) )
382                appendList.append( 'domain=' + jobattrs['domain'] )
383
384                if jobattrs['status'] == 'R':
385                        if len( jobattrs['nodes'] ) > 0:
386                                appendList.append( 'nodes=' + self.makeNodeString( jobattrs['nodes'] ) )
387                elif jobattrs['status'] == 'Q':
388                        appendList.append( 'nodes=' + str(jobattrs['nodes']) )
389
390                return self.makeAppendLists( appendList )
391
392        def makeAppendLists( self, append_list ):
393                """
394                Divide all values from append_list over strings with a maximum
395                size of 1400
396                """
397
398                app_lists = [ ]
399
400                mystr = None
401
402                for val in append_list:
403
404                        if not mystr:
405                                mystr = val
406                        else:
407                                if not self.checkValAppendMaxSize( mystr, val ):
408                                        mystr = mystr + ' ' + val
409                                else:
410                                        # Too big, new appenlist
411                                        app_lists.append( mystr )
412                                        mystr = val
413
414                app_lists.append( mystr )
415
416                return app_lists
417
418        def checkValAppendMaxSize( self, val, text ):
419                """Check if val + text size is not above 1400 (max msg size)"""
420
421                # Max frame size of a udp datagram is 1500 bytes
422                # removing misc header and gmetric stuff leaves about 1400 bytes
423                #
424                if len( val + text ) > 1400:
425                        return 1
426                else:
427                        return 0
428
429        def printJobs( self, jobs ):
430                """Print a jobinfo overview"""
431
432                for name, attrs in self.jobs.items():
433
434                        print 'job %s' %(name)
435
436                        for name, val in attrs.items():
437
438                                print '\t%s = %s' %( name, val )
439
440        def printJob( self, jobs, job_id ):
441                """Print job with job_id from jobs"""
442
443                print 'job %s' %(job_id)
444
445                for name, val in jobs[ job_id ].items():
446
447                        print '\t%s = %s' %( name, val )
448
449        def daemon( self ):
450                """Run as daemon forever"""
451
452                # Fork the first child
453                #
454                pid = os.fork()
455                if pid > 0:
456                        sys.exit(0)  # end parent
457
458                # creates a session and sets the process group ID
459                #
460                os.setsid()
461
462                # Fork the second child
463                #
464                pid = os.fork()
465                if pid > 0:
466                        sys.exit(0)  # end parent
467
468                # Go to the root directory and set the umask
469                #
470                os.chdir('/')
471                os.umask(0)
472
473                sys.stdin.close()
474                sys.stdout.close()
475                sys.stderr.close()
476
477                os.open('/dev/null', 0)
478                os.dup(0)
479                os.dup(0)
480
481                self.run()
482
483        def run( self ):
484                """Main thread"""
485
486                while ( 1 ):
487               
488                        self.jobs = self.getJobData( self.jobs )
489                        self.submitJobData( self.jobs )
490                        time.sleep( TORQUE_POLL_INTERVAL )     
491
492def printTime( ):
493        """Print current time/date in human readable format for log/debug"""
494
495        return time.strftime("%a, %d %b %Y %H:%M:%S")
496
497def debug_msg( level, msg ):
498        """Print msg if at or above current debug level"""
499
500        if (DEBUG_LEVEL >= level):
501                        sys.stderr.write( msg + '\n' )
502
503def main():
504        """Application start"""
505
506        if not processArgs( sys.argv[1:] ):
507                sys.exit( 1 )
508
509        gather = DataGatherer()
510        if DAEMONIZE:
511                gather.daemon()
512        else:
513                gather.run()
514
515# w00t someone started me
516#
517if __name__ == '__main__':
518        main()
Note: See TracBrowser for help on using the repository browser.