source: trunk/jobmond/jobmond.py @ 665

Last change on this file since 665 was 665, checked in by ramonb, 12 years ago
  • handle comments in gmond.conf
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 33.4 KB
RevLine 
[23]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[663]5# Copyright (C) 2006-2012  Ramon Bastiaans
[225]6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[228]21# SVN $Id: jobmond.py 665 2012-09-04 13:02:29Z ramonb $
[227]22#
[23]23
[471]24import sys, getopt, ConfigParser, time, os, socket, string, re
25import xdrlib, socket, syslog, xml, xml.sax
[660]26from types import *
[318]27
[663]28VERSION='TRUNK+SVN'
[307]29
[471]30def usage( ver ):
31
[659]32    print 'jobmond %s' %VERSION
[471]33
[659]34    if ver:
35        return 0
[471]36
[659]37    print
38    print 'Purpose:'
39    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
40    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
41    print
42    print 'Usage:   jobmond [OPTIONS]'
43    print
44    print '  -c, --config=FILE  The configuration file to use (default: /etc/jobmond.conf)'
45    print '  -p, --pidfile=FILE Use pid file to store the process id'
46    print '  -h, --help     Print help and exit'
47    print '  -v, --version          Print version and exit'
48    print
[307]49
[212]50def processArgs( args ):
[26]51
[659]52    SHORT_L     = 'p:hvc:'
53    LONG_L      = [ 'help', 'config=', 'pidfile=', 'version' ]
[165]54
[659]55    global PIDFILE
56    PIDFILE     = None
[61]57
[659]58    config_filename = '/etc/jobmond.conf'
[354]59
[659]60    try:
[68]61
[659]62        opts, args  = getopt.getopt( args, SHORT_L, LONG_L )
[185]63
[659]64    except getopt.GetoptError, detail:
[212]65
[659]66        print detail
67        usage()
68        sys.exit( 1 )
[212]69
[659]70    for opt, value in opts:
[212]71
[659]72        if opt in [ '--config', '-c' ]:
73       
74            config_filename = value
[212]75
[659]76        if opt in [ '--pidfile', '-p' ]:
[212]77
[659]78            PIDFILE     = value
79       
80        if opt in [ '--help', '-h' ]:
[307]81 
[659]82            usage( False )
83            sys.exit( 0 )
[212]84
[659]85        if opt in [ '--version', '-v' ]:
[471]86
[659]87            usage( True )
88            sys.exit( 0 )
[471]89
[659]90    return loadConfig( config_filename )
[212]91
[622]92# Fixme:  This doesn't DTRT with commented-out bits of the file.  E.g.
93# it picked up a commented-out `mcast_join' and tried to use a
94# multicast channel when it shouldn't have done.
[520]95class GangliaConfigParser:
96
[659]97    def __init__( self, config_file ):
[520]98
[659]99        self.config_file    = config_file
[520]100
[659]101        if not os.path.exists( self.config_file ):
[520]102
[659]103            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
104            sys.exit( 1 )
[520]105
[659]106    def removeQuotes( self, value ):
[520]107
[659]108        clean_value = value
109        clean_value = clean_value.replace( "'", "" )
110        clean_value = clean_value.replace( '"', '' )
111        clean_value = clean_value.strip()
[520]112
[659]113        return clean_value
[520]114
[665]115    def removeComments( self, value ):
116
117        clean_value = value
118
119        if clean_value.find('#') != -1:
120
121            clean_value = value[:value.find('#')]
122
123        if clean_value.find('//') != -1:
124
125            clean_value = value[:value.find('//')]
126
127        return clean_value
128
[659]129    def getVal( self, section, valname ):
[520]130
[659]131        cfg_fp      = open( self.config_file )
[665]132        cfg_lines   = cfg_fp.readlines()
133        cfg_fp.close()
[520]134
[665]135        section_start = False
136        section_found = False
137        value         = None
138        comment_start = False
[520]139
[665]140        for line in cfg_lines:
141
142            line = line.strip()
143            line = self.removeComments( line )
144
145            if line.find( '/*' ) != -1:
146
147                line = line[:line.find('/*')]
148                comment_start = True
149
150            if line.find( '*/' ) != -1:
151
152                line = line[line.find('*/'):]
153                comment_start = False
154
155            if comment_start:
156
157                continue
158
[659]159            if line.find( section ) != -1:
[520]160
[659]161                section_found   = True
[520]162
[659]163            if line.find( '{' ) != -1 and section_found:
[520]164
[659]165                section_start   = True
[520]166
[659]167            if line.find( '}' ) != -1 and section_found:
[520]168
[659]169                section_start   = False
170                section_found   = False
[520]171
[659]172            if line.find( valname ) != -1 and section_start:
[520]173
[659]174                value       = string.join( line.split( '=' )[1:], '' ).strip()
[520]175
[659]176        return value
[520]177
[659]178    def getInt( self, section, valname ):
[520]179
[659]180        value   = self.getVal( section, valname )
[520]181
[659]182        if not value:
183            return False
[520]184
[659]185        value   = self.removeQuotes( value )
[520]186
[659]187        return int( value )
[520]188
[659]189    def getStr( self, section, valname ):
[520]190
[659]191        value   = self.getVal( section, valname )
[520]192
[659]193        if not value:
194            return False
[520]195
[659]196        value   = self.removeQuotes( value )
[520]197
[659]198        return str( value )
[520]199
200def findGmetric():
201
[659]202    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
[520]203
[659]204        guess   = '%s/%s' %( dir, 'gmetric' )
[520]205
[659]206        if os.path.exists( guess ):
[520]207
[659]208            return guess
[520]209
[659]210    return False
[520]211
[212]212def loadConfig( filename ):
213
[660]214    def getlist( cfg_string ):
[215]215
[660]216        my_list = [ ]
[215]217
[660]218        for item_txt in cfg_string.split( ',' ):
[215]219
[660]220                sep_char = None
[215]221
[660]222                item_txt = item_txt.strip()
[215]223
[660]224                for s_char in [ "'", '"' ]:
[215]225
[660]226                        if item_txt.find( s_char ) != -1:
[215]227
[660]228                                if item_txt.count( s_char ) != 2:
[215]229
[660]230                                        print 'Missing quote: %s' %item_txt
231                                        sys.exit( 1 )
[215]232
[660]233                                else:
[215]234
[660]235                                        sep_char = s_char
236                                        break
[215]237
[660]238                if sep_char:
[215]239
[660]240                        item_txt = item_txt.split( sep_char )[1]
[215]241
[660]242                my_list.append( item_txt )
[215]243
[660]244        return my_list
[215]245
[659]246    cfg     = ConfigParser.ConfigParser()
[212]247
[659]248    cfg.read( filename )
[212]249
[659]250    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
251    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
252    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
253    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
[212]254
[659]255    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[212]256
[659]257    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[212]258
[659]259    SYSLOG_LEVEL    = -1
260    SYSLOG_FACILITY = None
[377]261
[659]262    try:
263        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[212]264
[659]265    except ConfigParser.NoOptionError:
[373]266
[659]267        USE_SYSLOG  = True
[373]268
[659]269        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
[373]270
[659]271    if USE_SYSLOG:
[373]272
[659]273        try:
274            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[373]275
[659]276        except ConfigParser.NoOptionError:
[373]277
[659]278            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
279            SYSLOG_LEVEL    = 0
[373]280
[659]281        try:
[373]282
[659]283            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[373]284
[659]285        except ConfigParser.NoOptionError:
[373]286
[659]287            SYSLOG_FACILITY = syslog.LOG_DAEMON
[373]288
[659]289            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
[373]290
[659]291    try:
[373]292
[659]293        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
[212]294
[659]295    except ConfigParser.NoOptionError:
[265]296
[659]297        # Backwards compatibility for old configs
298        #
[265]299
[659]300        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
301        api_guess       = 'pbs'
302   
303    try:
304   
305        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
[265]306
[659]307    except ConfigParser.NoOptionError:
[265]308
[659]309        # Backwards compatibility for old configs
310        #
[265]311
[659]312        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
313        api_guess       = 'pbs'
314   
315    try:
[212]316
[659]317        GMOND_CONF      = cfg.get( 'DEFAULT', 'GMOND_CONF' )
[353]318
[659]319    except ConfigParser.NoOptionError:
[353]320
[661]321        # Not specified: assume /etc/ganglia/gmond.conf
[659]322        #
[661]323        GMOND_CONF      = '/etc/ganglia/gmond.conf'
[353]324
[659]325    ganglia_cfg     = GangliaConfigParser( GMOND_CONF )
[449]326
[659]327    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
328    #
329    gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
[449]330
[659]331    if not gmetric_dest_ip:
[449]332
[659]333        # Maybe unicast target then
334        #
335        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
[449]336
[660]337        gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
[520]338
[659]339    if gmetric_dest_ip and gmetric_dest_port:
[520]340
[659]341        GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
342    else:
[520]343
[659]344        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
[520]345
[659]346        # Couldn't figure it out: let's see if it's in our jobmond.conf
347        #
348        try:
[520]349
[659]350            GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
[520]351
[659]352        # Guess not: now just give up
353        #
354        except ConfigParser.NoOptionError:
[520]355
[659]356            GMETRIC_TARGET  = None
[520]357
[659]358            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
[520]359
[659]360    gmetric_bin = findGmetric()
[520]361
[659]362    if gmetric_bin:
[520]363
[659]364        GMETRIC_BINARY      = gmetric_bin
365    else:
366        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
[520]367
[659]368        try:
[520]369
[659]370            GMETRIC_BINARY      = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
[520]371
[659]372        except ConfigParser.NoOptionError:
[520]373
[659]374            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
375            sys.exit( 1 )
[520]376
[659]377    DETECT_TIME_DIFFS   = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
[212]378
[659]379    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
[215]380
[659]381    try:
[256]382
[659]383        BATCH_API   = cfg.get( 'DEFAULT', 'BATCH_API' )
[266]384
[659]385    except ConfigParser.NoOptionError, detail:
[266]386
[659]387        if BATCH_SERVER and api_guess:
[354]388
[659]389            BATCH_API   = api_guess
390        else:
391            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
392            sys.exit( 1 )
[317]393
[659]394    try:
[317]395
[659]396        QUEUE       = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
[317]397
[659]398    except ConfigParser.NoOptionError, detail:
[317]399
[659]400        QUEUE       = None
[353]401
[659]402    return True
[212]403
[507]404def fqdn_parts (fqdn):
[520]405
[659]406    """Return pair of host and domain for fully-qualified domain name arg."""
[520]407
[659]408    parts = fqdn.split (".")
[520]409
[659]410    return (parts[0], string.join(parts[1:], "."))
[507]411
[253]412METRIC_MAX_VAL_LEN = 900
413
[61]414class DataProcessor:
[355]415
[659]416    """Class for processing of data"""
[61]417
[659]418    binary = None
[61]419
[659]420    def __init__( self, binary=None ):
[355]421
[659]422        """Remember alternate binary location if supplied"""
[61]423
[659]424        global GMETRIC_BINARY
[449]425
[659]426        if binary:
427            self.binary = binary
[61]428
[659]429        if not self.binary:
430            self.binary = GMETRIC_BINARY
[449]431
[659]432        # Timeout for XML
433        #
434        # From ganglia's documentation:
435        #
436        # 'A metric will be deleted DMAX seconds after it is received, and
437            # DMAX=0 means eternal life.'
[61]438
[659]439        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
[80]440
[659]441        if GMOND_CONF:
[354]442
[659]443            incompatible = self.checkGmetricVersion()
[61]444
[659]445            if incompatible:
[355]446
[659]447                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
448                sys.exit( 1 )
[65]449
[659]450    def checkGmetricVersion( self ):
[355]451
[659]452        """
453        Check version of gmetric is at least 3.0.1
454        for the syntax we use
455        """
[65]456
[659]457        global METRIC_MAX_VAL_LEN
[255]458
[659]459        incompatible    = 0
[341]460
[659]461        gfp     = os.popen( self.binary + ' --version' )
462        lines       = gfp.readlines()
[65]463
[659]464        gfp.close()
[355]465
[659]466        for line in lines:
[355]467
[659]468            line = line.split( ' ' )
[65]469
[659]470            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
471           
472                gmetric_version = line[1].split( '\n' )[0]
[65]473
[659]474                version_major   = int( gmetric_version.split( '.' )[0] )
475                version_minor   = int( gmetric_version.split( '.' )[1] )
476                version_patch   = int( gmetric_version.split( '.' )[2] )
[65]477
[659]478                incompatible    = 0
[65]479
[659]480                if version_major < 3:
[65]481
[659]482                    incompatible = 1
483               
484                elif version_major == 3:
[65]485
[659]486                    if version_minor == 0:
[65]487
[659]488                        if version_patch < 1:
489                       
490                            incompatible = 1
[65]491
[659]492                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
493                        #
494                        if version_patch < 3:
[255]495
[659]496                            METRIC_MAX_VAL_LEN = 900
[255]497
[659]498                        elif version_patch >= 3:
[255]499
[659]500                            METRIC_MAX_VAL_LEN = 1400
[255]501
[659]502        return incompatible
[65]503
[659]504    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
[355]505
[659]506        """Call gmetric binary and multicast"""
[65]507
[659]508        cmd = self.binary
[65]509
[659]510        if GMETRIC_TARGET:
[61]511
[659]512            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
513            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]
[353]514
[659]515            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
[353]516
[659]517            debug_msg( 10, printTime() + ' ' + metric_debug)
[353]518
[659]519            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
[353]520
[659]521            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
[353]522
[659]523        else:
524            try:
525                cmd = cmd + ' -c' + GMOND_CONF
[353]526
[659]527            except NameError:
[353]528
[659]529                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
[353]530
[659]531            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[353]532
[659]533            if len( units ) > 0:
[409]534
[659]535                cmd = cmd + ' -u"' + units + '"'
[409]536
[659]537            debug_msg( 10, printTime() + ' ' + cmd )
[353]538
[659]539            os.system( cmd )
[353]540
[318]541class DataGatherer:
[23]542
[659]543    """Skeleton class for batch system DataGatherer"""
[256]544
[659]545    def printJobs( self, jobs ):
[355]546
[659]547        """Print a jobinfo overview"""
[318]548
[659]549        for name, attrs in self.jobs.items():
[318]550
[659]551            print 'job %s' %(name)
[318]552
[659]553            for name, val in attrs.items():
[318]554
[659]555                print '\t%s = %s' %( name, val )
[318]556
[659]557    def printJob( self, jobs, job_id ):
[355]558
[659]559        """Print job with job_id from jobs"""
[318]560
[659]561        print 'job %s' %(job_id)
[318]562
[659]563        for name, val in jobs[ job_id ].items():
[318]564
[659]565            print '\t%s = %s' %( name, val )
[318]566
[660]567    def getAttr( self, d, name ):
[507]568
[659]569        """Return certain attribute from dictionary, if exists"""
[507]570
[660]571        if d.has_key( name ):
[507]572
[660]573            if type( d[ name ] ) == ListType:
[507]574
[660]575                return string.join( d[ name ], ' ' )
576
577            return d[ name ]
578       
579        return ''
580
[659]581    def jobDataChanged( self, jobs, job_id, attrs ):
[507]582
[659]583        """Check if job with attrs and job_id in jobs has changed"""
[507]584
[659]585        if jobs.has_key( job_id ):
[507]586
[659]587            oldData = jobs[ job_id ]   
588        else:
589            return 1
[507]590
[659]591        for name, val in attrs.items():
[507]592
[659]593            if oldData.has_key( name ):
[507]594
[659]595                if oldData[ name ] != attrs[ name ]:
[507]596
[659]597                    return 1
[507]598
[659]599            else:
600                return 1
[507]601
[659]602        return 0
[507]603
[659]604    def submitJobData( self ):
[507]605
[659]606        """Submit job info list"""
[507]607
[659]608        global BATCH_API
[512]609
[659]610        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[507]611
[659]612        running_jobs    = 0
613        queued_jobs = 0
[507]614
[659]615        # Count how many running/queued jobs we found
[507]616                #
[659]617        for jobid, jobattrs in self.jobs.items():
[507]618
[659]619            if jobattrs[ 'status' ] == 'Q':
[507]620
[659]621                queued_jobs += 1
[507]622
[659]623            elif jobattrs[ 'status' ] == 'R':
[507]624
[659]625                running_jobs += 1
[507]626
[659]627        # Report running/queued jobs as seperate metric for a nice RRD graph
[507]628                #
[659]629        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
630        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
[507]631
[659]632        # Report down/offline nodes in batch (PBS only ATM)
633        #
634        if BATCH_API == 'pbs':
[512]635
[659]636            domain      = fqdn_parts( socket.getfqdn() )[1]
[514]637
[659]638            downed_nodes    = list()
639            offline_nodes   = list()
640       
641            l       = ['state']
642       
643            for name, node in self.pq.getnodes().items():
[512]644
[660]645                if 'down' in node[ 'state' ]:
[512]646
[659]647                    downed_nodes.append( name )
[512]648
[660]649                if 'offline' in node[ 'state' ]:
[512]650
[659]651                    offline_nodes.append( name )
[512]652
[659]653            downnodeslist       = do_nodelist( downed_nodes )
654            offlinenodeslist    = do_nodelist( offline_nodes )
[512]655
[659]656            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
657            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
658            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
659            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
[514]660
[659]661        # Now let's spread the knowledge
662        #
663        for jobid, jobattrs in self.jobs.items():
[507]664
[659]665            # Make gmetric values for each job: respect max gmetric value length
666            #
[660]667            gmetrics     = self.compileGmetricVal( jobid, jobattrs )
[507]668
[660]669            for g_name, g_val in gmetrics.items():
[507]670
[660]671                self.dp.multicastGmetric( g_name, g_val )
[507]672
[659]673    def compileGmetricVal( self, jobid, jobattrs ):
[507]674
[660]675        """Create gmetric name/value pairs of jobinfo"""
[507]676
[660]677        gmetrics = { }
[507]678
[659]679        for val_name, val_value in jobattrs.items():
[507]680
[660]681            gmetric_sequence = 0
[507]682
[660]683            if len( val_value ) > METRIC_MAX_VAL_LEN:
[507]684
[660]685                while len( val_value ) > METRIC_MAX_VAL_LEN:
[507]686
[660]687                    gmetric_value   = val_value[:METRIC_MAX_VAL_LEN]
688                    val_value       = val_value[METRIC_MAX_VAL_LEN:]
[507]689
[660]690                    gmetric_name    = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
[507]691
[660]692                    gmetrics[ gmetric_name ] = gmetric_value
[507]693
[660]694                    gmetric_sequence = gmetric_sequence + 1
695            else:
696                gmetric_value   = val_value
[507]697
[660]698                gmetric_name    = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
[507]699
[660]700                gmetrics[ gmetric_name ] = gmetric_value
[507]701
[660]702        return gmetrics
[507]703
[660]704    def daemon( self ):
[507]705
[660]706        """Run as daemon forever"""
[507]707
[660]708        # Fork the first child
709        #
710        pid = os.fork()
711        if pid > 0:
712                sys.exit(0)  # end parent
[507]713
[660]714        # creates a session and sets the process group ID
715        #
716        os.setsid()
[507]717
[660]718        # Fork the second child
719        #
720        pid = os.fork()
721        if pid > 0:
722                sys.exit(0)  # end parent
[507]723
[659]724        write_pidfile()
[318]725
[660]726        # Go to the root directory and set the umask
727        #
728        os.chdir('/')
729        os.umask(0)
[318]730
[660]731        sys.stdin.close()
732        sys.stdout.close()
733        sys.stderr.close()
[318]734
[660]735        os.open('/dev/null', os.O_RDWR)
736        os.dup2(0, 1)
737        os.dup2(0, 2)
[318]738
[660]739        self.run()
[318]740
[660]741    def run( self ):
[355]742
[660]743        """Main thread"""
[256]744
[660]745        while ( 1 ):
[659]746       
747            self.getJobData()
748            self.submitJobData()
749            time.sleep( BATCH_POLL_INTERVAL )   
[256]750
[507]751# Abstracted from PBS original.
[520]752#
753def do_nodelist( nodes ):
754
[659]755    """Translate node list as appropriate."""
[520]756
[659]757    nodeslist       = [ ]
758    my_domain       = fqdn_parts( socket.getfqdn() )[1]
[520]759
[659]760    for node in nodes:
[520]761
[659]762        host        = node.split( '/' )[0] # not relevant for SGE
763        h, host_domain  = fqdn_parts(host)
[520]764
[659]765        if host_domain == my_domain:
[520]766
[659]767            host    = h
[520]768
[659]769        if nodeslist.count( host ) == 0:
[520]770
[659]771            for translate_pattern in BATCH_HOST_TRANSLATE:
[520]772
[659]773                if translate_pattern.find( '/' ) != -1:
[520]774
[659]775                    translate_orig  = \
776                        translate_pattern.split( '/' )[1]
777                    translate_new   = \
778                        translate_pattern.split( '/' )[2]
779                    host = re.sub( translate_orig,
780                               translate_new, host )
781            if not host in nodeslist:
782                nodeslist.append( host )
783    return nodeslist
[318]784
[355]785class PbsDataGatherer( DataGatherer ):
[318]786
[659]787    """This is the DataGatherer for PBS and Torque"""
[318]788
[659]789    global PBSQuery, PBSError
[256]790
[659]791    def __init__( self ):
[354]792
[659]793        """Setup appropriate variables"""
[23]794
[659]795        self.jobs   = { }
796        self.timeoffset = 0
797        self.dp     = DataProcessor()
[354]798
[659]799        self.initPbsQuery()
[23]800
[659]801    def initPbsQuery( self ):
[91]802
[659]803        self.pq     = None
[354]804
[659]805        if( BATCH_SERVER ):
[354]806
[659]807            self.pq     = PBSQuery( BATCH_SERVER )
808        else:
809            self.pq     = PBSQuery()
[91]810
[659]811    def getJobData( self ):
[354]812
[659]813        """Gather all data on current jobs in Torque"""
[26]814
[659]815        joblist     = {}
816        self.cur_time   = 0
[349]817
[659]818        try:
819            joblist     = self.pq.getjobs()
820            self.cur_time   = time.time()
[354]821
[659]822        except PBSError, detail:
[354]823
[659]824            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
825            return None
[354]826
[659]827        jobs_processed  = [ ]
[26]828
[659]829        for name, attrs in joblist.items():
830            display_queue       = 1
831            job_id          = name.split( '.' )[0]
[26]832
[661]833            owner           = self.getAttr( attrs, 'Job_Owner' )
[659]834            name            = self.getAttr( attrs, 'Job_Name' )
835            queue           = self.getAttr( attrs, 'queue' )
[662]836            nodect          = self.getAttr( attrs['Resource_List'], 'nodect' )
[317]837
[660]838            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
839            requested_memory    = self.getAttr( attrs['Resource_List'], 'mem' )
[95]840
[661]841
842            requested_nodes     = ''
[660]843            mynoderequest       = self.getAttr( attrs['Resource_List'], 'nodes' )
[95]844
[659]845            ppn         = ''
[661]846            attributes  = ''
[281]847
[661]848            if mynoderequest.find( ':' ) != -1:
[95]849
[659]850                mynoderequest_fields    = mynoderequest.split( ':' )
[281]851
[659]852                for mynoderequest_field in mynoderequest_fields:
[281]853
[661]854                    if mynoderequest_field.isdigit():
855
856                        continue #TODO add requested_nodes if is hostname(s)
857
[659]858                    if mynoderequest_field.find( 'ppn' ) != -1:
[281]859
[659]860                        ppn = mynoderequest_field.split( 'ppn=' )[1]
[281]861
[661]862                    else:
863
864                        if attributes == '':
865
866                            attributes = '%s' %mynoderequest_field
867                        else:
868                            attributes = '%s:%s' %( attributes, mynoderequest_field )
869
[659]870            status          = self.getAttr( attrs, 'job_state' )
[25]871
[661]872            if status in [ 'Q', 'R', 'W' ]:
[450]873
[659]874                jobs_processed.append( job_id )
[450]875
[661]876            create_timestamp    = self.getAttr( attrs, 'ctime' )
877            running_nodes       = ''
878            exec_nodestr        = '' 
[243]879
[659]880            if status == 'R':
[133]881
[661]882                #start_timestamp     = self.getAttr( attrs, 'etime' )
883                start_timestamp     = self.getAttr( attrs, 'start_time' )
884                exec_nodestr        = self.getAttr( attrs, 'exec_host' )
[133]885
[661]886                nodes           = exec_nodestr.split( '+' )
[659]887                nodeslist       = do_nodelist( nodes )
[661]888                running_nodes   = string.join( nodeslist, ' ' )
[354]889
[659]890                if DETECT_TIME_DIFFS:
[185]891
[659]892                    # If a job start if later than our current date,
893                    # that must mean the Torque server's time is later
894                    # than our local time.
895               
896                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
[185]897
[659]898                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[185]899
[659]900            elif status == 'Q':
[95]901
[659]902                # 'mynodequest' can be a string in the following syntax according to the
903                # Torque Administator's manual:
904                #
905                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
906                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
907                # etc
908                #
[451]909
[659]910                #
911                # For now we only count the amount of nodes request and ignore properties
912                #
[451]913
[659]914                start_timestamp     = ''
915                count_mynodes       = 0
[354]916
[661]917                queued_timestamp    = self.getAttr( attrs, 'qtime' )
918
[659]919                for node in mynoderequest.split( '+' ):
[67]920
[659]921                    # Just grab the {node_count|hostname} part and ignore properties
922                    #
923                    nodepart    = node.split( ':' )[0]
[67]924
[659]925                    # Let's assume a node_count value
926                    #
927                    numeric_node    = 1
[451]928
[659]929                    # Chop the value up into characters
930                    #
931                    for letter in nodepart:
[67]932
[659]933                        # If this char is not a digit (0-9), this must be a hostname
934                        #
935                        if letter not in string.digits:
[133]936
[659]937                            numeric_node    = 0
[133]938
[659]939                    # If this is a hostname, just count this as one (1) node
940                    #
941                    if not numeric_node:
[354]942
[659]943                        count_mynodes   = count_mynodes + 1
944                    else:
[451]945
[659]946                        # If this a number, it must be the node_count
947                        # and increase our count with it's value
948                        #
949                        try:
950                            count_mynodes   = count_mynodes + int( nodepart )
[354]951
[659]952                        except ValueError, detail:
[354]953
[659]954                            # When we arrive here I must be bugged or very confused
955                            # THIS SHOULD NOT HAPPEN!
956                            #
957                            debug_msg( 10, str( detail ) )
958                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
959                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
960                            debug_msg( 10, 'job = ' + str( name ) )
961                            debug_msg( 10, 'attrs = ' + str( attrs ) )
962                       
963                nodeslist   = str( count_mynodes )
964            else:
965                start_timestamp = ''
966                nodeslist   = ''
[133]967
[659]968            myAttrs             = { }
[26]969
[661]970            myAttrs[ 'name' ]              = str( name )
971            myAttrs[ 'status' ]            = str( status )
972            myAttrs[ 'queue' ]             = str( queue )
973            myAttrs[ 'owner' ]             = str( owner )
974            myAttrs[ 'nodect' ]            = str( nodect )
975            myAttrs[ 'exec.hostnames' ]    = str( running_nodes )
976            myAttrs[ 'exec.nodestr' ]      = str( exec_nodestr )
977            myAttrs[ 'req.walltime' ]      = str( requested_time )
978            myAttrs[ 'req.memory' ]        = str( requested_memory )
979            myAttrs[ 'req.nodes' ]         = str( requested_nodes )
980            myAttrs[ 'req.ppn' ]           = str( ppn )
981            myAttrs[ 'req.attributes' ]    = str( attributes )
982            myAttrs[ 'timestamp.running' ] = str( start_timestamp )
983            myAttrs[ 'timestamp.created' ] = str( create_timestamp )
984            myAttrs[ 'timestamp.queued' ]  = str( queued_timestamp )
[354]985
[661]986            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q', 'W' ]:
[61]987
[659]988                self.jobs[ job_id ] = myAttrs
[26]989
[659]990        for id, attrs in self.jobs.items():
[76]991
[659]992            if id not in jobs_processed:
[76]993
[659]994                # This one isn't there anymore; toedeledoki!
995                #
996                del self.jobs[ id ]
[76]997
[362]998GMETRIC_DEFAULT_TYPE    = 'string'
999GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1000GMETRIC_DEFAULT_PORT    = '8649'
[659]1001GMETRIC_DEFAULT_UNITS   = ''
[362]1002
1003class Gmetric:
1004
[659]1005    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
[362]1006
[659]1007    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1008    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1009    protocol        = ( 'udp', 'multicast' )
[362]1010
[659]1011    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
[362]1012               
[659]1013        global GMETRIC_DEFAULT_TYPE
[362]1014
[659]1015        self.prot       = self.checkHostProtocol( host )
[664]1016        self.data_msg   = xdrlib.Packer()
1017        self.meta_msg   = xdrlib.Packer()
[659]1018        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
[362]1019
[659]1020        if self.prot not in self.protocol:
[362]1021
[659]1022            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
[362]1023
[659]1024        if self.prot == 'multicast':
[362]1025
[659]1026            # Set multicast options
1027            #
1028            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
[362]1029
[659]1030        self.hostport   = ( host, int( port ) )
1031        self.slopestr   = 'both'
1032        self.tmax       = 60
[362]1033
[659]1034    def checkHostProtocol( self, ip ):
[362]1035
[659]1036        """Detect if a ip adress is a multicast address"""
[471]1037
[659]1038        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1039        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
[362]1040
[659]1041        ip_fields               = ip.split( '.' )
[362]1042
[659]1043        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
[362]1044
[659]1045            return 'multicast'
1046        else:
1047            return 'udp'
[362]1048
[659]1049    def send( self, name, value, dmax, typestr = '', units = '' ):
[362]1050
[659]1051        if len( units ) == 0:
1052            units       = GMETRIC_DEFAULT_UNITS
[471]1053
[659]1054        if len( typestr ) == 0:
1055            typestr     = GMETRIC_DEFAULT_TYPE
[362]1056
[664]1057        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
[409]1058
[664]1059        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1060        data_rt = self.socket.sendto( data_msg, self.hostport )
[362]1061
[664]1062        return ( meta_rt, data_rt )
[362]1063
[664]1064    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
1065
1066        hostname = "unset"
1067
[659]1068        if slopestr not in self.slope:
[362]1069
[659]1070            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
[362]1071
[659]1072        if typestr not in self.type:
[362]1073
[659]1074            raise ValueError( "Type must be one of: " + str( self.type ) )
[362]1075
[659]1076        if len( name ) == 0:
[362]1077
[659]1078            raise ValueError( "Name must be non-empty" )
[362]1079
[664]1080        self.meta_msg.reset()
1081        self.meta_msg.pack_int( 128 )
[362]1082
[664]1083        if not spoof:
1084            self.meta_msg.pack_string( hostname )
1085        else:
1086            self.meta_msg.pack_string( spoof )
[362]1087
[664]1088        self.meta_msg.pack_string( name )
1089
1090        if not spoof:
1091            self.meta_msg.pack_int( 0 )
1092        else:
1093            self.meta_msg.pack_int( 1 )
1094           
1095        self.meta_msg.pack_string( typestr )
1096        self.meta_msg.pack_string( name )
1097        self.meta_msg.pack_string( unitstr )
1098        self.meta_msg.pack_int( self.slope[ slopestr ] )
1099        self.meta_msg.pack_uint( int( tmax ) )
1100        self.meta_msg.pack_uint( int( dmax ) )
1101
1102        if not group:
1103            self.meta_msg.pack_int( 0 )
1104        else:
1105            self.meta_msg.pack_int( 1 )
1106            self.meta_msg.pack_string( "GROUP" )
1107            self.meta_msg.pack_string( group )
1108
1109        self.data_msg.reset()
1110        self.data_msg.pack_int( 128+5 )
1111
1112        if not spoof:
1113            self.data_msg.pack_string( hostname )
1114        else:
1115            self.data_msg.pack_string( spoof )
1116
1117        self.data_msg.pack_string( name )
1118
1119        if not spoof:
1120            self.data_msg.pack_int( 0 )
1121        else:
1122            self.data_msg.pack_int( 1 )
1123
1124        self.data_msg.pack_string( "%s" )
1125        self.data_msg.pack_string( str( value ) )
1126
1127        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
1128
[26]1129def printTime( ):
[354]1130
[659]1131    """Print current time/date in human readable format for log/debug"""
[26]1132
[659]1133    return time.strftime("%a, %d %b %Y %H:%M:%S")
[26]1134
1135def debug_msg( level, msg ):
[354]1136
[659]1137    """Print msg if at or above current debug level"""
[26]1138
[659]1139    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
[377]1140
[660]1141    if (not DAEMONIZE and DEBUG_LEVEL >= level):
[659]1142        sys.stderr.write( msg + '\n' )
[26]1143
[659]1144    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1145        syslog.syslog( msg )
[373]1146
[307]1147def write_pidfile():
1148
[659]1149    # Write pidfile if PIDFILE is set
1150    #
1151    if PIDFILE:
[307]1152
[659]1153        pid = os.getpid()
[354]1154
[659]1155        pidfile = open( PIDFILE, 'w' )
[354]1156
[659]1157        pidfile.write( str( pid ) )
1158        pidfile.close()
[307]1159
[23]1160def main():
[354]1161
[659]1162    """Application start"""
[23]1163
[659]1164    global PBSQuery, PBSError, lsfObject
1165    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
[256]1166
[659]1167    if not processArgs( sys.argv[1:] ):
[354]1168
[659]1169        sys.exit( 1 )
[212]1170
[659]1171    # Load appropriate DataGatherer depending on which BATCH_API is set
1172    # and any required modules for the Gatherer
1173    #
1174    if BATCH_API == 'pbs':
[256]1175
[659]1176        try:
1177            from PBSQuery import PBSQuery, PBSError
[256]1178
[659]1179        except ImportError:
[256]1180
[659]1181            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
1182            sys.exit( 1 )
[256]1183
[659]1184        gather = PbsDataGatherer()
[256]1185
[659]1186    elif BATCH_API == 'sge':
[256]1187
[659]1188        # Tested with SGE 6.0u11.
1189        #
1190        gather = SgeDataGatherer()
[368]1191
[659]1192    elif BATCH_API == 'lsf':
[368]1193
[659]1194        try:
1195            from lsfObject import lsfObject
1196        except:
1197            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
1198            sys.exit( 1)
[256]1199
[659]1200        gather = LsfDataGatherer()
[524]1201
[659]1202    else:
1203        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
[354]1204
[659]1205        sys.exit( 1 )
[256]1206
[659]1207    if( DAEMONIZE and USE_SYSLOG ):
[373]1208
[659]1209        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[373]1210
[659]1211    if DAEMONIZE:
[354]1212
[659]1213        gather.daemon()
1214    else:
1215        gather.run()
[23]1216
[256]1217# wh00t? someone started me! :)
[65]1218#
[23]1219if __name__ == '__main__':
[659]1220    main()
Note: See TracBrowser for help on using the repository browser.