source: branches/1.0/jobmond/jobmond.py

Last change on this file was 913, checked in by olahaye, 9 years ago

[rpm&deb packaging] Now fixes the VERSION outside current directory (can be SVN)
This avoids .in files and let generate tarballs and packages (binary and sources) without any VERSION values.
make deb or rpm or install even from svn is now safe from "sed -i -e"

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 64.6 KB
RevLine 
[23]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[691]5# Copyright (C) 2006-2013  Ramon Bastiaans
[623]6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
[225]7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
[228]22# SVN $Id: jobmond.py 913 2013-05-22 17:00:22Z olahaye $
[227]23#
[23]24
[694]25# vi :set ts=4
26
[471]27import sys, getopt, ConfigParser, time, os, socket, string, re
[837]28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
[318]29from xml.sax.handler import feature_namespaces
[623]30from collections import deque
[699]31from glob import glob
[318]32
[913]33VERSION='__VERSION__'
[307]34
[471]35def usage( ver ):
36
[691]37    print 'jobmond %s' %VERSION
[471]38
[691]39    if ver:
40        return 0
[471]41
[691]42    print
43    print 'Purpose:'
44    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
45    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
46    print
47    print 'Usage:    jobmond [OPTIONS]'
48    print
49    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
50    print '  -p, --pidfile=FILE    Use pid file to store the process id'
51    print '  -h, --help        Print help and exit'
52    print '  -v, --version      Print version and exit'
53    print
[307]54
[212]55def processArgs( args ):
[26]56
[701]57    SHORT_L      = 'p:hvc:'
58    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
[165]59
[704]60    global PIDFILE, JOBMOND_CONF
[701]61    PIDFILE      = None
[61]62
[701]63    JOBMOND_CONF = '/etc/jobmond.conf'
[354]64
[691]65    try:
[68]66
[691]67        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
[185]68
[691]69    except getopt.GetoptError, detail:
[212]70
[691]71        print detail
[851]72        usage( False )
[691]73        sys.exit( 1 )
[212]74
[691]75    for opt, value in opts:
[212]76
[691]77        if opt in [ '--config', '-c' ]:
78       
[701]79            JOBMOND_CONF = value
[212]80
[691]81        if opt in [ '--pidfile', '-p' ]:
[212]82
[701]83            PIDFILE      = value
[691]84       
85        if opt in [ '--help', '-h' ]:
[307]86 
[691]87            usage( False )
88            sys.exit( 0 )
[212]89
[691]90        if opt in [ '--version', '-v' ]:
[471]91
[691]92            usage( True )
93            sys.exit( 0 )
[471]94
[701]95    return loadConfig( JOBMOND_CONF )
[212]96
[520]97class GangliaConfigParser:
98
[699]99    def __init__( self, filename ):
[520]100
[699]101        self.conf_lijst   = [ ]
102        self.conf_dict    = { }
103        self.filename     = filename
104        self.file_pointer = file( filename, 'r' )
105        self.lexx         = shlex.shlex( self.file_pointer )
106        self.lexx.whitespace_split = True
[520]107
[699]108        self.parse()
[520]109
[699]110    def __del__( self ):
[520]111
[699]112        """
113        Cleanup: close file descriptor
114        """
115
116        self.file_pointer.close()
117        del self.lexx
118        del self.conf_lijst
119
[691]120    def removeQuotes( self, value ):
[520]121
[699]122        clean_value = value
123        clean_value = clean_value.replace( "'", "" )
124        clean_value = clean_value.replace( '"', '' )
125        clean_value = clean_value.strip()
[520]126
[691]127        return clean_value
[520]128
[699]129    def removeBraces( self, value ):
[520]130
[699]131        clean_value = value
132        clean_value = clean_value.replace( "(", "" )
133        clean_value = clean_value.replace( ')', '' )
134        clean_value = clean_value.strip()
[520]135
[699]136        return clean_value
[520]137
[699]138    def parse( self ):
[520]139
[699]140        """
141        Parse self.filename using shlex scanning.
142        - Removes /* comments */
143        - Traverses (recursively) through all include () statements
144        - Stores complete valid config tokens in self.conf_list
[520]145
[699]146        i.e.:
147            ['globals',
148             '{',
149             'daemonize',
150             '=',
151             'yes',
152             'setuid',
153             '=',
154             'yes',
155             'user',
156             '=',
157             'ganglia',
158             'debug_level',
159             '=',
160             '0',
161             <etc> ]
162        """
[520]163
[699]164        t = 'bogus'
165        c = False
166        i = False
[520]167
[699]168        while t != self.lexx.eof:
169            #print 'get token'
170            t = self.lexx.get_token()
[520]171
[699]172            if len( t ) >= 2:
[520]173
[699]174                if len( t ) >= 4:
[520]175
[699]176                    if t[:2] == '/*' and t[-2:] == '*/':
[520]177
[699]178                        #print 'comment line'
179                        #print 'skipping: %s' %t
180                        continue
[520]181
[699]182                if t == '/*' or t[:2] == '/*':
183                    c = True
184                    #print 'comment start'
185                    #print 'skipping: %s' %t
186                    continue
[520]187
[699]188                if t == '*/' or t[-2:] == '*/':
189                    c = False
190                    #print 'skipping: %s' %t
191                    #print 'comment end'
192                    continue
193
194            if c:
195                #print 'skipping: %s' %t
196                continue
197
198            if t == 'include':
199                i = True
200                #print 'include start'
201                #print 'skipping: %s' %t
202                continue
203
204            if i:
205
206                #print 'include start: %s' %t
207
208                t2 = self.removeQuotes( t )
209                t2 = self.removeBraces( t )
210
211                for in_file in glob( self.removeQuotes(t2) ):
212
213                    #print 'including file: %s' %in_file
214                    parse_infile = GangliaConfigParser( in_file )
215
216                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
217
218                    del parse_infile
219
220                i = False
221                #print 'include end'
222                #print 'skipping: %s' %t
223                continue
224
225            #print 'keep: %s' %t
226            self.conf_lijst.append( self.removeQuotes(t) )
227
228    def getConfLijst( self ):
229
230        return self.conf_lijst
231
232    def confListToDict( self, parent_list=None ):
233
234        """
235        Recursively traverses a conf_list and creates dictionary from it
236        """
237
238        new_dict = { }
239        count    = 0
240        skip     = 0
241
242        if not parent_list:
243            parent_list = self.conf_lijst
244
245        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
246
247        for n, c in enumerate( parent_list ):
248
249            count = count + 1
250
251            #print 'CL: n %d c %s' %(n, c)
252
253            if skip > 0:
254
255                #print '- skipped'
256                skip = skip - 1
257                continue
258
259            if (n+1) <= (len( parent_list )-1):
260
261                if parent_list[(n+1)] == '{':
262
263                    if not new_dict.has_key( c ):
264                        new_dict[ c ] = [ ]
265
266                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
267                    new_dict[ c ].append( temp_new_dict )
268
269                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
270
271                    if not new_dict.has_key( c ):
272                        new_dict[ c ] = [ ]
273
274                    new_dict[ c ].append( parent_list[ (n+2) ] )
275
276                    skip = 2
277
278                if parent_list[n] == '}':
279
280                    #print 'leaving confListToDict(): new dict = %s' %new_dict
281                    return (new_dict, count)
282
283    def makeConfDict( self ):
284
285        """
286        Walks through self.conf_list and creates a dictionary based upon config values
287
288        i.e.:
289            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
290                                                         'ip': ['"127.0.0.1"'],
291                                                         'mask': ['32']}]}],
292                                    'port': ['8649']}],
293            'udp_recv_channel': [{'port': ['8649']}],
294            'udp_send_channel': [{'host': ['145.101.32.3'],
295                                  'port': ['8649']},
296                                 {'host': ['145.101.32.207'],
297                                  'port': ['8649']}]}
298        """
299
300        new_dict = { }
301        skip     = 0
302
303        #print 'entering makeConfDict()'
304
305        for n, c in enumerate( self.conf_lijst ):
306
307            #print 'M: n %d c %s' %(n, c)
308
309            if skip > 0:
310
311                #print '- skipped'
312                skip = skip - 1
313                continue
314
315            if (n+1) <= (len( self.conf_lijst )-1):
316
317                if self.conf_lijst[(n+1)] == '{':
318
319                    if not new_dict.has_key( c ):
320                        new_dict[ c ] = [ ]
321
322                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
323                    new_dict[ c ].append( temp_new_dict )
324
325                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
326
327                    if not new_dict.has_key( c ):
328                        new_dict[ c ] = [ ]
329
330                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
331
332                    skip = 2
333
334        self.conf_dict = new_dict
335        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
336
337    def checkConfDict( self ):
338
339        if len( self.conf_lijst ) == 0:
340
341            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
342
343        if len( self.conf_dict ) == 0:
344
345            self.makeConfDict()
346
347    def getConfDict( self ):
348
349        self.checkConfDict()
350        return self.conf_dict
351
352    def getUdpSendChannels( self ):
353
354        self.checkConfDict()
355
[707]356        udp_send_channels = [ ] # IP:PORT
357
358        if not self.conf_dict.has_key( 'udp_send_channel' ):
359            return None
360
361        for u in self.conf_dict[ 'udp_send_channel' ]:
362
363            if u.has_key( 'mcast_join' ):
364
365                ip = u['mcast_join'][0]
366
367            elif u.has_key( 'host' ):
368
369                ip = u['host'][0]
370
371            port = u['port'][0]
372
373            udp_send_channels.append( ( ip, port ) )
374
375        if len( udp_send_channels ) == 0:
376            return None
377
378        return udp_send_channels
379
[699]380    def getSectionLastOption( self, section, option ):
381
382        """
383        Get last option set in a config section that could be set multiple times in multiple (include) files.
384
385        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
386        """
387
388        self.checkConfDict()
389        value = None
390
391        if not self.conf_dict.has_key( section ):
392
393            return None
394
395        # Could be set multiple times in multiple (include) files: get last one set
396        for c in self.conf_dict[ section ]:
397
398                if c.has_key( option ):
399
[705]400                    value = c[ option ][0]
[699]401
[705]402        return value
[699]403
404    def getClusterName( self ):
405
406        return self.getSectionLastOption( 'cluster', 'name' )
407
408    def getVal( self, section, option ):
409
410        return self.getSectionLastOption( section, option )
411
[691]412    def getInt( self, section, valname ):
[520]413
[691]414        value    = self.getVal( section, valname )
[520]415
[691]416        if not value:
[699]417            return None
[520]418
[691]419        return int( value )
[520]420
[691]421    def getStr( self, section, valname ):
[520]422
[691]423        value    = self.getVal( section, valname )
[520]424
[691]425        if not value:
[699]426            return None
[520]427
[691]428        return str( value )
[520]429
430def findGmetric():
431
[691]432    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
[520]433
[691]434        guess    = '%s/%s' %( dir, 'gmetric' )
[520]435
[691]436        if os.path.exists( guess ):
[520]437
[691]438            return guess
[520]439
[691]440    return False
[520]441
[212]442def loadConfig( filename ):
443
[691]444    def getlist( cfg_string ):
[215]445
[691]446        my_list = [ ]
[215]447
[691]448        for item_txt in cfg_string.split( ',' ):
[215]449
[691]450            sep_char = None
[215]451
[691]452            item_txt = item_txt.strip()
[215]453
[691]454            for s_char in [ "'", '"' ]:
[215]455
[691]456                if item_txt.find( s_char ) != -1:
[215]457
[691]458                    if item_txt.count( s_char ) != 2:
[215]459
[691]460                        print 'Missing quote: %s' %item_txt
461                        sys.exit( 1 )
[215]462
[691]463                    else:
[215]464
[691]465                        sep_char = s_char
466                        break
[215]467
[691]468            if sep_char:
[215]469
[691]470                item_txt = item_txt.split( sep_char )[1]
[215]471
[691]472            my_list.append( item_txt )
[215]473
[691]474        return my_list
[215]475
[784]476    if not os.path.isfile( JOBMOND_CONF ):
477
478        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
479        sys.exit( 1 )
480
481    try:
482        f = open( JOBMOND_CONF, 'r' )
483    except IOError, detail:
484        print "Cannot read config file: '%s'" %JOBMOND_CONF
485        sys.exit( 1 )
486    else:
487        f.close()
488
[691]489    cfg        = ConfigParser.ConfigParser()
[212]490
[691]491    cfg.read( filename )
[212]492
[691]493    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
494    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
495    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
496    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
[707]497    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
[212]498
[701]499    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[212]500
[701]501    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[212]502
[691]503    SYSLOG_LEVEL    = -1
[701]504    SYSLOG_FACILITY = None
[377]505
[691]506    try:
[701]507        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[212]508
[691]509    except ConfigParser.NoOptionError:
[373]510
[701]511        USE_SYSLOG  = True
[373]512
[691]513        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
[373]514
[691]515    if USE_SYSLOG:
[373]516
[691]517        try:
[701]518            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[373]519
[691]520        except ConfigParser.NoOptionError:
[373]521
[691]522            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
[701]523            SYSLOG_LEVEL = 0
[373]524
[691]525        try:
[373]526
[691]527            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[373]528
[691]529        except ConfigParser.NoOptionError:
[373]530
[691]531            SYSLOG_FACILITY = syslog.LOG_DAEMON
[373]532
[691]533            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
[373]534
[691]535    try:
[373]536
[701]537        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
[212]538
[691]539    except ConfigParser.NoOptionError:
[265]540
[864]541        # Not required for all API's: only pbs api allows remote connections
542        BATCH_SERVER = None
[265]543
[691]544    try:
545   
[701]546        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
[265]547
[691]548    except ConfigParser.NoOptionError:
[265]549
[691]550        # Backwards compatibility for old configs
551        #
[265]552
[701]553        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
554        api_guess           = 'pbs'
[691]555   
556    try:
[212]557
[701]558        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
[353]559
[691]560    except ConfigParser.NoOptionError:
[353]561
[703]562        # Not specified: assume /etc/ganglia/gmond.conf
[691]563        #
[703]564        GMOND_CONF          = '/etc/ganglia/gmond.conf'
[353]565
[707]566    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
567    GMETRIC_TARGET          = None
[449]568
[707]569    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
[449]570
[707]571    if not GMOND_UDP_SEND_CHANNELS:
[449]572
[701]573        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
[520]574
[691]575        # Couldn't figure it out: let's see if it's in our jobmond.conf
576        #
577        try:
[520]578
[691]579            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
[520]580
[691]581        # Guess not: now just give up
[701]582       
[691]583        except ConfigParser.NoOptionError:
[520]584
[691]585            GMETRIC_TARGET    = None
[520]586
[691]587            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
[520]588
[701]589            gmetric_bin    = findGmetric()
[520]590
[701]591            if gmetric_bin:
[520]592
[701]593                GMETRIC_BINARY     = gmetric_bin
594            else:
595                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
[520]596
[701]597                try:
[520]598
[701]599                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
[520]600
[701]601                except ConfigParser.NoOptionError:
[520]602
[786]603                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
[701]604                    sys.exit( 1 )
[520]605
[701]606    #TODO: is this really still needed or should be automatic
[691]607    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
[212]608
[701]609    BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
[215]610
[691]611    try:
[256]612
[691]613        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
[266]614
[691]615    except ConfigParser.NoOptionError, detail:
[266]616
[864]617        print "FATAL ERROR: BATCH_API not set"
618        sys.exit( 1 )
[354]619
[691]620    try:
[317]621
[691]622        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
[317]623
[691]624    except ConfigParser.NoOptionError, detail:
[317]625
[691]626        QUEUE        = None
[353]627
[701]628    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
629
[691]630    return True
[212]631
[507]632def fqdn_parts (fqdn):
[520]633
[691]634    """Return pair of host and domain for fully-qualified domain name arg."""
[520]635
[691]636    parts = fqdn.split (".")
[520]637
[691]638    return (parts[0], string.join(parts[1:], "."))
[507]639
[61]640class DataProcessor:
[355]641
[691]642    """Class for processing of data"""
[61]643
[691]644    binary = None
[61]645
[691]646    def __init__( self, binary=None ):
[355]647
[691]648        """Remember alternate binary location if supplied"""
[61]649
[691]650        global GMETRIC_BINARY, GMOND_CONF
[449]651
[691]652        if binary:
653            self.binary = binary
[61]654
[707]655        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[691]656            self.binary = GMETRIC_BINARY
[449]657
[691]658        # Timeout for XML
659        #
660        # From ganglia's documentation:
661        #
662        # 'A metric will be deleted DMAX seconds after it is received, and
663        # DMAX=0 means eternal life.'
[61]664
[691]665        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
[80]666
[707]667        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[354]668
[691]669            incompatible = self.checkGmetricVersion()
[61]670
[691]671            if incompatible:
[355]672
[903]673                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
[691]674                sys.exit( 1 )
[65]675
[691]676    def checkGmetricVersion( self ):
[355]677
[691]678        """
[903]679        Check version of gmetric is at least 3.3.8
[691]680        for the syntax we use
681        """
[65]682
[691]683        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
[255]684
[691]685        incompatible    = 0
[341]686
[691]687        gfp        = os.popen( self.binary + ' --version' )
[692]688        lines      = gfp.readlines()
[65]689
[691]690        gfp.close()
[355]691
[691]692        for line in lines:
[355]693
[691]694            line = line.split( ' ' )
[65]695
[691]696            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
697           
698                gmetric_version    = line[1].split( '\n' )[0]
[65]699
[691]700                version_major    = int( gmetric_version.split( '.' )[0] )
701                version_minor    = int( gmetric_version.split( '.' )[1] )
702                version_patch    = int( gmetric_version.split( '.' )[2] )
[65]703
[691]704                incompatible    = 0
[65]705
[691]706                if version_major < 3:
[65]707
[691]708                    incompatible = 1
709               
710                elif version_major == 3:
[65]711
[903]712                    if version_minor < 3:
[65]713
[692]714                        incompatible = 1
[65]715
[903]716                    elif version_patch < 8:
717
718                        incompatible = 1
719
[691]720        return incompatible
[65]721
[691]722    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
[355]723
[691]724        """Call gmetric binary and multicast"""
[65]725
[691]726        cmd = self.binary
[65]727
[707]728        if GMOND_UDP_SEND_CHANNELS:
[61]729
[707]730            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
731
732                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
733
734                debug_msg( 10, printTime() + ' ' + metric_debug)
735
736                gm = Gmetric( c_ip, c_port )
737
738                gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
739
740        elif GMETRIC_TARGET:
741
[691]742            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
743            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
[353]744
[691]745            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
[353]746
[691]747            debug_msg( 10, printTime() + ' ' + metric_debug)
[353]748
[691]749            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
[353]750
[691]751            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
[353]752
[691]753        else:
754            try:
755                cmd = cmd + ' -c' + GMOND_CONF
[353]756
[691]757            except NameError:
[353]758
[705]759                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
[353]760
[691]761            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[353]762
[691]763            if len( units ) > 0:
[409]764
[691]765                cmd = cmd + ' -u"' + units + '"'
[409]766
[691]767            debug_msg( 10, printTime() + ' ' + cmd )
[353]768
[691]769            os.system( cmd )
[353]770
[318]771class DataGatherer:
[23]772
[691]773    """Skeleton class for batch system DataGatherer"""
[256]774
[691]775    def printJobs( self, jobs ):
[355]776
[691]777        """Print a jobinfo overview"""
[318]778
[691]779        for name, attrs in self.jobs.items():
[318]780
[691]781            print 'job %s' %(name)
[318]782
[691]783            for name, val in attrs.items():
[318]784
[691]785                print '\t%s = %s' %( name, val )
[318]786
[691]787    def printJob( self, jobs, job_id ):
[355]788
[691]789        """Print job with job_id from jobs"""
[318]790
[691]791        print 'job %s' %(job_id)
[318]792
[691]793        for name, val in jobs[ job_id ].items():
[318]794
[691]795            print '\t%s = %s' %( name, val )
[318]796
[691]797    def getAttr( self, attrs, name ):
[507]798
[691]799        """Return certain attribute from dictionary, if exists"""
[507]800
[691]801        if attrs.has_key( name ):
[507]802
[691]803            return attrs[ name ]
804        else:
805            return ''
[507]806
[691]807    def jobDataChanged( self, jobs, job_id, attrs ):
[507]808
[691]809        """Check if job with attrs and job_id in jobs has changed"""
[507]810
[691]811        if jobs.has_key( job_id ):
[507]812
[691]813            oldData = jobs[ job_id ]   
814        else:
815            return 1
[507]816
[691]817        for name, val in attrs.items():
[507]818
[691]819            if oldData.has_key( name ):
[507]820
[691]821                if oldData[ name ] != attrs[ name ]:
[507]822
[691]823                    return 1
[507]824
[691]825            else:
826                return 1
[507]827
[691]828        return 0
[507]829
[691]830    def submitJobData( self ):
[507]831
[691]832        """Submit job info list"""
[507]833
[691]834        global BATCH_API
[512]835
[728]836        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[507]837
[724]838        running_jobs = 0
839        queued_jobs  = 0
[507]840
[691]841        # Count how many running/queued jobs we found
842        #
843        for jobid, jobattrs in self.jobs.items():
[507]844
[691]845            if jobattrs[ 'status' ] == 'Q':
[507]846
[691]847                queued_jobs += 1
[507]848
[691]849            elif jobattrs[ 'status' ] == 'R':
[507]850
[691]851                running_jobs += 1
[507]852
[691]853        # Report running/queued jobs as seperate metric for a nice RRD graph
854        #
[728]855        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
856        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
[507]857
[691]858        # Report down/offline nodes in batch (PBS only ATM)
859        #
[866]860        if BATCH_API in [ 'pbs', 'slurm' ]:
[512]861
[691]862            domain        = fqdn_parts( socket.getfqdn() )[1]
[514]863
[728]864            downed_nodes  = list()
865            offline_nodes = list()
[691]866       
867            l        = ['state']
[512]868
[790]869            nodelist = self.getNodeData()
870
871            for name, node in nodelist.items():
872
[691]873                if ( node[ 'state' ].find( "down" ) != -1 ):
[512]874
[691]875                    downed_nodes.append( name )
[512]876
[691]877                if ( node[ 'state' ].find( "offline" ) != -1 ):
[512]878
[691]879                    offline_nodes.append( name )
[512]880
[728]881            downnodeslist    = do_nodelist( downed_nodes )
882            offlinenodeslist = do_nodelist( offline_nodes )
[512]883
[691]884            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
885            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[728]886            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
887            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
[514]888
[691]889        # Now let's spread the knowledge
890        #
891        for jobid, jobattrs in self.jobs.items():
[507]892
[691]893            # Make gmetric values for each job: respect max gmetric value length
894            #
895            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
896            metric_increment    = 0
[507]897
[691]898            # If we have more job info than max gmetric value length allows, split it up
899            # amongst multiple metrics
900            #
901            for val in gmetric_val:
[507]902
[728]903                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
904                self.dp.multicastGmetric( metric_name, val )
[507]905
[691]906                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
907                #
908                metric_increment    = metric_increment + 1
[507]909
[691]910    def compileGmetricVal( self, jobid, jobattrs ):
[507]911
[691]912        """Create a val string for gmetric of jobinfo"""
[507]913
[691]914        gval_lists    = [ ]
915        val_list    = { }
[507]916
[691]917        for val_name, val_value in jobattrs.items():
[507]918
[691]919            # These are our own metric names, i.e.: status, start_timestamp, etc
920            #
921            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
[507]922
[691]923            # These are their corresponding values
924            #
925            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
[507]926
[691]927            if val_name == 'nodes' and jobattrs['status'] == 'R':
[507]928
[691]929                node_str = None
[507]930
[691]931                for node in val_value:
[507]932
[691]933                    if node_str:
[507]934
[691]935                        node_str = node_str + ';' + node
936                    else:
937                        node_str = node
[507]938
[691]939                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
940                    #
941                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
[507]942
[691]943                        # It's too big, we need to make a new gmetric for the additional info
944                        #
945                        val_list[ val_name ]    = node_str
[507]946
[691]947                        gval_lists.append( val_list )
[507]948
[691]949                        val_list        = { }
950                        node_str        = None
[507]951
[691]952                val_list[ val_name ]    = node_str
[507]953
[691]954                gval_lists.append( val_list )
[507]955
[691]956                val_list        = { }
[507]957
[691]958            elif val_value != '':
[507]959
[691]960                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
961                #
962                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
[507]963
[691]964                    # It's too big, we need to make a new gmetric for the additional info
965                    #
966                    gval_lists.append( val_list )
[507]967
[691]968                    val_list        = { }
[507]969
[691]970                val_list[ val_name ]    = val_value
[507]971
[691]972        if len( val_list ) > 0:
[507]973
[691]974            gval_lists.append( val_list )
[507]975
[691]976        str_list    = [ ]
[507]977
[691]978        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
979        #
980        for val_list in gval_lists:
[507]981
[691]982            my_val_str    = None
[507]983
[691]984            for val_name, val_value in val_list.items():
[507]985
[691]986                if type(val_value) == list:
[579]987
[691]988                    val_value    = val_value.join( ',' )
[579]989
[691]990                if my_val_str:
[507]991
[691]992                    try:
993                        # fixme: It's getting
994                        # ('nodes', None) items
995                        my_val_str = my_val_str + ' ' + val_name + '=' + val_value
996                    except:
997                        pass
[623]998
[691]999                else:
1000                    my_val_str = val_name + '=' + val_value
[507]1001
[691]1002            str_list.append( my_val_str )
[507]1003
[691]1004        return str_list
[507]1005
[691]1006    def daemon( self ):
[355]1007
[691]1008        """Run as daemon forever"""
[256]1009
[691]1010        # Fork the first child
1011        #
1012        pid = os.fork()
1013        if pid > 0:
1014            sys.exit(0)  # end parent
[256]1015
[691]1016        # creates a session and sets the process group ID
1017        #
1018        os.setsid()
[318]1019
[691]1020        # Fork the second child
1021        #
1022        pid = os.fork()
1023        if pid > 0:
1024            sys.exit(0)  # end parent
[318]1025
[691]1026        write_pidfile()
[318]1027
[691]1028        # Go to the root directory and set the umask
1029        #
1030        os.chdir('/')
1031        os.umask(0)
[318]1032
[691]1033        sys.stdin.close()
1034        sys.stdout.close()
1035        sys.stderr.close()
[318]1036
[691]1037        os.open('/dev/null', os.O_RDWR)
1038        os.dup2(0, 1)
1039        os.dup2(0, 2)
[318]1040
[691]1041        self.run()
[318]1042
[691]1043    def run( self ):
[355]1044
[691]1045        """Main thread"""
[256]1046
[691]1047        while ( 1 ):
1048       
1049            self.getJobData()
1050            self.submitJobData()
1051            time.sleep( BATCH_POLL_INTERVAL )   
[256]1052
[623]1053# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1054# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1055# with 6.2.  See also the fixmes.
[256]1056
[507]1057class NoJobs (Exception):
[691]1058    """Exception raised by empty job list in qstat output."""
1059    pass
[256]1060
[507]1061class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
[691]1062    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
[318]1063
[691]1064    def __init__(self):
1065        self.value = ""
1066        self.joblist = []
1067        self.job = {}
1068        self.queue = ""
1069        self.in_joblist = False
1070        self.lrequest = False
1071        self.eltq = deque()
1072        xml.sax.handler.ContentHandler.__init__(self)
[318]1073
[691]1074    # The structure of the output is as follows (for SGE 6.0).  It's
1075    # similar for 6.1, but radically different for 6.2, and is
1076    # undocumented generally.  Unfortunately it's voluminous, and probably
1077    # doesn't scale to large clusters/queues.
[318]1078
[691]1079    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1080    #   <djob_info>
1081    #     <qmaster_response>  <!-- job -->
1082    #       ...
1083    #       <JB_ja_template> 
1084    #     <ulong_sublist>
1085    #     ...         <!-- start_time, state ... -->
1086    #     </ulong_sublist>
1087    #       </JB_ja_template> 
1088    #       <JB_ja_tasks>
1089    #     <ulong_sublist>
1090    #       ...       <!-- task info
1091    #     </ulong_sublist>
1092    #     ...
1093    #       </JB_ja_tasks>
1094    #       ...
1095    #     </qmaster_response>
1096    #   </djob_info>
1097    #   <messages>
1098    #   ...
[318]1099
[691]1100    # NB.  We might treat each task as a separate job, like
1101    # straight qstat output, but the web interface expects jobs to
1102    # be identified by integers, not, say, <job number>.<task>.
[318]1103
[691]1104    # So, I lied.  If the job list is empty, we get invalid XML
1105    # like this, which we need to defend against:
[318]1106
[691]1107    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1108    #   <>
1109    #     <ST_name>*</ST_name>
1110    #   </>
1111    # </unknown_jobs>
[318]1112
[691]1113    def startElement(self, name, attrs):
1114        self.value = ""
1115        if name == "djob_info":    # job list
1116            self.in_joblist = True
1117        # The job container is "qmaster_response" in SGE 6.0
1118        # and 6.1, but "element" in 6.2.  This is only the very
1119        # start of what's necessary for 6.2, though (sigh).
1120        elif (name == "qmaster_response" or name == "element") \
1121                and self.eltq[-1] == "djob_info": # job
1122            self.job = {"job_state": "U", "slots": 0,
1123                    "nodes": [], "queued_timestamp": "",
1124                    "queued_timestamp": "", "queue": "",
1125                    "ppn": "0", "RN_max": 0,
1126                    # fixme in endElement
1127                    "requested_memory": 0, "requested_time": 0
1128                    }
1129            self.joblist.append(self.job)
1130        elif name == "qstat_l_requests": # resource request
1131            self.lrequest = True
1132        elif name == "unknown_jobs":
1133            raise NoJobs
1134        self.eltq.append (name)
[318]1135
[691]1136    def characters(self, ch):
1137        self.value += ch
[318]1138
[691]1139    def endElement(self, name): 
1140        """Snarf job elements contents into job dictionary.
1141           Translate keys if appropriate."""
[318]1142
[691]1143        name_trans = {
1144          "JB_job_number": "number",
1145          "JB_job_name": "name", "JB_owner": "owner",
1146          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1147          "JB_submission_time": "queued_timestamp"
1148          }
1149        value = self.value
1150        self.eltq.pop ()
[318]1151
[691]1152        if name == "djob_info":
1153            self.in_joblist = False
1154            self.job = {}
1155        elif name == "JAT_master_queue":
1156            self.job["queue"] = value.split("@")[0]
1157        elif name == "JG_qhostname":
1158            if not (value in self.job["nodes"]):
1159                self.job["nodes"].append(value)
1160        elif name == "JG_slots": # slots in use
1161            self.job["slots"] += int(value)
1162        elif name == "RN_max": # requested slots (tasks or parallel)
1163            self.job["RN_max"] = max (self.job["RN_max"],
1164                          int(value))
1165        elif name == "JAT_state": # job state (bitwise or)
1166            value = int (value)
1167            # Status values from sge_jobL.h
1168            #define JIDLE           0x00000000
1169            #define JHELD           0x00000010
1170            #define JMIGRATING          0x00000020
1171            #define JQUEUED         0x00000040
1172            #define JRUNNING        0x00000080
1173            #define JSUSPENDED          0x00000100
1174            #define JTRANSFERING        0x00000200
1175            #define JDELETED        0x00000400
1176            #define JWAITING        0x00000800
1177            #define JEXITING        0x00001000
1178            #define JWRITTEN        0x00002000
1179            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1180            #define JFINISHED           0x00010000
1181            if value & 0x80:
1182                self.job["status"] = "R"
1183            elif value & 0x40:
1184                self.job["status"] = "Q"
1185            else:
1186                self.job["status"] = "O" # `other'
1187        elif name == "CE_name" and self.lrequest and self.value in \
1188                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1189            # We're in a container for an interesting resource
1190            # request; record which type.
1191            self.lrequest = self.value
1192        elif name == "CE_doubleval" and self.lrequest:
1193            # if we're in a container for an interesting
1194            # resource request, use the maxmimum of the hard
1195            # and soft requests to record the requested CPU
1196            # or core.  Fixme:  I'm not sure if this logic is
1197            # right.
1198            if self.lrequest in ("h_core", "s_core"):
1199                self.job["requested_memory"] = \
1200                    max (float (value),
1201                     self.job["requested_memory"])
1202            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1203            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1204                self.job["requested_time"] = \
1205                    max (float (value),
1206                     self.job["requested_time"])
1207        elif name == "qstat_l_requests":
1208            self.lrequest = False
1209        elif self.job and self.in_joblist:
1210            if name in name_trans:
1211                name = name_trans[name]
1212                self.job[name] = value
[318]1213
[507]1214# Abstracted from PBS original.
1215# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
[520]1216#
1217def do_nodelist( nodes ):
1218
[691]1219    """Translate node list as appropriate."""
[520]1220
[691]1221    nodeslist        = [ ]
1222    my_domain        = fqdn_parts( socket.getfqdn() )[1]
[520]1223
[691]1224    for node in nodes:
[520]1225
[691]1226        host        = node.split( '/' )[0] # not relevant for SGE
1227        h, host_domain    = fqdn_parts(host)
[520]1228
[691]1229        if host_domain == my_domain:
[520]1230
[691]1231            host    = h
[520]1232
[691]1233        if nodeslist.count( host ) == 0:
[520]1234
[691]1235            for translate_pattern in BATCH_HOST_TRANSLATE:
[520]1236
[691]1237                if translate_pattern.find( '/' ) != -1:
[520]1238
[691]1239                    translate_orig    = \
1240                        translate_pattern.split( '/' )[1]
1241                    translate_new    = \
1242                        translate_pattern.split( '/' )[2]
1243                    host = re.sub( translate_orig,
1244                               translate_new, host )
1245            if not host in nodeslist:
1246                nodeslist.append( host )
1247    return nodeslist
[318]1248
[837]1249class SLURMDataGatherer( DataGatherer ):
1250
1251    global pyslurm
1252
1253    """This is the DataGatherer for SLURM"""
1254
1255    def __init__( self ):
1256
1257        """Setup appropriate variables"""
1258
1259        self.jobs       = { }
1260        self.timeoffset = 0
1261        self.dp         = DataProcessor()
1262
1263    def getNodeData( self ):
1264
[866]1265        slurm_type  = pyslurm.node()
[837]1266
[866]1267        slurm_nodes = slurm_type.get()
[837]1268
[866]1269        nodedict    = { }
1270
1271        for node, attrs in slurm_nodes.items():
1272
1273            ( num_state, name_state ) = attrs['node_state'] 
1274
1275            if name_state == 'DOWN':
1276
1277                nodedict[ node ] = { 'state' : 'down' }
1278
1279            elif name_state == 'DRAIN':
1280
1281                nodedict[ node ] = { 'state' : 'offline' }
1282
[837]1283        return nodedict
1284
1285    def getJobData( self ):
1286
1287        """Gather all data on current jobs"""
1288
1289        joblist            = {}
1290
1291        self.cur_time  = time.time()
1292
1293        slurm_type = pyslurm.job()
1294        joblist    = slurm_type.get()
1295
1296        jobs_processed    = [ ]
1297
1298        for name, attrs in joblist.items():
1299            display_queue = 1
1300            job_id        = name
1301
1302            name          = self.getAttr( attrs, 'name' )
1303            queue         = self.getAttr( attrs, 'partition' )
1304
1305            if QUEUE:
1306                for q in QUEUE:
1307                    if q == queue:
1308                        display_queue = 1
1309                        break
1310                    else:
1311                        display_queue = 0
1312                        continue
1313            if display_queue == 0:
1314                continue
1315
1316            owner_uid        = attrs[ 'user_id' ]
1317            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1318
1319            requested_time   = self.getAttr( attrs, 'time_limit' )
[852]1320            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
[837]1321
[852]1322            if min_memory == 0:
1323
1324                requested_memory = ''
1325
1326            else:
1327                requested_memory = min_memory
1328
[853]1329            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
[837]1330
[853]1331            if min_cpus == 0:
1332
1333                ppn = ''
1334
1335            else:
1336                ppn = min_cpus
1337
[837]1338            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1339
1340            status = 'Q'
1341
1342            if status_long == 'RUNNING':
1343
1344                status = 'R'
1345
1346            elif status_long == 'COMPLETED':
1347
1348                continue
1349
1350            jobs_processed.append( job_id )
1351
1352            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1353
1354            start_timestamp = ''
1355            nodeslist       = ''
1356
1357            if status == 'R':
1358
1359                start_timestamp = self.getAttr( attrs, 'start_time' )
[851]1360                nodes           = attrs[ 'nodes' ]
[837]1361
[851]1362                if not nodes:
[837]1363
[851]1364                    # This should not happen
1365
1366                    # Something wrong: running but 'nodes' returned empty by pyslurm
1367                    # Possible pyslurm bug: abort/quit/warning
1368
1369                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1370
1371                    print err_msg
1372                    debug_msg( 0, err_msg )
1373                    sys.exit(1)
1374
1375                my_nodelist = [ ]
1376
1377                slurm_hostlist  = pyslurm.hostlist()
1378                slurm_hostlist.create( nodes )
1379                slurm_hostlist.uniq()
1380
1381                while slurm_hostlist.count() > 0:
1382
1383                    my_nodelist.append( slurm_hostlist.pop() )
1384
1385                slurm_hostlist.destroy()
1386
1387                del slurm_hostlist
1388
1389                nodeslist       = do_nodelist( my_nodelist )
1390
[837]1391                if DETECT_TIME_DIFFS:
1392
1393                    # If a job start if later than our current date,
1394                    # that must mean the Torque server's time is later
1395                    # than our local time.
1396               
1397                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1398
1399                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1400
1401            elif status == 'Q':
1402
1403                nodeslist       = str( attrs[ 'num_nodes' ] )
1404
1405            else:
1406                start_timestamp = ''
1407                nodeslist       = ''
1408
1409            myAttrs                = { }
1410
1411            myAttrs[ 'name' ]             = str( name )
1412            myAttrs[ 'queue' ]            = str( queue )
1413            myAttrs[ 'owner' ]            = str( owner )
1414            myAttrs[ 'requested_time' ]   = str( requested_time )
1415            myAttrs[ 'requested_memory' ] = str( requested_memory )
1416            myAttrs[ 'ppn' ]              = str( ppn )
1417            myAttrs[ 'status' ]           = str( status )
1418            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1419            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1420            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1421            myAttrs[ 'nodes' ]            = nodeslist
1422            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1423            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1424
1425            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1426
1427                self.jobs[ job_id ] = myAttrs
1428
1429        for id, attrs in self.jobs.items():
1430
1431            if id not in jobs_processed:
1432
1433                # This one isn't there anymore; toedeledoki!
1434                #
1435                del self.jobs[ id ]
1436
[318]1437class SgeDataGatherer(DataGatherer):
1438
[691]1439    jobs = {}
[61]1440
[691]1441    def __init__( self ):
1442        self.jobs = {}
1443        self.timeoffset = 0
1444        self.dp = DataProcessor()
[318]1445
[691]1446    def getJobData( self ):
1447        """Gather all data on current jobs in SGE"""
[318]1448
[691]1449        import popen2
[318]1450
[691]1451        self.cur_time = 0
1452        queues = ""
1453        if QUEUE:    # only for specific queues
1454            # Fixme:  assumes queue names don't contain single
1455            # quote or comma.  Don't know what the SGE rules are.
1456            queues = " -q '" + string.join (QUEUE, ",") + "'"
1457        # Note the comment in SgeQstatXMLParser about scaling with
1458        # this method of getting data.  I haven't found better one.
1459        # Output with args `-xml -ext -f -r' is easier to parse
1460        # in some ways, harder in others, but it doesn't provide
1461        # the submission time (at least SGE 6.0).  The pipeline
1462        # into sed corrects bogus XML observed with a configuration
1463        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1464        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
[623]1465sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
[691]1466                           + queues, True)
1467        qstatparser = SgeQstatXMLParser()
1468        parse_err = 0
1469        try:
1470            xml.sax.parse(piping.fromchild, qstatparser)
1471        except NoJobs:
1472            pass
1473        except:
1474            parse_err = 1
[704]1475        if piping.wait():
1476            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
[691]1477            return None
1478        elif parse_err:
1479            debug_msg(10, "Bad XML output from qstat"())
1480            exit (1)
1481        for f in piping.fromchild, piping.tochild, piping.childerr:
1482            f.close()
1483        self.cur_time = time.time()
1484        jobs_processed = []
1485        for job in qstatparser.joblist:
1486            job_id = job["number"]
1487            if job["status"] in [ 'Q', 'R' ]:
1488                jobs_processed.append(job_id)
1489            if job["status"] == "R":
1490                job["nodes"] = do_nodelist (job["nodes"])
1491                # Fixme: why is job["nodes"] sometimes null?
1492                try:
1493                    # Fixme: Is this sensible?  The
1494                    # PBS-type PPN isn't something you use
1495                    # with SGE.
[704]1496                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
[691]1497                except:
1498                    job["ppn"] = 0
1499                if DETECT_TIME_DIFFS:
1500                    # If a job start is later than our
1501                    # current date, that must mean
1502                    # the SGE server's time is later
1503                    # than our local time.
[704]1504                    start_timestamp = int (job["start_timestamp"])
1505                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
[318]1506
[704]1507                        self.timeoffset    = start_timestamp - int(self.cur_time)
[691]1508            else:
1509                # fixme: Note sure what this should be:
1510                job["ppn"] = job["RN_max"]
1511                job["nodes"] = "1"
[318]1512
[691]1513            myAttrs = {}
1514            for attr in ["name", "queue", "owner",
1515                     "requested_time", "status",
1516                     "requested_memory", "ppn",
1517                     "start_timestamp", "queued_timestamp"]:
1518                myAttrs[attr] = str(job[attr])
1519            myAttrs["nodes"] = job["nodes"]
[704]1520            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
[691]1521            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1522            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
[318]1523
[704]1524            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
[691]1525                self.jobs[job_id] = myAttrs
1526        for id, attrs in self.jobs.items():
1527            if id not in jobs_processed:
1528                del self.jobs[id]
[318]1529
[524]1530# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1531# Requres LSFObject http://sourceforge.net/projects/lsfobject
1532#
1533class LsfDataGatherer(DataGatherer):
[525]1534
[691]1535    """This is the DataGatherer for LSf"""
[524]1536
[691]1537    global lsfObject
[524]1538
[691]1539    def __init__( self ):
[525]1540
[691]1541        self.jobs = { }
1542        self.timeoffset = 0
1543        self.dp = DataProcessor()
1544        self.initLsfQuery()
[524]1545
[691]1546    def _countDuplicatesInList( self, dupedList ):
[525]1547
[691]1548        countDupes    = { }
[525]1549
[691]1550        for item in dupedList:
[525]1551
[691]1552            if not countDupes.has_key( item ):
[525]1553
[691]1554                countDupes[ item ]    = 1
1555            else:
1556                countDupes[ item ]    = countDupes[ item ] + 1
[525]1557
[691]1558        dupeCountList    = [ ]
[525]1559
[691]1560        for item, count in countDupes.items():
[525]1561
[691]1562            dupeCountList.append( ( item, count ) )
[525]1563
[691]1564        return dupeCountList
[524]1565#
1566#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1567#print _countDuplicatesInList(lst)
1568#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1569########################
1570
[691]1571    def initLsfQuery( self ):
1572        self.pq = None
1573        self.pq = lsfObject.jobInfoEntObject()
[524]1574
[691]1575    def getJobData( self, known_jobs="" ):
1576        """Gather all data on current jobs in LSF"""
1577        if len( known_jobs ) > 0:
1578            jobs = known_jobs
1579        else:
1580            jobs = { }
1581        joblist = {}
1582        joblist = self.pq.getJobInfo()
1583        nodelist = ''
[524]1584
[691]1585        self.cur_time = time.time()
[524]1586
[691]1587        jobs_processed = [ ]
[524]1588
[691]1589        for name, attrs in joblist.items():
1590            job_id = str(name)
1591            jobs_processed.append( job_id )
1592            name = self.getAttr( attrs, 'jobName' )
1593            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1594            owner = self.getAttr( attrs, 'user' )
[524]1595
1596### THIS IS THE rLimit List index values
[691]1597#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1598#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1599#define LSF_RLIMIT_DATA     2        /* data size */
1600#define LSF_RLIMIT_STACK    3        /* stack size */
1601#define LSF_RLIMIT_CORE     4        /* core file size */
1602#define LSF_RLIMIT_RSS      5        /* resident set size */
1603#define LSF_RLIMIT_NOFILE   6        /* open files */
1604#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1605#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
[524]1606#define LSF_RLIMIT_SWAP     8
[691]1607#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1608#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1609#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1610#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
[524]1611
[691]1612            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1613            if requested_time == -1: 
1614                requested_time = ""
1615            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1616            if requested_memory == -1: 
1617                requested_memory = ""
[524]1618# This tries to get proc per node. We don't support this right now
[691]1619            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1620            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1621            if requested_cpus == None or requested_cpus == "":
1622                requested_cpus = 1
[524]1623
[691]1624            if QUEUE:
1625                for q in QUEUE:
1626                    if q == queue:
1627                        display_queue = 1
1628                        break
1629                    else:
1630                        display_queue = 0
1631                        continue
1632            if display_queue == 0:
1633                continue
[524]1634
[691]1635            runState = self.getAttr( attrs, 'status' )
1636            if runState == 4:
1637                status = 'R'
1638            else:
1639                status = 'Q'
1640            queued_timestamp = self.getAttr( attrs, 'submitTime' )
[524]1641
[691]1642            if status == 'R':
1643                start_timestamp = self.getAttr( attrs, 'startTime' )
1644                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1645                nodelist = nodesCpu.keys()
[524]1646
[691]1647                if DETECT_TIME_DIFFS:
[524]1648
[691]1649                    # If a job start if later than our current date,
1650                    # that must mean the Torque server's time is later
1651                    # than our local time.
[524]1652
[691]1653                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
[524]1654
[691]1655                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[524]1656
[691]1657            elif status == 'Q':
1658                start_timestamp = ''
1659                count_mynodes = 0
1660                numeric_node = 1
1661                nodelist = ''
[524]1662
[691]1663            myAttrs = { }
1664            if name == "":
1665                myAttrs['name'] = "none"
1666            else:
1667                myAttrs['name'] = name
[524]1668
[691]1669            myAttrs[ 'owner' ]        = owner
1670            myAttrs[ 'requested_time' ]    = str(requested_time)
1671            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1672            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1673            myAttrs[ 'ppn' ]        = str( ppn )
1674            myAttrs[ 'status' ]        = status
1675            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1676            myAttrs[ 'queue' ]        = str(queue)
1677            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1678            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1679            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1680            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1681            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
[524]1682
[691]1683            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1684                jobs[ job_id ] = myAttrs
[524]1685
[691]1686                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
[524]1687
[691]1688        for id, attrs in jobs.items():
1689            if id not in jobs_processed:
1690                # This one isn't there anymore
1691                #
1692                del jobs[ id ]
1693        self.jobs=jobs
[524]1694
1695
[355]1696class PbsDataGatherer( DataGatherer ):
[318]1697
[691]1698    """This is the DataGatherer for PBS and Torque"""
[318]1699
[691]1700    global PBSQuery, PBSError
[256]1701
[691]1702    def __init__( self ):
[354]1703
[691]1704        """Setup appropriate variables"""
[23]1705
[785]1706        self.jobs       = { }
1707        self.timeoffset = 0
1708        self.dp         = DataProcessor()
[354]1709
[691]1710        self.initPbsQuery()
[23]1711
[691]1712    def initPbsQuery( self ):
[91]1713
[785]1714        self.pq = None
[354]1715
[788]1716        try:
[354]1717
[788]1718            if( BATCH_SERVER ):
[91]1719
[788]1720                self.pq = PBSQuery( BATCH_SERVER )
1721            else:
1722                self.pq = PBSQuery()
1723
1724        except PBSError, details:
1725            print 'Cannot connect to pbs server'
1726            print details
1727            sys.exit( 1 )
1728
[691]1729        try:
[791]1730            # TODO: actually use new data structure
[691]1731            self.pq.old_data_structure()
[656]1732
[691]1733        except AttributeError:
[656]1734
[691]1735            # pbs_query is older
1736            #
1737            pass
[656]1738
[790]1739    def getNodeData( self ):
1740
1741        nodedict = { }
1742
1743        try:
1744            nodedict = self.pq.getnodes()
1745
1746        except PBSError, detail:
1747
1748            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1749
1750        return nodedict
1751
[691]1752    def getJobData( self ):
[354]1753
[691]1754        """Gather all data on current jobs in Torque"""
[26]1755
[785]1756        joblist            = {}
1757        self.cur_time      = 0
[349]1758
[691]1759        try:
1760            joblist        = self.pq.getjobs()
[785]1761            self.cur_time  = time.time()
[354]1762
[691]1763        except PBSError, detail:
[354]1764
[790]1765            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
[691]1766            return None
[354]1767
[691]1768        jobs_processed    = [ ]
[26]1769
[691]1770        for name, attrs in joblist.items():
[785]1771            display_queue = 1
1772            job_id        = name.split( '.' )[0]
[26]1773
[785]1774            name          = self.getAttr( attrs, 'Job_Name' )
1775            queue         = self.getAttr( attrs, 'queue' )
[317]1776
[691]1777            if QUEUE:
1778                for q in QUEUE:
1779                    if q == queue:
1780                        display_queue = 1
1781                        break
1782                    else:
1783                        display_queue = 0
1784                        continue
1785            if display_queue == 0:
1786                continue
[317]1787
1788
[691]1789            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
[785]1790            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1791            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
[95]1792
[785]1793            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
[95]1794
[785]1795            ppn = ''
[281]1796
[691]1797            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
[95]1798
[785]1799                mynoderequest_fields = mynoderequest.split( ':' )
[281]1800
[691]1801                for mynoderequest_field in mynoderequest_fields:
[281]1802
[691]1803                    if mynoderequest_field.find( 'ppn' ) != -1:
[281]1804
[785]1805                        ppn = mynoderequest_field.split( 'ppn=' )[1]
[281]1806
[785]1807            status = self.getAttr( attrs, 'job_state' )
[25]1808
[691]1809            if status in [ 'Q', 'R' ]:
[450]1810
[691]1811                jobs_processed.append( job_id )
[450]1812
[785]1813            queued_timestamp = self.getAttr( attrs, 'ctime' )
[243]1814
[691]1815            if status == 'R':
[133]1816
[785]1817                start_timestamp = self.getAttr( attrs, 'mtime' )
1818                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
[133]1819
[785]1820                nodeslist       = do_nodelist( nodes )
[354]1821
[691]1822                if DETECT_TIME_DIFFS:
[185]1823
[691]1824                    # If a job start if later than our current date,
1825                    # that must mean the Torque server's time is later
1826                    # than our local time.
1827               
1828                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
[185]1829
[785]1830                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[185]1831
[691]1832            elif status == 'Q':
[95]1833
[691]1834                # 'mynodequest' can be a string in the following syntax according to the
1835                # Torque Administator's manual:
1836                #
1837                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1838                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1839                # etc
1840                #
[451]1841
[691]1842                #
1843                # For now we only count the amount of nodes request and ignore properties
1844                #
[451]1845
[785]1846                start_timestamp = ''
1847                count_mynodes   = 0
[354]1848
[691]1849                for node in mynoderequest.split( '+' ):
[67]1850
[691]1851                    # Just grab the {node_count|hostname} part and ignore properties
1852                    #
[785]1853                    nodepart     = node.split( ':' )[0]
[67]1854
[691]1855                    # Let's assume a node_count value
1856                    #
[785]1857                    numeric_node = 1
[451]1858
[691]1859                    # Chop the value up into characters
1860                    #
1861                    for letter in nodepart:
[67]1862
[691]1863                        # If this char is not a digit (0-9), this must be a hostname
1864                        #
1865                        if letter not in string.digits:
[133]1866
[785]1867                            numeric_node = 0
[133]1868
[691]1869                    # If this is a hostname, just count this as one (1) node
1870                    #
1871                    if not numeric_node:
[354]1872
[785]1873                        count_mynodes = count_mynodes + 1
[691]1874                    else:
[451]1875
[691]1876                        # If this a number, it must be the node_count
1877                        # and increase our count with it's value
1878                        #
1879                        try:
[785]1880                            count_mynodes = count_mynodes + int( nodepart )
[354]1881
[691]1882                        except ValueError, detail:
[354]1883
[691]1884                            # When we arrive here I must be bugged or very confused
1885                            # THIS SHOULD NOT HAPPEN!
1886                            #
1887                            debug_msg( 10, str( detail ) )
1888                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1889                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1890                            debug_msg( 10, 'job = ' + str( name ) )
1891                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1892                       
[785]1893                nodeslist       = str( count_mynodes )
[691]1894            else:
[785]1895                start_timestamp = ''
1896                nodeslist       = ''
[133]1897
[691]1898            myAttrs                = { }
[26]1899
[785]1900            myAttrs[ 'name' ]             = str( name )
1901            myAttrs[ 'queue' ]            = str( queue )
1902            myAttrs[ 'owner' ]            = str( owner )
1903            myAttrs[ 'requested_time' ]   = str( requested_time )
1904            myAttrs[ 'requested_memory' ] = str( requested_memory )
1905            myAttrs[ 'ppn' ]              = str( ppn )
1906            myAttrs[ 'status' ]           = str( status )
1907            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1908            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1909            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1910            myAttrs[ 'nodes' ]            = nodeslist
1911            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
[691]1912            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
[354]1913
[691]1914            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
[61]1915
[785]1916                self.jobs[ job_id ] = myAttrs
[26]1917
[691]1918        for id, attrs in self.jobs.items():
[76]1919
[691]1920            if id not in jobs_processed:
[76]1921
[691]1922                # This one isn't there anymore; toedeledoki!
1923                #
1924                del self.jobs[ id ]
[76]1925
[362]1926GMETRIC_DEFAULT_TYPE    = 'string'
1927GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1928GMETRIC_DEFAULT_PORT    = '8649'
[700]1929GMETRIC_DEFAULT_UNITS   = ''
[362]1930
1931class Gmetric:
1932
[691]1933    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
[362]1934
[700]1935    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1936    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1937    protocol        = ( 'udp', 'multicast' )
[362]1938
[691]1939    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
[700]1940               
[691]1941        global GMETRIC_DEFAULT_TYPE
[362]1942
[691]1943        self.prot       = self.checkHostProtocol( host )
[700]1944        self.data_msg   = xdrlib.Packer()
1945        self.meta_msg   = xdrlib.Packer()
[691]1946        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
[362]1947
[691]1948        if self.prot not in self.protocol:
[362]1949
[691]1950            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
[362]1951
[691]1952        if self.prot == 'multicast':
[362]1953
[691]1954            # Set multicast options
1955            #
1956            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
[362]1957
[691]1958        self.hostport   = ( host, int( port ) )
1959        self.slopestr   = 'both'
1960        self.tmax       = 60
[362]1961
[691]1962    def checkHostProtocol( self, ip ):
[362]1963
[691]1964        """Detect if a ip adress is a multicast address"""
[471]1965
[691]1966        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1967        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
[362]1968
[700]1969        ip_fields               = ip.split( '.' )
[362]1970
[691]1971        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
[362]1972
[691]1973            return 'multicast'
1974        else:
1975            return 'udp'
[362]1976
[691]1977    def send( self, name, value, dmax, typestr = '', units = '' ):
[362]1978
[691]1979        if len( units ) == 0:
[700]1980            units       = GMETRIC_DEFAULT_UNITS
[471]1981
[691]1982        if len( typestr ) == 0:
[700]1983            typestr     = GMETRIC_DEFAULT_TYPE
[362]1984
[700]1985        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
[409]1986
[700]1987        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1988        data_rt = self.socket.sendto( data_msg, self.hostport )
[362]1989
[700]1990        return ( meta_rt, data_rt )
[362]1991
[700]1992    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
1993
1994        hostname = "unset"
1995
[691]1996        if slopestr not in self.slope:
[362]1997
[691]1998            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
[362]1999
[691]2000        if typestr not in self.type:
[362]2001
[691]2002            raise ValueError( "Type must be one of: " + str( self.type ) )
[362]2003
[691]2004        if len( name ) == 0:
[362]2005
[691]2006            raise ValueError( "Name must be non-empty" )
[362]2007
[700]2008        self.meta_msg.reset()
2009        self.meta_msg.pack_int( 128 )
[362]2010
[700]2011        if not spoof:
2012            self.meta_msg.pack_string( hostname )
2013        else:
2014            self.meta_msg.pack_string( spoof )
[362]2015
[700]2016        self.meta_msg.pack_string( name )
2017
2018        if not spoof:
2019            self.meta_msg.pack_int( 0 )
2020        else:
2021            self.meta_msg.pack_int( 1 )
2022           
2023        self.meta_msg.pack_string( typestr )
2024        self.meta_msg.pack_string( name )
2025        self.meta_msg.pack_string( unitstr )
2026        self.meta_msg.pack_int( self.slope[ slopestr ] )
2027        self.meta_msg.pack_uint( int( tmax ) )
2028        self.meta_msg.pack_uint( int( dmax ) )
2029
2030        if not group:
2031            self.meta_msg.pack_int( 0 )
2032        else:
2033            self.meta_msg.pack_int( 1 )
2034            self.meta_msg.pack_string( "GROUP" )
2035            self.meta_msg.pack_string( group )
2036
2037        self.data_msg.reset()
2038        self.data_msg.pack_int( 128+5 )
2039
2040        if not spoof:
2041            self.data_msg.pack_string( hostname )
2042        else:
2043            self.data_msg.pack_string( spoof )
2044
2045        self.data_msg.pack_string( name )
2046
2047        if not spoof:
2048            self.data_msg.pack_int( 0 )
2049        else:
2050            self.data_msg.pack_int( 1 )
2051
2052        self.data_msg.pack_string( "%s" )
2053        self.data_msg.pack_string( str( value ) )
2054
2055        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2056
[26]2057def printTime( ):
[354]2058
[691]2059    """Print current time/date in human readable format for log/debug"""
[26]2060
[691]2061    return time.strftime("%a, %d %b %Y %H:%M:%S")
[26]2062
2063def debug_msg( level, msg ):
[354]2064
[691]2065    """Print msg if at or above current debug level"""
[26]2066
[691]2067    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
[377]2068
[691]2069    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2070        sys.stderr.write( msg + '\n' )
[26]2071
[691]2072    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2073        syslog.syslog( msg )
[373]2074
[307]2075def write_pidfile():
2076
[691]2077    # Write pidfile if PIDFILE is set
2078    #
2079    if PIDFILE:
[307]2080
[691]2081        pid    = os.getpid()
[354]2082
[691]2083        pidfile    = open( PIDFILE, 'w' )
[354]2084
[691]2085        pidfile.write( str( pid ) )
2086        pidfile.close()
[307]2087
[23]2088def main():
[354]2089
[691]2090    """Application start"""
[23]2091
[837]2092    global PBSQuery, PBSError, lsfObject, pyslurm
[854]2093    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
[256]2094
[691]2095    if not processArgs( sys.argv[1:] ):
[354]2096
[691]2097        sys.exit( 1 )
[212]2098
[691]2099    # Load appropriate DataGatherer depending on which BATCH_API is set
2100    # and any required modules for the Gatherer
2101    #
2102    if BATCH_API == 'pbs':
[256]2103
[691]2104        try:
2105            from PBSQuery import PBSQuery, PBSError
[256]2106
[787]2107        except ImportError, details:
[256]2108
[854]2109            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
[787]2110            print details
[691]2111            sys.exit( 1 )
[256]2112
[691]2113        gather = PbsDataGatherer()
[256]2114
[691]2115    elif BATCH_API == 'sge':
[256]2116
[854]2117        if BATCH_SERVER != 'localhost':
2118
2119            # Print and log, but continue execution
2120            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2121            print err_msg
2122            debug_msg( 0, err_msg )
2123
[691]2124        # Tested with SGE 6.0u11.
2125        #
2126        gather = SgeDataGatherer()
[368]2127
[691]2128    elif BATCH_API == 'lsf':
[368]2129
[854]2130        if BATCH_SERVER != 'localhost':
2131
2132            # Print and log, but continue execution
2133            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2134            print err_msg
2135            debug_msg( 0, err_msg )
2136
[691]2137        try:
2138            from lsfObject import lsfObject
2139        except:
[854]2140            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2141            sys.exit( 1 )
[256]2142
[691]2143        gather = LsfDataGatherer()
[524]2144
[837]2145    elif BATCH_API == 'slurm':
2146
[854]2147        if BATCH_SERVER != 'localhost':
2148
2149            # Print and log, but continue execution
2150            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2151            print err_msg
2152            debug_msg( 0, err_msg )
2153
[837]2154        try:
2155            import pyslurm
2156        except:
2157            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
[854]2158            sys.exit( 1 )
[837]2159
2160        gather = SLURMDataGatherer()
2161
[691]2162    else:
[786]2163        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
[354]2164
[691]2165        sys.exit( 1 )
[256]2166
[691]2167    if( DAEMONIZE and USE_SYSLOG ):
[373]2168
[691]2169        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[373]2170
[691]2171    if DAEMONIZE:
[354]2172
[691]2173        gather.daemon()
2174    else:
2175        gather.run()
[23]2176
[256]2177# wh00t? someone started me! :)
[65]2178#
[23]2179if __name__ == '__main__':
[691]2180    main()
Note: See TracBrowser for help on using the repository browser.