source: trunk/jobmond/jobmond.py @ 667

Last change on this file since 667 was 667, checked in by ramonb, 9 years ago

jobmond.py:

  • added lots of configparsing debug prints
  • fix to comment parsing when /* and */ on same line
  • ignore /* for lines starting with include: can be wildcard
  • fix to section { and } parsing: only close section when {}count hits 0
  • fix to include file parsing: should be string not list
  • include line should be split, not strip
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 35.7 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2012  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobmond.py 667 2012-09-14 12:45:50Z ramonb $
22#
23
24import sys, getopt, ConfigParser, time, os, socket, string, re
25import xdrlib, socket, syslog, xml, xml.sax
26from types import *
27from glob import glob
28
29VERSION='TRUNK+SVN'
30
31def usage( ver ):
32
33    print 'jobmond %s' %VERSION
34
35    if ver:
36        return 0
37
38    print
39    print 'Purpose:'
40    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
41    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
42    print
43    print 'Usage:   jobmond [OPTIONS]'
44    print
45    print '  -c, --config=FILE  The configuration file to use (default: /etc/jobmond.conf)'
46    print '  -p, --pidfile=FILE Use pid file to store the process id'
47    print '  -h, --help     Print help and exit'
48    print '  -v, --version          Print version and exit'
49    print
50
51def processArgs( args ):
52
53    SHORT_L     = 'p:hvc:'
54    LONG_L      = [ 'help', 'config=', 'pidfile=', 'version' ]
55
56    global PIDFILE
57    PIDFILE     = None
58
59    config_filename = '/etc/jobmond.conf'
60
61    try:
62
63        opts, args  = getopt.getopt( args, SHORT_L, LONG_L )
64
65    except getopt.GetoptError, detail:
66
67        print detail
68        usage()
69        sys.exit( 1 )
70
71    for opt, value in opts:
72
73        if opt in [ '--config', '-c' ]:
74       
75            config_filename = value
76
77        if opt in [ '--pidfile', '-p' ]:
78
79            PIDFILE     = value
80       
81        if opt in [ '--help', '-h' ]:
82 
83            usage( False )
84            sys.exit( 0 )
85
86        if opt in [ '--version', '-v' ]:
87
88            usage( True )
89            sys.exit( 0 )
90
91    return loadConfig( config_filename )
92
93class GangliaConfigParser:
94
95    def __init__( self, config_file ):
96
97        self.config_file     = config_file
98        self.include_parsers = [ ]
99
100        if not os.path.exists( self.config_file ):
101
102            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
103            sys.exit( 1 )
104
105    def removeHaakjes( self, value ):
106
107        clean_value = value
108        clean_value = clean_value.replace( "(", "" )
109        clean_value = clean_value.replace( ")", "" )
110        clean_value = clean_value.strip()
111
112        return clean_value
113
114    def removeQuotes( self, value ):
115
116        clean_value = value
117        clean_value = clean_value.replace( "'", "" )
118        clean_value = clean_value.replace( '"', '' )
119        clean_value = clean_value.strip()
120
121        return clean_value
122
123    def removeComments( self, value ):
124
125        clean_value = value
126
127        if clean_value.find('#') != -1:
128
129            clean_value = value[:value.find('#')]
130
131        if clean_value.find('//') != -1:
132
133            clean_value = value[:value.find('//')]
134
135        return clean_value
136
137    def getVal( self, section, valname ):
138
139        debug_msg( 10, "Parsing: '" + self.config_file + "' - searching for section: " + section + " value: " + valname )
140
141        cfg_fp      = open( self.config_file )
142        cfg_lines   = cfg_fp.readlines()
143        cfg_fp.close()
144
145        section_start = False
146        section_found = False
147        include_start = False
148        value         = None
149        comment_start = False
150
151        acc_count     = 0
152
153        for line in cfg_lines:
154
155            line = line.strip()
156            debug_msg( 10, "line org: " + line )
157            line = self.removeComments( line )
158            debug_msg( 10, "line remove comments: " + line )
159
160            if line.find( '/*' ) != -1 and line.find( 'include', 0, 7 ):
161
162                debug_msg( 10, "line comment start */: " + line )
163
164                if line.find( '*/' ) != -1:
165
166                    begin_line = line[:line.find('/*')]
167                    rest_line = line[(line.find('*/')+2):]
168
169                    line = begin_line + rest_line
170
171                    debug_msg( 10, "line comment end */: " + line )
172
173                else:
174
175                    line = line[:line.find('/*')]
176                    comment_start = True
177
178                    debug_msg( 10, "line find /*: " + line )
179
180                debug_msg( 10, "line find /*: " + line )
181
182            if line.find( '*/' ) != -1 and comment_start:
183
184                line = line[(line.find('*/')+2):]
185                comment_start = False
186                debug_msg( 10, "line comment end*/: " + line )
187
188            if comment_start:
189
190                debug_msg( 10, "line ignoring comment: " + line )
191                continue
192
193            if line.find( 'include' ) != -1:
194
195                debug_msg( 10, "line include: " + line )
196                line = self.removeHaakjes( line )
197                line = self.removeQuotes( line )
198                debug_msg( 10, "line removeHaakjes: " + line )
199
200                include_line  = line.split(' ')[1]
201
202                debug_msg( 10, "line includes: " + str( include_line ) )
203
204                for include_file in glob( include_line ):
205
206                    debug_msg( 10, "include file found: '" + include_file + "'" )
207
208                    include_parser = GangliaConfigParser( include_file )
209                    include_getval = include_parser.getStr( section, valname )
210
211                    if include_getval:
212
213                        value = include_getval
214                        #break ? FIXME value can be overriden by other includes: perhaps users problem
215
216                        debug_msg( 10, "VALUE found: '" + value + "'" )
217
218                    del include_parser
219
220            if line.find( section ) != -1:
221
222                section_found   = True
223
224                debug_msg( 10, "section start: '" + section + "'" )
225
226            if line.find( '{' ) != -1 and section_found:
227
228                section_start   = True
229                acc_count       = acc_count + 1
230
231            if line.find( '}' ) != -1 and section_found:
232
233                acc_count       = acc_count - 1
234
235                if acc_count == 0:
236
237                    debug_msg( 10, "section closed: '" + section + "'" )
238                    section_start   = False
239                    section_found   = False
240
241            if line.find( valname ) != -1 and section_start:
242
243                value       = string.join( line.split( '=' )[1:], '' ).strip()
244
245        debug_msg( 10, "Parsing done: '" + self.config_file + "' " )
246
247        return value
248
249    def getInt( self, section, valname ):
250
251        value   = self.getVal( section, valname )
252
253        if not value:
254            return False
255
256        value   = self.removeQuotes( value )
257
258        return int( value )
259
260    def getStr( self, section, valname ):
261
262        value   = self.getVal( section, valname )
263
264        if not value:
265            return False
266
267        value   = self.removeQuotes( value )
268
269        return str( value )
270
271def findGmetric():
272
273    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
274
275        guess   = '%s/%s' %( dir, 'gmetric' )
276
277        if os.path.exists( guess ):
278
279            return guess
280
281    return False
282
283def loadConfig( filename ):
284
285    def getlist( cfg_string ):
286
287        my_list = [ ]
288
289        for item_txt in cfg_string.split( ',' ):
290
291                sep_char = None
292
293                item_txt = item_txt.strip()
294
295                for s_char in [ "'", '"' ]:
296
297                        if item_txt.find( s_char ) != -1:
298
299                                if item_txt.count( s_char ) != 2:
300
301                                        print 'Missing quote: %s' %item_txt
302                                        sys.exit( 1 )
303
304                                else:
305
306                                        sep_char = s_char
307                                        break
308
309                if sep_char:
310
311                        item_txt = item_txt.split( sep_char )[1]
312
313                my_list.append( item_txt )
314
315        return my_list
316
317    cfg     = ConfigParser.ConfigParser()
318
319    cfg.read( filename )
320
321    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
322    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
323    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
324    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
325
326    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
327
328    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
329
330    SYSLOG_LEVEL    = -1
331    SYSLOG_FACILITY = None
332
333    try:
334        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
335
336    except ConfigParser.NoOptionError:
337
338        USE_SYSLOG  = True
339
340        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
341
342    if USE_SYSLOG:
343
344        try:
345            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
346
347        except ConfigParser.NoOptionError:
348
349            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
350            SYSLOG_LEVEL    = 0
351
352        try:
353
354            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
355
356        except ConfigParser.NoOptionError:
357
358            SYSLOG_FACILITY = syslog.LOG_DAEMON
359
360            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
361
362    try:
363
364        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
365
366    except ConfigParser.NoOptionError:
367
368        # Backwards compatibility for old configs
369        #
370
371        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
372        api_guess       = 'pbs'
373   
374    try:
375   
376        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
377
378    except ConfigParser.NoOptionError:
379
380        # Backwards compatibility for old configs
381        #
382
383        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
384        api_guess       = 'pbs'
385   
386    try:
387
388        GMOND_CONF      = cfg.get( 'DEFAULT', 'GMOND_CONF' )
389
390    except ConfigParser.NoOptionError:
391
392        # Not specified: assume /etc/ganglia/gmond.conf
393        #
394        GMOND_CONF      = '/etc/ganglia/gmond.conf'
395
396    ganglia_cfg     = GangliaConfigParser( GMOND_CONF )
397
398    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
399    #
400    gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
401
402    if not gmetric_dest_ip:
403
404        # Maybe unicast target then
405        #
406        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
407
408        gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
409
410    if gmetric_dest_ip and gmetric_dest_port:
411
412        GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
413    else:
414
415        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
416
417        # Couldn't figure it out: let's see if it's in our jobmond.conf
418        #
419        try:
420
421            GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
422
423        # Guess not: now just give up
424        #
425        except ConfigParser.NoOptionError:
426
427            GMETRIC_TARGET  = None
428
429            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
430
431    gmetric_bin = findGmetric()
432
433    if gmetric_bin:
434
435        GMETRIC_BINARY      = gmetric_bin
436    else:
437        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
438
439        try:
440
441            GMETRIC_BINARY      = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
442
443        except ConfigParser.NoOptionError:
444
445            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
446            sys.exit( 1 )
447
448    DETECT_TIME_DIFFS   = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
449
450    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
451
452    try:
453
454        BATCH_API   = cfg.get( 'DEFAULT', 'BATCH_API' )
455
456    except ConfigParser.NoOptionError, detail:
457
458        if BATCH_SERVER and api_guess:
459
460            BATCH_API   = api_guess
461        else:
462            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
463            sys.exit( 1 )
464
465    try:
466
467        QUEUE       = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
468
469    except ConfigParser.NoOptionError, detail:
470
471        QUEUE       = None
472
473    return True
474
475def fqdn_parts (fqdn):
476
477    """Return pair of host and domain for fully-qualified domain name arg."""
478
479    parts = fqdn.split (".")
480
481    return (parts[0], string.join(parts[1:], "."))
482
483METRIC_MAX_VAL_LEN = 900
484
485class DataProcessor:
486
487    """Class for processing of data"""
488
489    binary = None
490
491    def __init__( self, binary=None ):
492
493        """Remember alternate binary location if supplied"""
494
495        global GMETRIC_BINARY
496
497        if binary:
498            self.binary = binary
499
500        if not self.binary:
501            self.binary = GMETRIC_BINARY
502
503        # Timeout for XML
504        #
505        # From ganglia's documentation:
506        #
507        # 'A metric will be deleted DMAX seconds after it is received, and
508            # DMAX=0 means eternal life.'
509
510        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
511
512        if GMOND_CONF:
513
514            incompatible = self.checkGmetricVersion()
515
516            if incompatible:
517
518                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
519                sys.exit( 1 )
520
521    def checkGmetricVersion( self ):
522
523        """
524        Check version of gmetric is at least 3.0.1
525        for the syntax we use
526        """
527
528        global METRIC_MAX_VAL_LEN
529
530        incompatible    = 0
531
532        gfp     = os.popen( self.binary + ' --version' )
533        lines       = gfp.readlines()
534
535        gfp.close()
536
537        for line in lines:
538
539            line = line.split( ' ' )
540
541            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
542           
543                gmetric_version = line[1].split( '\n' )[0]
544
545                version_major   = int( gmetric_version.split( '.' )[0] )
546                version_minor   = int( gmetric_version.split( '.' )[1] )
547                version_patch   = int( gmetric_version.split( '.' )[2] )
548
549                incompatible    = 0
550
551                if version_major < 3:
552
553                    incompatible = 1
554               
555                elif version_major == 3:
556
557                    if version_minor == 0:
558
559                        if version_patch < 1:
560                       
561                            incompatible = 1
562
563                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
564                        #
565                        if version_patch < 3:
566
567                            METRIC_MAX_VAL_LEN = 900
568
569                        elif version_patch >= 3:
570
571                            METRIC_MAX_VAL_LEN = 1400
572
573        return incompatible
574
575    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
576
577        """Call gmetric binary and multicast"""
578
579        cmd = self.binary
580
581        if GMETRIC_TARGET:
582
583            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
584            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]
585
586            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
587
588            debug_msg( 10, printTime() + ' ' + metric_debug)
589
590            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
591
592            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
593
594        else:
595            try:
596                cmd = cmd + ' -c' + GMOND_CONF
597
598            except NameError:
599
600                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
601
602            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
603
604            if len( units ) > 0:
605
606                cmd = cmd + ' -u"' + units + '"'
607
608            debug_msg( 10, printTime() + ' ' + cmd )
609
610            os.system( cmd )
611
612class DataGatherer:
613
614    """Skeleton class for batch system DataGatherer"""
615
616    def printJobs( self, jobs ):
617
618        """Print a jobinfo overview"""
619
620        for name, attrs in self.jobs.items():
621
622            print 'job %s' %(name)
623
624            for name, val in attrs.items():
625
626                print '\t%s = %s' %( name, val )
627
628    def printJob( self, jobs, job_id ):
629
630        """Print job with job_id from jobs"""
631
632        print 'job %s' %(job_id)
633
634        for name, val in jobs[ job_id ].items():
635
636            print '\t%s = %s' %( name, val )
637
638    def getAttr( self, d, name ):
639
640        """Return certain attribute from dictionary, if exists"""
641
642        if d.has_key( name ):
643
644            if type( d[ name ] ) == ListType:
645
646                return string.join( d[ name ], ' ' )
647
648            return d[ name ]
649       
650        return ''
651
652    def jobDataChanged( self, jobs, job_id, attrs ):
653
654        """Check if job with attrs and job_id in jobs has changed"""
655
656        if jobs.has_key( job_id ):
657
658            oldData = jobs[ job_id ]   
659        else:
660            return 1
661
662        for name, val in attrs.items():
663
664            if oldData.has_key( name ):
665
666                if oldData[ name ] != attrs[ name ]:
667
668                    return 1
669
670            else:
671                return 1
672
673        return 0
674
675    def submitJobData( self ):
676
677        """Submit job info list"""
678
679        global BATCH_API
680
681        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
682
683        running_jobs    = 0
684        queued_jobs = 0
685
686        # Count how many running/queued jobs we found
687                #
688        for jobid, jobattrs in self.jobs.items():
689
690            if jobattrs[ 'status' ] == 'Q':
691
692                queued_jobs += 1
693
694            elif jobattrs[ 'status' ] == 'R':
695
696                running_jobs += 1
697
698        # Report running/queued jobs as seperate metric for a nice RRD graph
699                #
700        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
701        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
702
703        # Report down/offline nodes in batch (PBS only ATM)
704        #
705        if BATCH_API == 'pbs':
706
707            domain      = fqdn_parts( socket.getfqdn() )[1]
708
709            downed_nodes    = list()
710            offline_nodes   = list()
711       
712            l       = ['state']
713       
714            for name, node in self.pq.getnodes().items():
715
716                if 'down' in node[ 'state' ]:
717
718                    downed_nodes.append( name )
719
720                if 'offline' in node[ 'state' ]:
721
722                    offline_nodes.append( name )
723
724            downnodeslist       = do_nodelist( downed_nodes )
725            offlinenodeslist    = do_nodelist( offline_nodes )
726
727            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
728            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
729            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
730            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
731
732        # Now let's spread the knowledge
733        #
734        for jobid, jobattrs in self.jobs.items():
735
736            # Make gmetric values for each job: respect max gmetric value length
737            #
738            gmetrics     = self.compileGmetricVal( jobid, jobattrs )
739
740            for g_name, g_val in gmetrics.items():
741
742                self.dp.multicastGmetric( g_name, g_val )
743
744    def compileGmetricVal( self, jobid, jobattrs ):
745
746        """Create gmetric name/value pairs of jobinfo"""
747
748        gmetrics = { }
749
750        for val_name, val_value in jobattrs.items():
751
752            gmetric_sequence = 0
753
754            if len( val_value ) > METRIC_MAX_VAL_LEN:
755
756                while len( val_value ) > METRIC_MAX_VAL_LEN:
757
758                    gmetric_value   = val_value[:METRIC_MAX_VAL_LEN]
759                    val_value       = val_value[METRIC_MAX_VAL_LEN:]
760
761                    gmetric_name    = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
762
763                    gmetrics[ gmetric_name ] = gmetric_value
764
765                    gmetric_sequence = gmetric_sequence + 1
766            else:
767                gmetric_value   = val_value
768
769                gmetric_name    = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
770
771                gmetrics[ gmetric_name ] = gmetric_value
772
773        return gmetrics
774
775    def daemon( self ):
776
777        """Run as daemon forever"""
778
779        # Fork the first child
780        #
781        pid = os.fork()
782        if pid > 0:
783                sys.exit(0)  # end parent
784
785        # creates a session and sets the process group ID
786        #
787        os.setsid()
788
789        # Fork the second child
790        #
791        pid = os.fork()
792        if pid > 0:
793                sys.exit(0)  # end parent
794
795        write_pidfile()
796
797        # Go to the root directory and set the umask
798        #
799        os.chdir('/')
800        os.umask(0)
801
802        sys.stdin.close()
803        sys.stdout.close()
804        sys.stderr.close()
805
806        os.open('/dev/null', os.O_RDWR)
807        os.dup2(0, 1)
808        os.dup2(0, 2)
809
810        self.run()
811
812    def run( self ):
813
814        """Main thread"""
815
816        while ( 1 ):
817       
818            self.getJobData()
819            self.submitJobData()
820            time.sleep( BATCH_POLL_INTERVAL )   
821
822# Abstracted from PBS original.
823#
824def do_nodelist( nodes ):
825
826    """Translate node list as appropriate."""
827
828    nodeslist       = [ ]
829    my_domain       = fqdn_parts( socket.getfqdn() )[1]
830
831    for node in nodes:
832
833        host        = node.split( '/' )[0] # not relevant for SGE
834        h, host_domain  = fqdn_parts(host)
835
836        if host_domain == my_domain:
837
838            host    = h
839
840        if nodeslist.count( host ) == 0:
841
842            for translate_pattern in BATCH_HOST_TRANSLATE:
843
844                if translate_pattern.find( '/' ) != -1:
845
846                    translate_orig  = \
847                        translate_pattern.split( '/' )[1]
848                    translate_new   = \
849                        translate_pattern.split( '/' )[2]
850                    host = re.sub( translate_orig,
851                               translate_new, host )
852            if not host in nodeslist:
853                nodeslist.append( host )
854    return nodeslist
855
856class PbsDataGatherer( DataGatherer ):
857
858    """This is the DataGatherer for PBS and Torque"""
859
860    global PBSQuery, PBSError
861
862    def __init__( self ):
863
864        """Setup appropriate variables"""
865
866        self.jobs   = { }
867        self.timeoffset = 0
868        self.dp     = DataProcessor()
869
870        self.initPbsQuery()
871
872    def initPbsQuery( self ):
873
874        self.pq     = None
875
876        if( BATCH_SERVER ):
877
878            self.pq     = PBSQuery( BATCH_SERVER )
879        else:
880            self.pq     = PBSQuery()
881
882    def getJobData( self ):
883
884        """Gather all data on current jobs in Torque"""
885
886        joblist     = {}
887        self.cur_time   = 0
888
889        try:
890            joblist     = self.pq.getjobs()
891            self.cur_time   = time.time()
892
893        except PBSError, detail:
894
895            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
896            return None
897
898        jobs_processed  = [ ]
899
900        for name, attrs in joblist.items():
901            display_queue       = 1
902            job_id          = name.split( '.' )[0]
903
904            owner           = self.getAttr( attrs, 'Job_Owner' )
905            name            = self.getAttr( attrs, 'Job_Name' )
906            queue           = self.getAttr( attrs, 'queue' )
907            nodect          = self.getAttr( attrs['Resource_List'], 'nodect' )
908
909            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
910            requested_memory    = self.getAttr( attrs['Resource_List'], 'mem' )
911
912
913            requested_nodes     = ''
914            mynoderequest       = self.getAttr( attrs['Resource_List'], 'nodes' )
915
916            ppn         = ''
917            attributes  = ''
918
919            if mynoderequest.find( ':' ) != -1:
920
921                mynoderequest_fields    = mynoderequest.split( ':' )
922
923                for mynoderequest_field in mynoderequest_fields:
924
925                    if mynoderequest_field.isdigit():
926
927                        continue #TODO add requested_nodes if is hostname(s)
928
929                    if mynoderequest_field.find( 'ppn' ) != -1:
930
931                        ppn = mynoderequest_field.split( 'ppn=' )[1]
932
933                    else:
934
935                        if attributes == '':
936
937                            attributes = '%s' %mynoderequest_field
938                        else:
939                            attributes = '%s:%s' %( attributes, mynoderequest_field )
940
941            status          = self.getAttr( attrs, 'job_state' )
942
943            if status in [ 'Q', 'R', 'W' ]:
944
945                jobs_processed.append( job_id )
946
947            create_timestamp    = self.getAttr( attrs, 'ctime' )
948            running_nodes       = ''
949            exec_nodestr        = '' 
950
951            if status == 'R':
952
953                #start_timestamp     = self.getAttr( attrs, 'etime' )
954                start_timestamp     = self.getAttr( attrs, 'start_time' )
955                exec_nodestr        = self.getAttr( attrs, 'exec_host' )
956
957                nodes           = exec_nodestr.split( '+' )
958                nodeslist       = do_nodelist( nodes )
959                running_nodes   = string.join( nodeslist, ' ' )
960
961                if DETECT_TIME_DIFFS:
962
963                    # If a job start if later than our current date,
964                    # that must mean the Torque server's time is later
965                    # than our local time.
966               
967                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
968
969                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
970
971            elif status == 'Q':
972
973                # 'mynodequest' can be a string in the following syntax according to the
974                # Torque Administator's manual:
975                #
976                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
977                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
978                # etc
979                #
980
981                #
982                # For now we only count the amount of nodes request and ignore properties
983                #
984
985                start_timestamp     = ''
986                count_mynodes       = 0
987
988                queued_timestamp    = self.getAttr( attrs, 'qtime' )
989
990                for node in mynoderequest.split( '+' ):
991
992                    # Just grab the {node_count|hostname} part and ignore properties
993                    #
994                    nodepart    = node.split( ':' )[0]
995
996                    # Let's assume a node_count value
997                    #
998                    numeric_node    = 1
999
1000                    # Chop the value up into characters
1001                    #
1002                    for letter in nodepart:
1003
1004                        # If this char is not a digit (0-9), this must be a hostname
1005                        #
1006                        if letter not in string.digits:
1007
1008                            numeric_node    = 0
1009
1010                    # If this is a hostname, just count this as one (1) node
1011                    #
1012                    if not numeric_node:
1013
1014                        count_mynodes   = count_mynodes + 1
1015                    else:
1016
1017                        # If this a number, it must be the node_count
1018                        # and increase our count with it's value
1019                        #
1020                        try:
1021                            count_mynodes   = count_mynodes + int( nodepart )
1022
1023                        except ValueError, detail:
1024
1025                            # When we arrive here I must be bugged or very confused
1026                            # THIS SHOULD NOT HAPPEN!
1027                            #
1028                            debug_msg( 10, str( detail ) )
1029                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1030                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1031                            debug_msg( 10, 'job = ' + str( name ) )
1032                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1033                       
1034                nodeslist   = str( count_mynodes )
1035            else:
1036                start_timestamp = ''
1037                nodeslist   = ''
1038
1039            myAttrs             = { }
1040
1041            myAttrs[ 'name' ]              = str( name )
1042            myAttrs[ 'status' ]            = str( status )
1043            myAttrs[ 'queue' ]             = str( queue )
1044            myAttrs[ 'owner' ]             = str( owner )
1045            myAttrs[ 'nodect' ]            = str( nodect )
1046            myAttrs[ 'exec.hostnames' ]    = str( running_nodes )
1047            myAttrs[ 'exec.nodestr' ]      = str( exec_nodestr )
1048            myAttrs[ 'req.walltime' ]      = str( requested_time )
1049            myAttrs[ 'req.memory' ]        = str( requested_memory )
1050            myAttrs[ 'req.nodes' ]         = str( requested_nodes )
1051            myAttrs[ 'req.ppn' ]           = str( ppn )
1052            myAttrs[ 'req.attributes' ]    = str( attributes )
1053            myAttrs[ 'timestamp.running' ] = str( start_timestamp )
1054            myAttrs[ 'timestamp.created' ] = str( create_timestamp )
1055            myAttrs[ 'timestamp.queued' ]  = str( queued_timestamp )
1056
1057            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q', 'W' ]:
1058
1059                self.jobs[ job_id ] = myAttrs
1060
1061        for id, attrs in self.jobs.items():
1062
1063            if id not in jobs_processed:
1064
1065                # This one isn't there anymore; toedeledoki!
1066                #
1067                del self.jobs[ id ]
1068
1069GMETRIC_DEFAULT_TYPE    = 'string'
1070GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1071GMETRIC_DEFAULT_PORT    = '8649'
1072GMETRIC_DEFAULT_UNITS   = ''
1073
1074class Gmetric:
1075
1076    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1077
1078    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1079    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1080    protocol        = ( 'udp', 'multicast' )
1081
1082    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1083               
1084        global GMETRIC_DEFAULT_TYPE
1085
1086        self.prot       = self.checkHostProtocol( host )
1087        self.data_msg   = xdrlib.Packer()
1088        self.meta_msg   = xdrlib.Packer()
1089        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1090
1091        if self.prot not in self.protocol:
1092
1093            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1094
1095        if self.prot == 'multicast':
1096
1097            # Set multicast options
1098            #
1099            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1100
1101        self.hostport   = ( host, int( port ) )
1102        self.slopestr   = 'both'
1103        self.tmax       = 60
1104
1105    def checkHostProtocol( self, ip ):
1106
1107        """Detect if a ip adress is a multicast address"""
1108
1109        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1110        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
1111
1112        ip_fields               = ip.split( '.' )
1113
1114        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1115
1116            return 'multicast'
1117        else:
1118            return 'udp'
1119
1120    def send( self, name, value, dmax, typestr = '', units = '' ):
1121
1122        if len( units ) == 0:
1123            units       = GMETRIC_DEFAULT_UNITS
1124
1125        if len( typestr ) == 0:
1126            typestr     = GMETRIC_DEFAULT_TYPE
1127
1128        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
1129
1130        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1131        data_rt = self.socket.sendto( data_msg, self.hostport )
1132
1133        return ( meta_rt, data_rt )
1134
1135    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
1136
1137        hostname = "unset"
1138
1139        if slopestr not in self.slope:
1140
1141            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
1142
1143        if typestr not in self.type:
1144
1145            raise ValueError( "Type must be one of: " + str( self.type ) )
1146
1147        if len( name ) == 0:
1148
1149            raise ValueError( "Name must be non-empty" )
1150
1151        self.meta_msg.reset()
1152        self.meta_msg.pack_int( 128 )
1153
1154        if not spoof:
1155            self.meta_msg.pack_string( hostname )
1156        else:
1157            self.meta_msg.pack_string( spoof )
1158
1159        self.meta_msg.pack_string( name )
1160
1161        if not spoof:
1162            self.meta_msg.pack_int( 0 )
1163        else:
1164            self.meta_msg.pack_int( 1 )
1165           
1166        self.meta_msg.pack_string( typestr )
1167        self.meta_msg.pack_string( name )
1168        self.meta_msg.pack_string( unitstr )
1169        self.meta_msg.pack_int( self.slope[ slopestr ] )
1170        self.meta_msg.pack_uint( int( tmax ) )
1171        self.meta_msg.pack_uint( int( dmax ) )
1172
1173        if not group:
1174            self.meta_msg.pack_int( 0 )
1175        else:
1176            self.meta_msg.pack_int( 1 )
1177            self.meta_msg.pack_string( "GROUP" )
1178            self.meta_msg.pack_string( group )
1179
1180        self.data_msg.reset()
1181        self.data_msg.pack_int( 128+5 )
1182
1183        if not spoof:
1184            self.data_msg.pack_string( hostname )
1185        else:
1186            self.data_msg.pack_string( spoof )
1187
1188        self.data_msg.pack_string( name )
1189
1190        if not spoof:
1191            self.data_msg.pack_int( 0 )
1192        else:
1193            self.data_msg.pack_int( 1 )
1194
1195        self.data_msg.pack_string( "%s" )
1196        self.data_msg.pack_string( str( value ) )
1197
1198        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
1199
1200def printTime( ):
1201
1202    """Print current time/date in human readable format for log/debug"""
1203
1204    return time.strftime("%a, %d %b %Y %H:%M:%S")
1205
1206def debug_msg( level, msg ):
1207
1208    """Print msg if at or above current debug level"""
1209
1210    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
1211
1212    if (not DAEMONIZE and DEBUG_LEVEL >= level):
1213        sys.stderr.write( msg + '\n' )
1214
1215    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1216        syslog.syslog( msg )
1217
1218def write_pidfile():
1219
1220    # Write pidfile if PIDFILE is set
1221    #
1222    if PIDFILE:
1223
1224        pid = os.getpid()
1225
1226        pidfile = open( PIDFILE, 'w' )
1227
1228        pidfile.write( str( pid ) )
1229        pidfile.close()
1230
1231def main():
1232
1233    """Application start"""
1234
1235    global PBSQuery, PBSError, lsfObject
1236    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
1237
1238    if not processArgs( sys.argv[1:] ):
1239
1240        sys.exit( 1 )
1241
1242    # Load appropriate DataGatherer depending on which BATCH_API is set
1243    # and any required modules for the Gatherer
1244    #
1245    if BATCH_API == 'pbs':
1246
1247        try:
1248            from PBSQuery import PBSQuery, PBSError
1249
1250        except ImportError:
1251
1252            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
1253            sys.exit( 1 )
1254
1255        gather = PbsDataGatherer()
1256
1257    elif BATCH_API == 'sge':
1258
1259        # Tested with SGE 6.0u11.
1260        #
1261        gather = SgeDataGatherer()
1262
1263    elif BATCH_API == 'lsf':
1264
1265        try:
1266            from lsfObject import lsfObject
1267        except:
1268            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
1269            sys.exit( 1)
1270
1271        gather = LsfDataGatherer()
1272
1273    else:
1274        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
1275
1276        sys.exit( 1 )
1277
1278    if( DAEMONIZE and USE_SYSLOG ):
1279
1280        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1281
1282    if DAEMONIZE:
1283
1284        gather.daemon()
1285    else:
1286        gather.run()
1287
1288# wh00t? someone started me! :)
1289#
1290if __name__ == '__main__':
1291    main()
Note: See TracBrowser for help on using the repository browser.