source: trunk/jobmond/jobmond.py @ 980

Last change on this file since 980 was 667, checked in by ramonb, 12 years ago

jobmond.py:

  • added lots of configparsing debug prints
  • fix to comment parsing when /* and */ on same line
  • ignore /* for lines starting with include: can be wildcard
  • fix to section { and } parsing: only close section when {}count hits 0
  • fix to include file parsing: should be string not list
  • include line should be split, not strip
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 35.7 KB
RevLine 
[23]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[663]5# Copyright (C) 2006-2012  Ramon Bastiaans
[225]6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[228]21# SVN $Id: jobmond.py 667 2012-09-14 12:45:50Z ramonb $
[227]22#
[23]23
[471]24import sys, getopt, ConfigParser, time, os, socket, string, re
25import xdrlib, socket, syslog, xml, xml.sax
[660]26from types import *
[666]27from glob import glob
[318]28
[663]29VERSION='TRUNK+SVN'
[307]30
[471]31def usage( ver ):
32
[659]33    print 'jobmond %s' %VERSION
[471]34
[659]35    if ver:
36        return 0
[471]37
[659]38    print
39    print 'Purpose:'
40    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
41    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
42    print
43    print 'Usage:   jobmond [OPTIONS]'
44    print
45    print '  -c, --config=FILE  The configuration file to use (default: /etc/jobmond.conf)'
46    print '  -p, --pidfile=FILE Use pid file to store the process id'
47    print '  -h, --help     Print help and exit'
48    print '  -v, --version          Print version and exit'
49    print
[307]50
[212]51def processArgs( args ):
[26]52
[659]53    SHORT_L     = 'p:hvc:'
54    LONG_L      = [ 'help', 'config=', 'pidfile=', 'version' ]
[165]55
[659]56    global PIDFILE
57    PIDFILE     = None
[61]58
[659]59    config_filename = '/etc/jobmond.conf'
[354]60
[659]61    try:
[68]62
[659]63        opts, args  = getopt.getopt( args, SHORT_L, LONG_L )
[185]64
[659]65    except getopt.GetoptError, detail:
[212]66
[659]67        print detail
68        usage()
69        sys.exit( 1 )
[212]70
[659]71    for opt, value in opts:
[212]72
[659]73        if opt in [ '--config', '-c' ]:
74       
75            config_filename = value
[212]76
[659]77        if opt in [ '--pidfile', '-p' ]:
[212]78
[659]79            PIDFILE     = value
80       
81        if opt in [ '--help', '-h' ]:
[307]82 
[659]83            usage( False )
84            sys.exit( 0 )
[212]85
[659]86        if opt in [ '--version', '-v' ]:
[471]87
[659]88            usage( True )
89            sys.exit( 0 )
[471]90
[659]91    return loadConfig( config_filename )
[212]92
[520]93class GangliaConfigParser:
94
[659]95    def __init__( self, config_file ):
[520]96
[666]97        self.config_file     = config_file
98        self.include_parsers = [ ]
[520]99
[659]100        if not os.path.exists( self.config_file ):
[520]101
[659]102            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
103            sys.exit( 1 )
[520]104
[666]105    def removeHaakjes( self, value ):
106
107        clean_value = value
108        clean_value = clean_value.replace( "(", "" )
109        clean_value = clean_value.replace( ")", "" )
110        clean_value = clean_value.strip()
111
112        return clean_value
113
[659]114    def removeQuotes( self, value ):
[520]115
[659]116        clean_value = value
117        clean_value = clean_value.replace( "'", "" )
118        clean_value = clean_value.replace( '"', '' )
119        clean_value = clean_value.strip()
[520]120
[659]121        return clean_value
[520]122
[665]123    def removeComments( self, value ):
124
125        clean_value = value
126
127        if clean_value.find('#') != -1:
128
129            clean_value = value[:value.find('#')]
130
131        if clean_value.find('//') != -1:
132
133            clean_value = value[:value.find('//')]
134
135        return clean_value
136
[659]137    def getVal( self, section, valname ):
[520]138
[667]139        debug_msg( 10, "Parsing: '" + self.config_file + "' - searching for section: " + section + " value: " + valname )
140
[659]141        cfg_fp      = open( self.config_file )
[665]142        cfg_lines   = cfg_fp.readlines()
143        cfg_fp.close()
[520]144
[665]145        section_start = False
146        section_found = False
[667]147        include_start = False
[665]148        value         = None
149        comment_start = False
[520]150
[667]151        acc_count     = 0
152
[665]153        for line in cfg_lines:
154
155            line = line.strip()
[667]156            debug_msg( 10, "line org: " + line )
[665]157            line = self.removeComments( line )
[667]158            debug_msg( 10, "line remove comments: " + line )
[665]159
[667]160            if line.find( '/*' ) != -1 and line.find( 'include', 0, 7 ):
[665]161
[667]162                debug_msg( 10, "line comment start */: " + line )
[665]163
[667]164                if line.find( '*/' ) != -1:
[665]165
[667]166                    begin_line = line[:line.find('/*')]
167                    rest_line = line[(line.find('*/')+2):]
168
169                    line = begin_line + rest_line
170
171                    debug_msg( 10, "line comment end */: " + line )
172
173                else:
174
175                    line = line[:line.find('/*')]
176                    comment_start = True
177
178                    debug_msg( 10, "line find /*: " + line )
179
180                debug_msg( 10, "line find /*: " + line )
181
182            if line.find( '*/' ) != -1 and comment_start:
183
184                line = line[(line.find('*/')+2):]
[665]185                comment_start = False
[667]186                debug_msg( 10, "line comment end*/: " + line )
[665]187
188            if comment_start:
189
[667]190                debug_msg( 10, "line ignoring comment: " + line )
[665]191                continue
192
[666]193            if line.find( 'include' ) != -1:
194
[667]195                debug_msg( 10, "line include: " + line )
196                line = self.removeHaakjes( line )
197                line = self.removeQuotes( line )
198                debug_msg( 10, "line removeHaakjes: " + line )
[666]199
[667]200                include_line  = line.split(' ')[1]
[666]201
[667]202                debug_msg( 10, "line includes: " + str( include_line ) )
203
[666]204                for include_file in glob( include_line ):
205
[667]206                    debug_msg( 10, "include file found: '" + include_file + "'" )
207
[666]208                    include_parser = GangliaConfigParser( include_file )
209                    include_getval = include_parser.getStr( section, valname )
210
211                    if include_getval:
212
213                        value = include_getval
214                        #break ? FIXME value can be overriden by other includes: perhaps users problem
215
[667]216                        debug_msg( 10, "VALUE found: '" + value + "'" )
217
[666]218                    del include_parser
219
[659]220            if line.find( section ) != -1:
[520]221
[659]222                section_found   = True
[520]223
[667]224                debug_msg( 10, "section start: '" + section + "'" )
225
[659]226            if line.find( '{' ) != -1 and section_found:
[520]227
[659]228                section_start   = True
[667]229                acc_count       = acc_count + 1
[520]230
[659]231            if line.find( '}' ) != -1 and section_found:
[520]232
[667]233                acc_count       = acc_count - 1
[520]234
[667]235                if acc_count == 0:
236
237                    debug_msg( 10, "section closed: '" + section + "'" )
238                    section_start   = False
239                    section_found   = False
240
[659]241            if line.find( valname ) != -1 and section_start:
[520]242
[659]243                value       = string.join( line.split( '=' )[1:], '' ).strip()
[520]244
[667]245        debug_msg( 10, "Parsing done: '" + self.config_file + "' " )
246
[659]247        return value
[520]248
[659]249    def getInt( self, section, valname ):
[520]250
[659]251        value   = self.getVal( section, valname )
[520]252
[659]253        if not value:
254            return False
[520]255
[659]256        value   = self.removeQuotes( value )
[520]257
[659]258        return int( value )
[520]259
[659]260    def getStr( self, section, valname ):
[520]261
[659]262        value   = self.getVal( section, valname )
[520]263
[659]264        if not value:
265            return False
[520]266
[659]267        value   = self.removeQuotes( value )
[520]268
[659]269        return str( value )
[520]270
271def findGmetric():
272
[659]273    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
[520]274
[659]275        guess   = '%s/%s' %( dir, 'gmetric' )
[520]276
[659]277        if os.path.exists( guess ):
[520]278
[659]279            return guess
[520]280
[659]281    return False
[520]282
[212]283def loadConfig( filename ):
284
[660]285    def getlist( cfg_string ):
[215]286
[660]287        my_list = [ ]
[215]288
[660]289        for item_txt in cfg_string.split( ',' ):
[215]290
[660]291                sep_char = None
[215]292
[660]293                item_txt = item_txt.strip()
[215]294
[660]295                for s_char in [ "'", '"' ]:
[215]296
[660]297                        if item_txt.find( s_char ) != -1:
[215]298
[660]299                                if item_txt.count( s_char ) != 2:
[215]300
[660]301                                        print 'Missing quote: %s' %item_txt
302                                        sys.exit( 1 )
[215]303
[660]304                                else:
[215]305
[660]306                                        sep_char = s_char
307                                        break
[215]308
[660]309                if sep_char:
[215]310
[660]311                        item_txt = item_txt.split( sep_char )[1]
[215]312
[660]313                my_list.append( item_txt )
[215]314
[660]315        return my_list
[215]316
[659]317    cfg     = ConfigParser.ConfigParser()
[212]318
[659]319    cfg.read( filename )
[212]320
[659]321    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
322    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
323    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
324    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
[212]325
[659]326    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[212]327
[659]328    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[212]329
[659]330    SYSLOG_LEVEL    = -1
331    SYSLOG_FACILITY = None
[377]332
[659]333    try:
334        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[212]335
[659]336    except ConfigParser.NoOptionError:
[373]337
[659]338        USE_SYSLOG  = True
[373]339
[659]340        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
[373]341
[659]342    if USE_SYSLOG:
[373]343
[659]344        try:
345            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[373]346
[659]347        except ConfigParser.NoOptionError:
[373]348
[659]349            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
350            SYSLOG_LEVEL    = 0
[373]351
[659]352        try:
[373]353
[659]354            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[373]355
[659]356        except ConfigParser.NoOptionError:
[373]357
[659]358            SYSLOG_FACILITY = syslog.LOG_DAEMON
[373]359
[659]360            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
[373]361
[659]362    try:
[373]363
[659]364        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
[212]365
[659]366    except ConfigParser.NoOptionError:
[265]367
[659]368        # Backwards compatibility for old configs
369        #
[265]370
[659]371        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
372        api_guess       = 'pbs'
373   
374    try:
375   
376        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
[265]377
[659]378    except ConfigParser.NoOptionError:
[265]379
[659]380        # Backwards compatibility for old configs
381        #
[265]382
[659]383        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
384        api_guess       = 'pbs'
385   
386    try:
[212]387
[659]388        GMOND_CONF      = cfg.get( 'DEFAULT', 'GMOND_CONF' )
[353]389
[659]390    except ConfigParser.NoOptionError:
[353]391
[661]392        # Not specified: assume /etc/ganglia/gmond.conf
[659]393        #
[661]394        GMOND_CONF      = '/etc/ganglia/gmond.conf'
[353]395
[659]396    ganglia_cfg     = GangliaConfigParser( GMOND_CONF )
[449]397
[659]398    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
399    #
400    gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
[449]401
[659]402    if not gmetric_dest_ip:
[449]403
[659]404        # Maybe unicast target then
405        #
406        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
[449]407
[660]408        gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
[520]409
[659]410    if gmetric_dest_ip and gmetric_dest_port:
[520]411
[659]412        GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
413    else:
[520]414
[659]415        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
[520]416
[659]417        # Couldn't figure it out: let's see if it's in our jobmond.conf
418        #
419        try:
[520]420
[659]421            GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
[520]422
[659]423        # Guess not: now just give up
424        #
425        except ConfigParser.NoOptionError:
[520]426
[659]427            GMETRIC_TARGET  = None
[520]428
[659]429            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
[520]430
[659]431    gmetric_bin = findGmetric()
[520]432
[659]433    if gmetric_bin:
[520]434
[659]435        GMETRIC_BINARY      = gmetric_bin
436    else:
437        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
[520]438
[659]439        try:
[520]440
[659]441            GMETRIC_BINARY      = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
[520]442
[659]443        except ConfigParser.NoOptionError:
[520]444
[659]445            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
446            sys.exit( 1 )
[520]447
[659]448    DETECT_TIME_DIFFS   = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
[212]449
[659]450    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
[215]451
[659]452    try:
[256]453
[659]454        BATCH_API   = cfg.get( 'DEFAULT', 'BATCH_API' )
[266]455
[659]456    except ConfigParser.NoOptionError, detail:
[266]457
[659]458        if BATCH_SERVER and api_guess:
[354]459
[659]460            BATCH_API   = api_guess
461        else:
462            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
463            sys.exit( 1 )
[317]464
[659]465    try:
[317]466
[659]467        QUEUE       = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
[317]468
[659]469    except ConfigParser.NoOptionError, detail:
[317]470
[659]471        QUEUE       = None
[353]472
[659]473    return True
[212]474
[507]475def fqdn_parts (fqdn):
[520]476
[659]477    """Return pair of host and domain for fully-qualified domain name arg."""
[520]478
[659]479    parts = fqdn.split (".")
[520]480
[659]481    return (parts[0], string.join(parts[1:], "."))
[507]482
[253]483METRIC_MAX_VAL_LEN = 900
484
[61]485class DataProcessor:
[355]486
[659]487    """Class for processing of data"""
[61]488
[659]489    binary = None
[61]490
[659]491    def __init__( self, binary=None ):
[355]492
[659]493        """Remember alternate binary location if supplied"""
[61]494
[659]495        global GMETRIC_BINARY
[449]496
[659]497        if binary:
498            self.binary = binary
[61]499
[659]500        if not self.binary:
501            self.binary = GMETRIC_BINARY
[449]502
[659]503        # Timeout for XML
504        #
505        # From ganglia's documentation:
506        #
507        # 'A metric will be deleted DMAX seconds after it is received, and
508            # DMAX=0 means eternal life.'
[61]509
[659]510        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
[80]511
[659]512        if GMOND_CONF:
[354]513
[659]514            incompatible = self.checkGmetricVersion()
[61]515
[659]516            if incompatible:
[355]517
[659]518                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
519                sys.exit( 1 )
[65]520
[659]521    def checkGmetricVersion( self ):
[355]522
[659]523        """
524        Check version of gmetric is at least 3.0.1
525        for the syntax we use
526        """
[65]527
[659]528        global METRIC_MAX_VAL_LEN
[255]529
[659]530        incompatible    = 0
[341]531
[659]532        gfp     = os.popen( self.binary + ' --version' )
533        lines       = gfp.readlines()
[65]534
[659]535        gfp.close()
[355]536
[659]537        for line in lines:
[355]538
[659]539            line = line.split( ' ' )
[65]540
[659]541            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
542           
543                gmetric_version = line[1].split( '\n' )[0]
[65]544
[659]545                version_major   = int( gmetric_version.split( '.' )[0] )
546                version_minor   = int( gmetric_version.split( '.' )[1] )
547                version_patch   = int( gmetric_version.split( '.' )[2] )
[65]548
[659]549                incompatible    = 0
[65]550
[659]551                if version_major < 3:
[65]552
[659]553                    incompatible = 1
554               
555                elif version_major == 3:
[65]556
[659]557                    if version_minor == 0:
[65]558
[659]559                        if version_patch < 1:
560                       
561                            incompatible = 1
[65]562
[659]563                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
564                        #
565                        if version_patch < 3:
[255]566
[659]567                            METRIC_MAX_VAL_LEN = 900
[255]568
[659]569                        elif version_patch >= 3:
[255]570
[659]571                            METRIC_MAX_VAL_LEN = 1400
[255]572
[659]573        return incompatible
[65]574
[659]575    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
[355]576
[659]577        """Call gmetric binary and multicast"""
[65]578
[659]579        cmd = self.binary
[65]580
[659]581        if GMETRIC_TARGET:
[61]582
[659]583            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
584            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]
[353]585
[659]586            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
[353]587
[659]588            debug_msg( 10, printTime() + ' ' + metric_debug)
[353]589
[659]590            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
[353]591
[659]592            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
[353]593
[659]594        else:
595            try:
596                cmd = cmd + ' -c' + GMOND_CONF
[353]597
[659]598            except NameError:
[353]599
[659]600                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
[353]601
[659]602            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[353]603
[659]604            if len( units ) > 0:
[409]605
[659]606                cmd = cmd + ' -u"' + units + '"'
[409]607
[659]608            debug_msg( 10, printTime() + ' ' + cmd )
[353]609
[659]610            os.system( cmd )
[353]611
[318]612class DataGatherer:
[23]613
[659]614    """Skeleton class for batch system DataGatherer"""
[256]615
[659]616    def printJobs( self, jobs ):
[355]617
[659]618        """Print a jobinfo overview"""
[318]619
[659]620        for name, attrs in self.jobs.items():
[318]621
[659]622            print 'job %s' %(name)
[318]623
[659]624            for name, val in attrs.items():
[318]625
[659]626                print '\t%s = %s' %( name, val )
[318]627
[659]628    def printJob( self, jobs, job_id ):
[355]629
[659]630        """Print job with job_id from jobs"""
[318]631
[659]632        print 'job %s' %(job_id)
[318]633
[659]634        for name, val in jobs[ job_id ].items():
[318]635
[659]636            print '\t%s = %s' %( name, val )
[318]637
[660]638    def getAttr( self, d, name ):
[507]639
[659]640        """Return certain attribute from dictionary, if exists"""
[507]641
[660]642        if d.has_key( name ):
[507]643
[660]644            if type( d[ name ] ) == ListType:
[507]645
[660]646                return string.join( d[ name ], ' ' )
647
648            return d[ name ]
649       
650        return ''
651
[659]652    def jobDataChanged( self, jobs, job_id, attrs ):
[507]653
[659]654        """Check if job with attrs and job_id in jobs has changed"""
[507]655
[659]656        if jobs.has_key( job_id ):
[507]657
[659]658            oldData = jobs[ job_id ]   
659        else:
660            return 1
[507]661
[659]662        for name, val in attrs.items():
[507]663
[659]664            if oldData.has_key( name ):
[507]665
[659]666                if oldData[ name ] != attrs[ name ]:
[507]667
[659]668                    return 1
[507]669
[659]670            else:
671                return 1
[507]672
[659]673        return 0
[507]674
[659]675    def submitJobData( self ):
[507]676
[659]677        """Submit job info list"""
[507]678
[659]679        global BATCH_API
[512]680
[659]681        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[507]682
[659]683        running_jobs    = 0
684        queued_jobs = 0
[507]685
[659]686        # Count how many running/queued jobs we found
[507]687                #
[659]688        for jobid, jobattrs in self.jobs.items():
[507]689
[659]690            if jobattrs[ 'status' ] == 'Q':
[507]691
[659]692                queued_jobs += 1
[507]693
[659]694            elif jobattrs[ 'status' ] == 'R':
[507]695
[659]696                running_jobs += 1
[507]697
[659]698        # Report running/queued jobs as seperate metric for a nice RRD graph
[507]699                #
[659]700        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
701        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
[507]702
[659]703        # Report down/offline nodes in batch (PBS only ATM)
704        #
705        if BATCH_API == 'pbs':
[512]706
[659]707            domain      = fqdn_parts( socket.getfqdn() )[1]
[514]708
[659]709            downed_nodes    = list()
710            offline_nodes   = list()
711       
712            l       = ['state']
713       
714            for name, node in self.pq.getnodes().items():
[512]715
[660]716                if 'down' in node[ 'state' ]:
[512]717
[659]718                    downed_nodes.append( name )
[512]719
[660]720                if 'offline' in node[ 'state' ]:
[512]721
[659]722                    offline_nodes.append( name )
[512]723
[659]724            downnodeslist       = do_nodelist( downed_nodes )
725            offlinenodeslist    = do_nodelist( offline_nodes )
[512]726
[659]727            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
728            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
729            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
730            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
[514]731
[659]732        # Now let's spread the knowledge
733        #
734        for jobid, jobattrs in self.jobs.items():
[507]735
[659]736            # Make gmetric values for each job: respect max gmetric value length
737            #
[660]738            gmetrics     = self.compileGmetricVal( jobid, jobattrs )
[507]739
[660]740            for g_name, g_val in gmetrics.items():
[507]741
[660]742                self.dp.multicastGmetric( g_name, g_val )
[507]743
[659]744    def compileGmetricVal( self, jobid, jobattrs ):
[507]745
[660]746        """Create gmetric name/value pairs of jobinfo"""
[507]747
[660]748        gmetrics = { }
[507]749
[659]750        for val_name, val_value in jobattrs.items():
[507]751
[660]752            gmetric_sequence = 0
[507]753
[660]754            if len( val_value ) > METRIC_MAX_VAL_LEN:
[507]755
[660]756                while len( val_value ) > METRIC_MAX_VAL_LEN:
[507]757
[660]758                    gmetric_value   = val_value[:METRIC_MAX_VAL_LEN]
759                    val_value       = val_value[METRIC_MAX_VAL_LEN:]
[507]760
[660]761                    gmetric_name    = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
[507]762
[660]763                    gmetrics[ gmetric_name ] = gmetric_value
[507]764
[660]765                    gmetric_sequence = gmetric_sequence + 1
766            else:
767                gmetric_value   = val_value
[507]768
[660]769                gmetric_name    = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
[507]770
[660]771                gmetrics[ gmetric_name ] = gmetric_value
[507]772
[660]773        return gmetrics
[507]774
[660]775    def daemon( self ):
[507]776
[660]777        """Run as daemon forever"""
[507]778
[660]779        # Fork the first child
780        #
781        pid = os.fork()
782        if pid > 0:
783                sys.exit(0)  # end parent
[507]784
[660]785        # creates a session and sets the process group ID
786        #
787        os.setsid()
[507]788
[660]789        # Fork the second child
790        #
791        pid = os.fork()
792        if pid > 0:
793                sys.exit(0)  # end parent
[507]794
[659]795        write_pidfile()
[318]796
[660]797        # Go to the root directory and set the umask
798        #
799        os.chdir('/')
800        os.umask(0)
[318]801
[660]802        sys.stdin.close()
803        sys.stdout.close()
804        sys.stderr.close()
[318]805
[660]806        os.open('/dev/null', os.O_RDWR)
807        os.dup2(0, 1)
808        os.dup2(0, 2)
[318]809
[660]810        self.run()
[318]811
[660]812    def run( self ):
[355]813
[660]814        """Main thread"""
[256]815
[660]816        while ( 1 ):
[659]817       
818            self.getJobData()
819            self.submitJobData()
820            time.sleep( BATCH_POLL_INTERVAL )   
[256]821
[507]822# Abstracted from PBS original.
[520]823#
824def do_nodelist( nodes ):
825
[659]826    """Translate node list as appropriate."""
[520]827
[659]828    nodeslist       = [ ]
829    my_domain       = fqdn_parts( socket.getfqdn() )[1]
[520]830
[659]831    for node in nodes:
[520]832
[659]833        host        = node.split( '/' )[0] # not relevant for SGE
834        h, host_domain  = fqdn_parts(host)
[520]835
[659]836        if host_domain == my_domain:
[520]837
[659]838            host    = h
[520]839
[659]840        if nodeslist.count( host ) == 0:
[520]841
[659]842            for translate_pattern in BATCH_HOST_TRANSLATE:
[520]843
[659]844                if translate_pattern.find( '/' ) != -1:
[520]845
[659]846                    translate_orig  = \
847                        translate_pattern.split( '/' )[1]
848                    translate_new   = \
849                        translate_pattern.split( '/' )[2]
850                    host = re.sub( translate_orig,
851                               translate_new, host )
852            if not host in nodeslist:
853                nodeslist.append( host )
854    return nodeslist
[318]855
[355]856class PbsDataGatherer( DataGatherer ):
[318]857
[659]858    """This is the DataGatherer for PBS and Torque"""
[318]859
[659]860    global PBSQuery, PBSError
[256]861
[659]862    def __init__( self ):
[354]863
[659]864        """Setup appropriate variables"""
[23]865
[659]866        self.jobs   = { }
867        self.timeoffset = 0
868        self.dp     = DataProcessor()
[354]869
[659]870        self.initPbsQuery()
[23]871
[659]872    def initPbsQuery( self ):
[91]873
[659]874        self.pq     = None
[354]875
[659]876        if( BATCH_SERVER ):
[354]877
[659]878            self.pq     = PBSQuery( BATCH_SERVER )
879        else:
880            self.pq     = PBSQuery()
[91]881
[659]882    def getJobData( self ):
[354]883
[659]884        """Gather all data on current jobs in Torque"""
[26]885
[659]886        joblist     = {}
887        self.cur_time   = 0
[349]888
[659]889        try:
890            joblist     = self.pq.getjobs()
891            self.cur_time   = time.time()
[354]892
[659]893        except PBSError, detail:
[354]894
[659]895            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
896            return None
[354]897
[659]898        jobs_processed  = [ ]
[26]899
[659]900        for name, attrs in joblist.items():
901            display_queue       = 1
902            job_id          = name.split( '.' )[0]
[26]903
[661]904            owner           = self.getAttr( attrs, 'Job_Owner' )
[659]905            name            = self.getAttr( attrs, 'Job_Name' )
906            queue           = self.getAttr( attrs, 'queue' )
[662]907            nodect          = self.getAttr( attrs['Resource_List'], 'nodect' )
[317]908
[660]909            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
910            requested_memory    = self.getAttr( attrs['Resource_List'], 'mem' )
[95]911
[661]912
913            requested_nodes     = ''
[660]914            mynoderequest       = self.getAttr( attrs['Resource_List'], 'nodes' )
[95]915
[659]916            ppn         = ''
[661]917            attributes  = ''
[281]918
[661]919            if mynoderequest.find( ':' ) != -1:
[95]920
[659]921                mynoderequest_fields    = mynoderequest.split( ':' )
[281]922
[659]923                for mynoderequest_field in mynoderequest_fields:
[281]924
[661]925                    if mynoderequest_field.isdigit():
926
927                        continue #TODO add requested_nodes if is hostname(s)
928
[659]929                    if mynoderequest_field.find( 'ppn' ) != -1:
[281]930
[659]931                        ppn = mynoderequest_field.split( 'ppn=' )[1]
[281]932
[661]933                    else:
934
935                        if attributes == '':
936
937                            attributes = '%s' %mynoderequest_field
938                        else:
939                            attributes = '%s:%s' %( attributes, mynoderequest_field )
940
[659]941            status          = self.getAttr( attrs, 'job_state' )
[25]942
[661]943            if status in [ 'Q', 'R', 'W' ]:
[450]944
[659]945                jobs_processed.append( job_id )
[450]946
[661]947            create_timestamp    = self.getAttr( attrs, 'ctime' )
948            running_nodes       = ''
949            exec_nodestr        = '' 
[243]950
[659]951            if status == 'R':
[133]952
[661]953                #start_timestamp     = self.getAttr( attrs, 'etime' )
954                start_timestamp     = self.getAttr( attrs, 'start_time' )
955                exec_nodestr        = self.getAttr( attrs, 'exec_host' )
[133]956
[661]957                nodes           = exec_nodestr.split( '+' )
[659]958                nodeslist       = do_nodelist( nodes )
[661]959                running_nodes   = string.join( nodeslist, ' ' )
[354]960
[659]961                if DETECT_TIME_DIFFS:
[185]962
[659]963                    # If a job start if later than our current date,
964                    # that must mean the Torque server's time is later
965                    # than our local time.
966               
967                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
[185]968
[659]969                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[185]970
[659]971            elif status == 'Q':
[95]972
[659]973                # 'mynodequest' can be a string in the following syntax according to the
974                # Torque Administator's manual:
975                #
976                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
977                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
978                # etc
979                #
[451]980
[659]981                #
982                # For now we only count the amount of nodes request and ignore properties
983                #
[451]984
[659]985                start_timestamp     = ''
986                count_mynodes       = 0
[354]987
[661]988                queued_timestamp    = self.getAttr( attrs, 'qtime' )
989
[659]990                for node in mynoderequest.split( '+' ):
[67]991
[659]992                    # Just grab the {node_count|hostname} part and ignore properties
993                    #
994                    nodepart    = node.split( ':' )[0]
[67]995
[659]996                    # Let's assume a node_count value
997                    #
998                    numeric_node    = 1
[451]999
[659]1000                    # Chop the value up into characters
1001                    #
1002                    for letter in nodepart:
[67]1003
[659]1004                        # If this char is not a digit (0-9), this must be a hostname
1005                        #
1006                        if letter not in string.digits:
[133]1007
[659]1008                            numeric_node    = 0
[133]1009
[659]1010                    # If this is a hostname, just count this as one (1) node
1011                    #
1012                    if not numeric_node:
[354]1013
[659]1014                        count_mynodes   = count_mynodes + 1
1015                    else:
[451]1016
[659]1017                        # If this a number, it must be the node_count
1018                        # and increase our count with it's value
1019                        #
1020                        try:
1021                            count_mynodes   = count_mynodes + int( nodepart )
[354]1022
[659]1023                        except ValueError, detail:
[354]1024
[659]1025                            # When we arrive here I must be bugged or very confused
1026                            # THIS SHOULD NOT HAPPEN!
1027                            #
1028                            debug_msg( 10, str( detail ) )
1029                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1030                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1031                            debug_msg( 10, 'job = ' + str( name ) )
1032                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1033                       
1034                nodeslist   = str( count_mynodes )
1035            else:
1036                start_timestamp = ''
1037                nodeslist   = ''
[133]1038
[659]1039            myAttrs             = { }
[26]1040
[661]1041            myAttrs[ 'name' ]              = str( name )
1042            myAttrs[ 'status' ]            = str( status )
1043            myAttrs[ 'queue' ]             = str( queue )
1044            myAttrs[ 'owner' ]             = str( owner )
1045            myAttrs[ 'nodect' ]            = str( nodect )
1046            myAttrs[ 'exec.hostnames' ]    = str( running_nodes )
1047            myAttrs[ 'exec.nodestr' ]      = str( exec_nodestr )
1048            myAttrs[ 'req.walltime' ]      = str( requested_time )
1049            myAttrs[ 'req.memory' ]        = str( requested_memory )
1050            myAttrs[ 'req.nodes' ]         = str( requested_nodes )
1051            myAttrs[ 'req.ppn' ]           = str( ppn )
1052            myAttrs[ 'req.attributes' ]    = str( attributes )
1053            myAttrs[ 'timestamp.running' ] = str( start_timestamp )
1054            myAttrs[ 'timestamp.created' ] = str( create_timestamp )
1055            myAttrs[ 'timestamp.queued' ]  = str( queued_timestamp )
[354]1056
[661]1057            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q', 'W' ]:
[61]1058
[659]1059                self.jobs[ job_id ] = myAttrs
[26]1060
[659]1061        for id, attrs in self.jobs.items():
[76]1062
[659]1063            if id not in jobs_processed:
[76]1064
[659]1065                # This one isn't there anymore; toedeledoki!
1066                #
1067                del self.jobs[ id ]
[76]1068
[362]1069GMETRIC_DEFAULT_TYPE    = 'string'
1070GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1071GMETRIC_DEFAULT_PORT    = '8649'
[659]1072GMETRIC_DEFAULT_UNITS   = ''
[362]1073
1074class Gmetric:
1075
[659]1076    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
[362]1077
[659]1078    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1079    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1080    protocol        = ( 'udp', 'multicast' )
[362]1081
[659]1082    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
[362]1083               
[659]1084        global GMETRIC_DEFAULT_TYPE
[362]1085
[659]1086        self.prot       = self.checkHostProtocol( host )
[664]1087        self.data_msg   = xdrlib.Packer()
1088        self.meta_msg   = xdrlib.Packer()
[659]1089        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
[362]1090
[659]1091        if self.prot not in self.protocol:
[362]1092
[659]1093            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
[362]1094
[659]1095        if self.prot == 'multicast':
[362]1096
[659]1097            # Set multicast options
1098            #
1099            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
[362]1100
[659]1101        self.hostport   = ( host, int( port ) )
1102        self.slopestr   = 'both'
1103        self.tmax       = 60
[362]1104
[659]1105    def checkHostProtocol( self, ip ):
[362]1106
[659]1107        """Detect if a ip adress is a multicast address"""
[471]1108
[659]1109        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1110        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
[362]1111
[659]1112        ip_fields               = ip.split( '.' )
[362]1113
[659]1114        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
[362]1115
[659]1116            return 'multicast'
1117        else:
1118            return 'udp'
[362]1119
[659]1120    def send( self, name, value, dmax, typestr = '', units = '' ):
[362]1121
[659]1122        if len( units ) == 0:
1123            units       = GMETRIC_DEFAULT_UNITS
[471]1124
[659]1125        if len( typestr ) == 0:
1126            typestr     = GMETRIC_DEFAULT_TYPE
[362]1127
[664]1128        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
[409]1129
[664]1130        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1131        data_rt = self.socket.sendto( data_msg, self.hostport )
[362]1132
[664]1133        return ( meta_rt, data_rt )
[362]1134
[664]1135    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
1136
1137        hostname = "unset"
1138
[659]1139        if slopestr not in self.slope:
[362]1140
[659]1141            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
[362]1142
[659]1143        if typestr not in self.type:
[362]1144
[659]1145            raise ValueError( "Type must be one of: " + str( self.type ) )
[362]1146
[659]1147        if len( name ) == 0:
[362]1148
[659]1149            raise ValueError( "Name must be non-empty" )
[362]1150
[664]1151        self.meta_msg.reset()
1152        self.meta_msg.pack_int( 128 )
[362]1153
[664]1154        if not spoof:
1155            self.meta_msg.pack_string( hostname )
1156        else:
1157            self.meta_msg.pack_string( spoof )
[362]1158
[664]1159        self.meta_msg.pack_string( name )
1160
1161        if not spoof:
1162            self.meta_msg.pack_int( 0 )
1163        else:
1164            self.meta_msg.pack_int( 1 )
1165           
1166        self.meta_msg.pack_string( typestr )
1167        self.meta_msg.pack_string( name )
1168        self.meta_msg.pack_string( unitstr )
1169        self.meta_msg.pack_int( self.slope[ slopestr ] )
1170        self.meta_msg.pack_uint( int( tmax ) )
1171        self.meta_msg.pack_uint( int( dmax ) )
1172
1173        if not group:
1174            self.meta_msg.pack_int( 0 )
1175        else:
1176            self.meta_msg.pack_int( 1 )
1177            self.meta_msg.pack_string( "GROUP" )
1178            self.meta_msg.pack_string( group )
1179
1180        self.data_msg.reset()
1181        self.data_msg.pack_int( 128+5 )
1182
1183        if not spoof:
1184            self.data_msg.pack_string( hostname )
1185        else:
1186            self.data_msg.pack_string( spoof )
1187
1188        self.data_msg.pack_string( name )
1189
1190        if not spoof:
1191            self.data_msg.pack_int( 0 )
1192        else:
1193            self.data_msg.pack_int( 1 )
1194
1195        self.data_msg.pack_string( "%s" )
1196        self.data_msg.pack_string( str( value ) )
1197
1198        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
1199
[26]1200def printTime( ):
[354]1201
[659]1202    """Print current time/date in human readable format for log/debug"""
[26]1203
[659]1204    return time.strftime("%a, %d %b %Y %H:%M:%S")
[26]1205
1206def debug_msg( level, msg ):
[354]1207
[659]1208    """Print msg if at or above current debug level"""
[26]1209
[659]1210    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
[377]1211
[660]1212    if (not DAEMONIZE and DEBUG_LEVEL >= level):
[659]1213        sys.stderr.write( msg + '\n' )
[26]1214
[659]1215    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1216        syslog.syslog( msg )
[373]1217
[307]1218def write_pidfile():
1219
[659]1220    # Write pidfile if PIDFILE is set
1221    #
1222    if PIDFILE:
[307]1223
[659]1224        pid = os.getpid()
[354]1225
[659]1226        pidfile = open( PIDFILE, 'w' )
[354]1227
[659]1228        pidfile.write( str( pid ) )
1229        pidfile.close()
[307]1230
[23]1231def main():
[354]1232
[659]1233    """Application start"""
[23]1234
[659]1235    global PBSQuery, PBSError, lsfObject
1236    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
[256]1237
[659]1238    if not processArgs( sys.argv[1:] ):
[354]1239
[659]1240        sys.exit( 1 )
[212]1241
[659]1242    # Load appropriate DataGatherer depending on which BATCH_API is set
1243    # and any required modules for the Gatherer
1244    #
1245    if BATCH_API == 'pbs':
[256]1246
[659]1247        try:
1248            from PBSQuery import PBSQuery, PBSError
[256]1249
[659]1250        except ImportError:
[256]1251
[659]1252            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
1253            sys.exit( 1 )
[256]1254
[659]1255        gather = PbsDataGatherer()
[256]1256
[659]1257    elif BATCH_API == 'sge':
[256]1258
[659]1259        # Tested with SGE 6.0u11.
1260        #
1261        gather = SgeDataGatherer()
[368]1262
[659]1263    elif BATCH_API == 'lsf':
[368]1264
[659]1265        try:
1266            from lsfObject import lsfObject
1267        except:
1268            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
1269            sys.exit( 1)
[256]1270
[659]1271        gather = LsfDataGatherer()
[524]1272
[659]1273    else:
1274        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
[354]1275
[659]1276        sys.exit( 1 )
[256]1277
[659]1278    if( DAEMONIZE and USE_SYSLOG ):
[373]1279
[659]1280        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[373]1281
[659]1282    if DAEMONIZE:
[354]1283
[659]1284        gather.daemon()
1285    else:
1286        gather.run()
[23]1287
[256]1288# wh00t? someone started me! :)
[65]1289#
[23]1290if __name__ == '__main__':
[659]1291    main()
Note: See TracBrowser for help on using the repository browser.