source: branches/1.1/jobmond/jobmond.py @ 944

Last change on this file since 944 was 944, checked in by ramonb, 10 years ago

jobmond/jobmond.py:

  • cgi escaping was not working: still errors occured
  • now just replace special characters with underscores
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 65.0 KB
RevLine 
[23]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[691]5# Copyright (C) 2006-2013  Ramon Bastiaans
[623]6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
[225]7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
[228]22# SVN $Id: jobmond.py 944 2014-01-16 14:28:14Z ramonb $
[227]23#
[23]24
[694]25# vi :set ts=4
26
[471]27import sys, getopt, ConfigParser, time, os, socket, string, re
[837]28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
[318]29from xml.sax.handler import feature_namespaces
[623]30from collections import deque
[699]31from glob import glob
[318]32
[913]33VERSION='__VERSION__'
[307]34
[471]35def usage( ver ):
36
[691]37    print 'jobmond %s' %VERSION
[471]38
[691]39    if ver:
40        return 0
[471]41
[691]42    print
43    print 'Purpose:'
44    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
45    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
46    print
47    print 'Usage:    jobmond [OPTIONS]'
48    print
49    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
50    print '  -p, --pidfile=FILE    Use pid file to store the process id'
51    print '  -h, --help        Print help and exit'
52    print '  -v, --version      Print version and exit'
53    print
[307]54
[212]55def processArgs( args ):
[26]56
[701]57    SHORT_L      = 'p:hvc:'
58    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
[165]59
[704]60    global PIDFILE, JOBMOND_CONF
[701]61    PIDFILE      = None
[61]62
[701]63    JOBMOND_CONF = '/etc/jobmond.conf'
[354]64
[691]65    try:
[68]66
[691]67        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
[185]68
[691]69    except getopt.GetoptError, detail:
[212]70
[691]71        print detail
[851]72        usage( False )
[691]73        sys.exit( 1 )
[212]74
[691]75    for opt, value in opts:
[212]76
[691]77        if opt in [ '--config', '-c' ]:
78       
[701]79            JOBMOND_CONF = value
[212]80
[691]81        if opt in [ '--pidfile', '-p' ]:
[212]82
[701]83            PIDFILE      = value
[691]84       
85        if opt in [ '--help', '-h' ]:
[307]86 
[691]87            usage( False )
88            sys.exit( 0 )
[212]89
[691]90        if opt in [ '--version', '-v' ]:
[471]91
[691]92            usage( True )
93            sys.exit( 0 )
[471]94
[701]95    return loadConfig( JOBMOND_CONF )
[212]96
[944]97html_escape_table = {
98    "&": "_",
99    '"': "_",
100    "'": "_",
101    ">": "_",
102    "<": "_",
103}
104
105# If this is too slow:
106# - re.sub(r'[\&|\>\<\'\"]','_', variable )
107# - re.sub(r'[\&\>\<\'\"]','_', variable )
108# - and even faster: compile regex
109
110def html_escape(text):
111    """Produce entities within text."""
112    return string.join((html_escape_table.get(c,c) for c in text), '')
113
[520]114class GangliaConfigParser:
115
[699]116    def __init__( self, filename ):
[520]117
[699]118        self.conf_lijst   = [ ]
119        self.conf_dict    = { }
120        self.filename     = filename
121        self.file_pointer = file( filename, 'r' )
122        self.lexx         = shlex.shlex( self.file_pointer )
123        self.lexx.whitespace_split = True
[520]124
[699]125        self.parse()
[520]126
[699]127    def __del__( self ):
[520]128
[699]129        """
130        Cleanup: close file descriptor
131        """
132
133        self.file_pointer.close()
134        del self.lexx
135        del self.conf_lijst
136
[691]137    def removeQuotes( self, value ):
[520]138
[699]139        clean_value = value
140        clean_value = clean_value.replace( "'", "" )
141        clean_value = clean_value.replace( '"', '' )
142        clean_value = clean_value.strip()
[520]143
[691]144        return clean_value
[520]145
[699]146    def removeBraces( self, value ):
[520]147
[699]148        clean_value = value
149        clean_value = clean_value.replace( "(", "" )
150        clean_value = clean_value.replace( ')', '' )
151        clean_value = clean_value.strip()
[520]152
[699]153        return clean_value
[520]154
[699]155    def parse( self ):
[520]156
[699]157        """
158        Parse self.filename using shlex scanning.
159        - Removes /* comments */
160        - Traverses (recursively) through all include () statements
161        - Stores complete valid config tokens in self.conf_list
[520]162
[699]163        i.e.:
164            ['globals',
165             '{',
166             'daemonize',
167             '=',
168             'yes',
169             'setuid',
170             '=',
171             'yes',
172             'user',
173             '=',
174             'ganglia',
175             'debug_level',
176             '=',
177             '0',
178             <etc> ]
179        """
[520]180
[699]181        t = 'bogus'
182        c = False
183        i = False
[520]184
[699]185        while t != self.lexx.eof:
186            #print 'get token'
187            t = self.lexx.get_token()
[520]188
[699]189            if len( t ) >= 2:
[520]190
[699]191                if len( t ) >= 4:
[520]192
[699]193                    if t[:2] == '/*' and t[-2:] == '*/':
[520]194
[699]195                        #print 'comment line'
196                        #print 'skipping: %s' %t
197                        continue
[520]198
[699]199                if t == '/*' or t[:2] == '/*':
200                    c = True
201                    #print 'comment start'
202                    #print 'skipping: %s' %t
203                    continue
[520]204
[699]205                if t == '*/' or t[-2:] == '*/':
206                    c = False
207                    #print 'skipping: %s' %t
208                    #print 'comment end'
209                    continue
210
211            if c:
212                #print 'skipping: %s' %t
213                continue
214
215            if t == 'include':
216                i = True
217                #print 'include start'
218                #print 'skipping: %s' %t
219                continue
220
221            if i:
222
223                #print 'include start: %s' %t
224
225                t2 = self.removeQuotes( t )
226                t2 = self.removeBraces( t )
227
228                for in_file in glob( self.removeQuotes(t2) ):
229
230                    #print 'including file: %s' %in_file
231                    parse_infile = GangliaConfigParser( in_file )
232
233                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
234
235                    del parse_infile
236
237                i = False
238                #print 'include end'
239                #print 'skipping: %s' %t
240                continue
241
242            #print 'keep: %s' %t
243            self.conf_lijst.append( self.removeQuotes(t) )
244
245    def getConfLijst( self ):
246
247        return self.conf_lijst
248
249    def confListToDict( self, parent_list=None ):
250
251        """
252        Recursively traverses a conf_list and creates dictionary from it
253        """
254
255        new_dict = { }
256        count    = 0
257        skip     = 0
258
259        if not parent_list:
260            parent_list = self.conf_lijst
261
262        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
263
264        for n, c in enumerate( parent_list ):
265
266            count = count + 1
267
268            #print 'CL: n %d c %s' %(n, c)
269
270            if skip > 0:
271
272                #print '- skipped'
273                skip = skip - 1
274                continue
275
276            if (n+1) <= (len( parent_list )-1):
277
278                if parent_list[(n+1)] == '{':
279
280                    if not new_dict.has_key( c ):
281                        new_dict[ c ] = [ ]
282
283                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
284                    new_dict[ c ].append( temp_new_dict )
285
286                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
287
288                    if not new_dict.has_key( c ):
289                        new_dict[ c ] = [ ]
290
291                    new_dict[ c ].append( parent_list[ (n+2) ] )
292
293                    skip = 2
294
295                if parent_list[n] == '}':
296
297                    #print 'leaving confListToDict(): new dict = %s' %new_dict
298                    return (new_dict, count)
299
300    def makeConfDict( self ):
301
302        """
303        Walks through self.conf_list and creates a dictionary based upon config values
304
305        i.e.:
306            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
307                                                         'ip': ['"127.0.0.1"'],
308                                                         'mask': ['32']}]}],
309                                    'port': ['8649']}],
310            'udp_recv_channel': [{'port': ['8649']}],
311            'udp_send_channel': [{'host': ['145.101.32.3'],
312                                  'port': ['8649']},
313                                 {'host': ['145.101.32.207'],
314                                  'port': ['8649']}]}
315        """
316
317        new_dict = { }
318        skip     = 0
319
320        #print 'entering makeConfDict()'
321
322        for n, c in enumerate( self.conf_lijst ):
323
324            #print 'M: n %d c %s' %(n, c)
325
326            if skip > 0:
327
328                #print '- skipped'
329                skip = skip - 1
330                continue
331
332            if (n+1) <= (len( self.conf_lijst )-1):
333
334                if self.conf_lijst[(n+1)] == '{':
335
336                    if not new_dict.has_key( c ):
337                        new_dict[ c ] = [ ]
338
339                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
340                    new_dict[ c ].append( temp_new_dict )
341
342                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
343
344                    if not new_dict.has_key( c ):
345                        new_dict[ c ] = [ ]
346
347                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
348
349                    skip = 2
350
351        self.conf_dict = new_dict
352        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
353
354    def checkConfDict( self ):
355
356        if len( self.conf_lijst ) == 0:
357
358            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
359
360        if len( self.conf_dict ) == 0:
361
362            self.makeConfDict()
363
364    def getConfDict( self ):
365
366        self.checkConfDict()
367        return self.conf_dict
368
369    def getUdpSendChannels( self ):
370
371        self.checkConfDict()
372
[707]373        udp_send_channels = [ ] # IP:PORT
374
375        if not self.conf_dict.has_key( 'udp_send_channel' ):
376            return None
377
378        for u in self.conf_dict[ 'udp_send_channel' ]:
379
380            if u.has_key( 'mcast_join' ):
381
382                ip = u['mcast_join'][0]
383
384            elif u.has_key( 'host' ):
385
386                ip = u['host'][0]
387
388            port = u['port'][0]
389
390            udp_send_channels.append( ( ip, port ) )
391
392        if len( udp_send_channels ) == 0:
393            return None
394
395        return udp_send_channels
396
[699]397    def getSectionLastOption( self, section, option ):
398
399        """
400        Get last option set in a config section that could be set multiple times in multiple (include) files.
401
402        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
403        """
404
405        self.checkConfDict()
406        value = None
407
408        if not self.conf_dict.has_key( section ):
409
410            return None
411
412        # Could be set multiple times in multiple (include) files: get last one set
413        for c in self.conf_dict[ section ]:
414
415                if c.has_key( option ):
416
[705]417                    value = c[ option ][0]
[699]418
[705]419        return value
[699]420
421    def getClusterName( self ):
422
423        return self.getSectionLastOption( 'cluster', 'name' )
424
425    def getVal( self, section, option ):
426
427        return self.getSectionLastOption( section, option )
428
[691]429    def getInt( self, section, valname ):
[520]430
[691]431        value    = self.getVal( section, valname )
[520]432
[691]433        if not value:
[699]434            return None
[520]435
[691]436        return int( value )
[520]437
[691]438    def getStr( self, section, valname ):
[520]439
[691]440        value    = self.getVal( section, valname )
[520]441
[691]442        if not value:
[699]443            return None
[520]444
[691]445        return str( value )
[520]446
447def findGmetric():
448
[691]449    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
[520]450
[691]451        guess    = '%s/%s' %( dir, 'gmetric' )
[520]452
[691]453        if os.path.exists( guess ):
[520]454
[691]455            return guess
[520]456
[691]457    return False
[520]458
[212]459def loadConfig( filename ):
460
[691]461    def getlist( cfg_string ):
[215]462
[691]463        my_list = [ ]
[215]464
[691]465        for item_txt in cfg_string.split( ',' ):
[215]466
[691]467            sep_char = None
[215]468
[691]469            item_txt = item_txt.strip()
[215]470
[691]471            for s_char in [ "'", '"' ]:
[215]472
[691]473                if item_txt.find( s_char ) != -1:
[215]474
[691]475                    if item_txt.count( s_char ) != 2:
[215]476
[691]477                        print 'Missing quote: %s' %item_txt
478                        sys.exit( 1 )
[215]479
[691]480                    else:
[215]481
[691]482                        sep_char = s_char
483                        break
[215]484
[691]485            if sep_char:
[215]486
[691]487                item_txt = item_txt.split( sep_char )[1]
[215]488
[691]489            my_list.append( item_txt )
[215]490
[691]491        return my_list
[215]492
[784]493    if not os.path.isfile( JOBMOND_CONF ):
494
495        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
496        sys.exit( 1 )
497
498    try:
499        f = open( JOBMOND_CONF, 'r' )
500    except IOError, detail:
501        print "Cannot read config file: '%s'" %JOBMOND_CONF
502        sys.exit( 1 )
503    else:
504        f.close()
505
[691]506    cfg        = ConfigParser.ConfigParser()
[212]507
[691]508    cfg.read( filename )
[212]509
[691]510    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
511    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
512    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
513    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
[707]514    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
[212]515
[701]516    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[212]517
[701]518    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[212]519
[691]520    SYSLOG_LEVEL    = -1
[701]521    SYSLOG_FACILITY = None
[377]522
[691]523    try:
[701]524        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[212]525
[691]526    except ConfigParser.NoOptionError:
[373]527
[701]528        USE_SYSLOG  = True
[373]529
[691]530        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
[373]531
[691]532    if USE_SYSLOG:
[373]533
[691]534        try:
[701]535            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[373]536
[691]537        except ConfigParser.NoOptionError:
[373]538
[691]539            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
[701]540            SYSLOG_LEVEL = 0
[373]541
[691]542        try:
[373]543
[691]544            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[373]545
[691]546        except ConfigParser.NoOptionError:
[373]547
[691]548            SYSLOG_FACILITY = syslog.LOG_DAEMON
[373]549
[691]550            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
[373]551
[691]552    try:
[373]553
[701]554        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
[212]555
[691]556    except ConfigParser.NoOptionError:
[265]557
[864]558        # Not required for all API's: only pbs api allows remote connections
559        BATCH_SERVER = None
[265]560
[691]561    try:
562   
[701]563        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
[265]564
[691]565    except ConfigParser.NoOptionError:
[265]566
[691]567        # Backwards compatibility for old configs
568        #
[265]569
[701]570        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
571        api_guess           = 'pbs'
[691]572   
573    try:
[212]574
[701]575        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
[353]576
[691]577    except ConfigParser.NoOptionError:
[353]578
[703]579        # Not specified: assume /etc/ganglia/gmond.conf
[691]580        #
[703]581        GMOND_CONF          = '/etc/ganglia/gmond.conf'
[353]582
[707]583    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
584    GMETRIC_TARGET          = None
[449]585
[707]586    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
[449]587
[707]588    if not GMOND_UDP_SEND_CHANNELS:
[449]589
[701]590        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
[520]591
[691]592        # Couldn't figure it out: let's see if it's in our jobmond.conf
593        #
594        try:
[520]595
[691]596            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
[520]597
[691]598        # Guess not: now just give up
[701]599       
[691]600        except ConfigParser.NoOptionError:
[520]601
[691]602            GMETRIC_TARGET    = None
[520]603
[691]604            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
[520]605
[701]606            gmetric_bin    = findGmetric()
[520]607
[701]608            if gmetric_bin:
[520]609
[701]610                GMETRIC_BINARY     = gmetric_bin
611            else:
612                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
[520]613
[701]614                try:
[520]615
[701]616                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
[520]617
[701]618                except ConfigParser.NoOptionError:
[520]619
[786]620                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
[701]621                    sys.exit( 1 )
[520]622
[701]623    #TODO: is this really still needed or should be automatic
[691]624    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
[212]625
[927]626    try:
627        BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
[215]628
[927]629    except ConfigParser.NoOptionError:
630
631        BATCH_HOST_TRANSLATE = [ ]
632        pass
633
[691]634    try:
[256]635
[691]636        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
[266]637
[691]638    except ConfigParser.NoOptionError, detail:
[266]639
[864]640        print "FATAL ERROR: BATCH_API not set"
641        sys.exit( 1 )
[354]642
[691]643    try:
[317]644
[691]645        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
[317]646
[691]647    except ConfigParser.NoOptionError, detail:
[317]648
[691]649        QUEUE        = None
[353]650
[701]651    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
652
[691]653    return True
[212]654
[507]655def fqdn_parts (fqdn):
[520]656
[691]657    """Return pair of host and domain for fully-qualified domain name arg."""
[520]658
[691]659    parts = fqdn.split (".")
[520]660
[691]661    return (parts[0], string.join(parts[1:], "."))
[507]662
[61]663class DataProcessor:
[355]664
[691]665    """Class for processing of data"""
[61]666
[691]667    binary = None
[61]668
[691]669    def __init__( self, binary=None ):
[355]670
[691]671        """Remember alternate binary location if supplied"""
[61]672
[691]673        global GMETRIC_BINARY, GMOND_CONF
[449]674
[691]675        if binary:
676            self.binary = binary
[61]677
[707]678        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[691]679            self.binary = GMETRIC_BINARY
[449]680
[691]681        # Timeout for XML
682        #
683        # From ganglia's documentation:
684        #
685        # 'A metric will be deleted DMAX seconds after it is received, and
686        # DMAX=0 means eternal life.'
[61]687
[691]688        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
[80]689
[707]690        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[354]691
[691]692            incompatible = self.checkGmetricVersion()
[61]693
[691]694            if incompatible:
[355]695
[903]696                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
[691]697                sys.exit( 1 )
[65]698
[691]699    def checkGmetricVersion( self ):
[355]700
[691]701        """
[903]702        Check version of gmetric is at least 3.3.8
[691]703        for the syntax we use
704        """
[65]705
[691]706        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
[255]707
[691]708        incompatible    = 0
[341]709
[691]710        gfp        = os.popen( self.binary + ' --version' )
[692]711        lines      = gfp.readlines()
[65]712
[691]713        gfp.close()
[355]714
[691]715        for line in lines:
[355]716
[691]717            line = line.split( ' ' )
[65]718
[691]719            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
720           
721                gmetric_version    = line[1].split( '\n' )[0]
[65]722
[691]723                version_major    = int( gmetric_version.split( '.' )[0] )
724                version_minor    = int( gmetric_version.split( '.' )[1] )
725                version_patch    = int( gmetric_version.split( '.' )[2] )
[65]726
[691]727                incompatible    = 0
[65]728
[691]729                if version_major < 3:
[65]730
[691]731                    incompatible = 1
732               
733                elif version_major == 3:
[65]734
[903]735                    if version_minor < 3:
[65]736
[692]737                        incompatible = 1
[65]738
[903]739                    elif version_patch < 8:
740
741                        incompatible = 1
742
[691]743        return incompatible
[65]744
[691]745    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
[355]746
[691]747        """Call gmetric binary and multicast"""
[65]748
[691]749        cmd = self.binary
[65]750
[707]751        if GMOND_UDP_SEND_CHANNELS:
[61]752
[707]753            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
754
755                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
756
757                debug_msg( 10, printTime() + ' ' + metric_debug)
758
759                gm = Gmetric( c_ip, c_port )
760
761                gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
762
763        elif GMETRIC_TARGET:
764
[691]765            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
766            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
[353]767
[691]768            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
[353]769
[691]770            debug_msg( 10, printTime() + ' ' + metric_debug)
[353]771
[691]772            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
[353]773
[691]774            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
[353]775
[691]776        else:
777            try:
778                cmd = cmd + ' -c' + GMOND_CONF
[353]779
[691]780            except NameError:
[353]781
[705]782                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
[353]783
[691]784            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[353]785
[691]786            if len( units ) > 0:
[409]787
[691]788                cmd = cmd + ' -u"' + units + '"'
[409]789
[691]790            debug_msg( 10, printTime() + ' ' + cmd )
[353]791
[691]792            os.system( cmd )
[353]793
[318]794class DataGatherer:
[23]795
[691]796    """Skeleton class for batch system DataGatherer"""
[256]797
[691]798    def printJobs( self, jobs ):
[355]799
[691]800        """Print a jobinfo overview"""
[318]801
[691]802        for name, attrs in self.jobs.items():
[318]803
[691]804            print 'job %s' %(name)
[318]805
[691]806            for name, val in attrs.items():
[318]807
[691]808                print '\t%s = %s' %( name, val )
[318]809
[691]810    def printJob( self, jobs, job_id ):
[355]811
[691]812        """Print job with job_id from jobs"""
[318]813
[691]814        print 'job %s' %(job_id)
[318]815
[691]816        for name, val in jobs[ job_id ].items():
[318]817
[691]818            print '\t%s = %s' %( name, val )
[318]819
[691]820    def getAttr( self, attrs, name ):
[507]821
[691]822        """Return certain attribute from dictionary, if exists"""
[507]823
[691]824        if attrs.has_key( name ):
[507]825
[691]826            return attrs[ name ]
827        else:
828            return ''
[507]829
[691]830    def jobDataChanged( self, jobs, job_id, attrs ):
[507]831
[691]832        """Check if job with attrs and job_id in jobs has changed"""
[507]833
[691]834        if jobs.has_key( job_id ):
[507]835
[691]836            oldData = jobs[ job_id ]   
837        else:
838            return 1
[507]839
[691]840        for name, val in attrs.items():
[507]841
[691]842            if oldData.has_key( name ):
[507]843
[691]844                if oldData[ name ] != attrs[ name ]:
[507]845
[691]846                    return 1
[507]847
[691]848            else:
849                return 1
[507]850
[691]851        return 0
[507]852
[691]853    def submitJobData( self ):
[507]854
[691]855        """Submit job info list"""
[507]856
[691]857        global BATCH_API
[512]858
[728]859        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[507]860
[724]861        running_jobs = 0
862        queued_jobs  = 0
[507]863
[691]864        # Count how many running/queued jobs we found
865        #
866        for jobid, jobattrs in self.jobs.items():
[507]867
[691]868            if jobattrs[ 'status' ] == 'Q':
[507]869
[691]870                queued_jobs += 1
[507]871
[691]872            elif jobattrs[ 'status' ] == 'R':
[507]873
[691]874                running_jobs += 1
[507]875
[691]876        # Report running/queued jobs as seperate metric for a nice RRD graph
877        #
[728]878        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
879        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
[507]880
[691]881        # Report down/offline nodes in batch (PBS only ATM)
882        #
[866]883        if BATCH_API in [ 'pbs', 'slurm' ]:
[512]884
[691]885            domain        = fqdn_parts( socket.getfqdn() )[1]
[514]886
[728]887            downed_nodes  = list()
888            offline_nodes = list()
[691]889       
890            l        = ['state']
[512]891
[790]892            nodelist = self.getNodeData()
893
894            for name, node in nodelist.items():
895
[691]896                if ( node[ 'state' ].find( "down" ) != -1 ):
[512]897
[691]898                    downed_nodes.append( name )
[512]899
[691]900                if ( node[ 'state' ].find( "offline" ) != -1 ):
[512]901
[691]902                    offline_nodes.append( name )
[512]903
[728]904            downnodeslist    = do_nodelist( downed_nodes )
905            offlinenodeslist = do_nodelist( offline_nodes )
[512]906
[691]907            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
908            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[728]909            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
910            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
[514]911
[691]912        # Now let's spread the knowledge
913        #
914        for jobid, jobattrs in self.jobs.items():
[507]915
[691]916            # Make gmetric values for each job: respect max gmetric value length
917            #
918            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
919            metric_increment    = 0
[507]920
[691]921            # If we have more job info than max gmetric value length allows, split it up
922            # amongst multiple metrics
923            #
924            for val in gmetric_val:
[507]925
[728]926                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
927                self.dp.multicastGmetric( metric_name, val )
[507]928
[691]929                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
930                #
931                metric_increment    = metric_increment + 1
[507]932
[691]933    def compileGmetricVal( self, jobid, jobattrs ):
[507]934
[691]935        """Create a val string for gmetric of jobinfo"""
[507]936
[691]937        gval_lists    = [ ]
938        val_list    = { }
[507]939
[691]940        for val_name, val_value in jobattrs.items():
[507]941
[691]942            # These are our own metric names, i.e.: status, start_timestamp, etc
943            #
944            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
[507]945
[691]946            # These are their corresponding values
947            #
948            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
[507]949
[691]950            if val_name == 'nodes' and jobattrs['status'] == 'R':
[507]951
[691]952                node_str = None
[507]953
[691]954                for node in val_value:
[507]955
[691]956                    if node_str:
[507]957
[691]958                        node_str = node_str + ';' + node
959                    else:
960                        node_str = node
[507]961
[691]962                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
963                    #
964                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
[507]965
[691]966                        # It's too big, we need to make a new gmetric for the additional info
967                        #
968                        val_list[ val_name ]    = node_str
[507]969
[691]970                        gval_lists.append( val_list )
[507]971
[691]972                        val_list        = { }
973                        node_str        = None
[507]974
[691]975                val_list[ val_name ]    = node_str
[507]976
[691]977                gval_lists.append( val_list )
[507]978
[691]979                val_list        = { }
[507]980
[691]981            elif val_value != '':
[507]982
[691]983                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
984                #
985                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
[507]986
[691]987                    # It's too big, we need to make a new gmetric for the additional info
988                    #
989                    gval_lists.append( val_list )
[507]990
[691]991                    val_list        = { }
[507]992
[691]993                val_list[ val_name ]    = val_value
[507]994
[691]995        if len( val_list ) > 0:
[507]996
[691]997            gval_lists.append( val_list )
[507]998
[691]999        str_list    = [ ]
[507]1000
[691]1001        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
1002        #
1003        for val_list in gval_lists:
[507]1004
[691]1005            my_val_str    = None
[507]1006
[691]1007            for val_name, val_value in val_list.items():
[507]1008
[691]1009                if type(val_value) == list:
[579]1010
[691]1011                    val_value    = val_value.join( ',' )
[579]1012
[691]1013                if my_val_str:
[507]1014
[691]1015                    try:
1016                        # fixme: It's getting
1017                        # ('nodes', None) items
[944]1018                        my_val_str = my_val_str + ' ' + val_name + '=' + html_escape(val_value)
[691]1019                    except:
1020                        pass
[623]1021
[691]1022                else:
[944]1023                    my_val_str = val_name + '=' + html_escape(val_value)
[507]1024
[691]1025            str_list.append( my_val_str )
[507]1026
[691]1027        return str_list
[507]1028
[691]1029    def daemon( self ):
[355]1030
[691]1031        """Run as daemon forever"""
[256]1032
[691]1033        # Fork the first child
1034        #
1035        pid = os.fork()
1036        if pid > 0:
1037            sys.exit(0)  # end parent
[256]1038
[691]1039        # creates a session and sets the process group ID
1040        #
1041        os.setsid()
[318]1042
[691]1043        # Fork the second child
1044        #
1045        pid = os.fork()
1046        if pid > 0:
1047            sys.exit(0)  # end parent
[318]1048
[691]1049        write_pidfile()
[318]1050
[691]1051        # Go to the root directory and set the umask
1052        #
1053        os.chdir('/')
1054        os.umask(0)
[318]1055
[691]1056        sys.stdin.close()
1057        sys.stdout.close()
1058        sys.stderr.close()
[318]1059
[691]1060        os.open('/dev/null', os.O_RDWR)
1061        os.dup2(0, 1)
1062        os.dup2(0, 2)
[318]1063
[691]1064        self.run()
[318]1065
[691]1066    def run( self ):
[355]1067
[691]1068        """Main thread"""
[256]1069
[691]1070        while ( 1 ):
1071       
1072            self.getJobData()
1073            self.submitJobData()
1074            time.sleep( BATCH_POLL_INTERVAL )   
[256]1075
[623]1076# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1077# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1078# with 6.2.  See also the fixmes.
[256]1079
[507]1080class NoJobs (Exception):
[691]1081    """Exception raised by empty job list in qstat output."""
1082    pass
[256]1083
[507]1084class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
[691]1085    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
[318]1086
[691]1087    def __init__(self):
1088        self.value = ""
1089        self.joblist = []
1090        self.job = {}
1091        self.queue = ""
1092        self.in_joblist = False
1093        self.lrequest = False
1094        self.eltq = deque()
1095        xml.sax.handler.ContentHandler.__init__(self)
[318]1096
[691]1097    # The structure of the output is as follows (for SGE 6.0).  It's
1098    # similar for 6.1, but radically different for 6.2, and is
1099    # undocumented generally.  Unfortunately it's voluminous, and probably
1100    # doesn't scale to large clusters/queues.
[318]1101
[691]1102    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1103    #   <djob_info>
1104    #     <qmaster_response>  <!-- job -->
1105    #       ...
1106    #       <JB_ja_template> 
1107    #     <ulong_sublist>
1108    #     ...         <!-- start_time, state ... -->
1109    #     </ulong_sublist>
1110    #       </JB_ja_template> 
1111    #       <JB_ja_tasks>
1112    #     <ulong_sublist>
1113    #       ...       <!-- task info
1114    #     </ulong_sublist>
1115    #     ...
1116    #       </JB_ja_tasks>
1117    #       ...
1118    #     </qmaster_response>
1119    #   </djob_info>
1120    #   <messages>
1121    #   ...
[318]1122
[691]1123    # NB.  We might treat each task as a separate job, like
1124    # straight qstat output, but the web interface expects jobs to
1125    # be identified by integers, not, say, <job number>.<task>.
[318]1126
[691]1127    # So, I lied.  If the job list is empty, we get invalid XML
1128    # like this, which we need to defend against:
[318]1129
[691]1130    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1131    #   <>
1132    #     <ST_name>*</ST_name>
1133    #   </>
1134    # </unknown_jobs>
[318]1135
[691]1136    def startElement(self, name, attrs):
1137        self.value = ""
1138        if name == "djob_info":    # job list
1139            self.in_joblist = True
1140        # The job container is "qmaster_response" in SGE 6.0
1141        # and 6.1, but "element" in 6.2.  This is only the very
1142        # start of what's necessary for 6.2, though (sigh).
1143        elif (name == "qmaster_response" or name == "element") \
1144                and self.eltq[-1] == "djob_info": # job
1145            self.job = {"job_state": "U", "slots": 0,
1146                    "nodes": [], "queued_timestamp": "",
1147                    "queued_timestamp": "", "queue": "",
1148                    "ppn": "0", "RN_max": 0,
1149                    # fixme in endElement
1150                    "requested_memory": 0, "requested_time": 0
1151                    }
1152            self.joblist.append(self.job)
1153        elif name == "qstat_l_requests": # resource request
1154            self.lrequest = True
1155        elif name == "unknown_jobs":
1156            raise NoJobs
1157        self.eltq.append (name)
[318]1158
[691]1159    def characters(self, ch):
1160        self.value += ch
[318]1161
[691]1162    def endElement(self, name): 
1163        """Snarf job elements contents into job dictionary.
1164           Translate keys if appropriate."""
[318]1165
[691]1166        name_trans = {
1167          "JB_job_number": "number",
1168          "JB_job_name": "name", "JB_owner": "owner",
1169          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1170          "JB_submission_time": "queued_timestamp"
1171          }
1172        value = self.value
1173        self.eltq.pop ()
[318]1174
[691]1175        if name == "djob_info":
1176            self.in_joblist = False
1177            self.job = {}
1178        elif name == "JAT_master_queue":
1179            self.job["queue"] = value.split("@")[0]
1180        elif name == "JG_qhostname":
1181            if not (value in self.job["nodes"]):
1182                self.job["nodes"].append(value)
1183        elif name == "JG_slots": # slots in use
1184            self.job["slots"] += int(value)
1185        elif name == "RN_max": # requested slots (tasks or parallel)
1186            self.job["RN_max"] = max (self.job["RN_max"],
1187                          int(value))
1188        elif name == "JAT_state": # job state (bitwise or)
1189            value = int (value)
1190            # Status values from sge_jobL.h
1191            #define JIDLE           0x00000000
1192            #define JHELD           0x00000010
1193            #define JMIGRATING          0x00000020
1194            #define JQUEUED         0x00000040
1195            #define JRUNNING        0x00000080
1196            #define JSUSPENDED          0x00000100
1197            #define JTRANSFERING        0x00000200
1198            #define JDELETED        0x00000400
1199            #define JWAITING        0x00000800
1200            #define JEXITING        0x00001000
1201            #define JWRITTEN        0x00002000
1202            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1203            #define JFINISHED           0x00010000
1204            if value & 0x80:
1205                self.job["status"] = "R"
1206            elif value & 0x40:
1207                self.job["status"] = "Q"
1208            else:
1209                self.job["status"] = "O" # `other'
1210        elif name == "CE_name" and self.lrequest and self.value in \
1211                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1212            # We're in a container for an interesting resource
1213            # request; record which type.
1214            self.lrequest = self.value
1215        elif name == "CE_doubleval" and self.lrequest:
1216            # if we're in a container for an interesting
1217            # resource request, use the maxmimum of the hard
1218            # and soft requests to record the requested CPU
1219            # or core.  Fixme:  I'm not sure if this logic is
1220            # right.
1221            if self.lrequest in ("h_core", "s_core"):
1222                self.job["requested_memory"] = \
1223                    max (float (value),
1224                     self.job["requested_memory"])
1225            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1226            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1227                self.job["requested_time"] = \
1228                    max (float (value),
1229                     self.job["requested_time"])
1230        elif name == "qstat_l_requests":
1231            self.lrequest = False
1232        elif self.job and self.in_joblist:
1233            if name in name_trans:
1234                name = name_trans[name]
1235                self.job[name] = value
[318]1236
[507]1237# Abstracted from PBS original.
1238# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
[520]1239#
1240def do_nodelist( nodes ):
1241
[691]1242    """Translate node list as appropriate."""
[520]1243
[691]1244    nodeslist        = [ ]
1245    my_domain        = fqdn_parts( socket.getfqdn() )[1]
[520]1246
[691]1247    for node in nodes:
[520]1248
[691]1249        host        = node.split( '/' )[0] # not relevant for SGE
1250        h, host_domain    = fqdn_parts(host)
[520]1251
[691]1252        if host_domain == my_domain:
[520]1253
[691]1254            host    = h
[520]1255
[691]1256        if nodeslist.count( host ) == 0:
[520]1257
[691]1258            for translate_pattern in BATCH_HOST_TRANSLATE:
[520]1259
[691]1260                if translate_pattern.find( '/' ) != -1:
[520]1261
[691]1262                    translate_orig    = \
1263                        translate_pattern.split( '/' )[1]
1264                    translate_new    = \
1265                        translate_pattern.split( '/' )[2]
1266                    host = re.sub( translate_orig,
1267                               translate_new, host )
1268            if not host in nodeslist:
1269                nodeslist.append( host )
1270    return nodeslist
[318]1271
[837]1272class SLURMDataGatherer( DataGatherer ):
1273
1274    global pyslurm
1275
1276    """This is the DataGatherer for SLURM"""
1277
1278    def __init__( self ):
1279
1280        """Setup appropriate variables"""
1281
1282        self.jobs       = { }
1283        self.timeoffset = 0
1284        self.dp         = DataProcessor()
1285
1286    def getNodeData( self ):
1287
[866]1288        slurm_type  = pyslurm.node()
[837]1289
[866]1290        slurm_nodes = slurm_type.get()
[837]1291
[866]1292        nodedict    = { }
1293
1294        for node, attrs in slurm_nodes.items():
1295
1296            ( num_state, name_state ) = attrs['node_state'] 
1297
1298            if name_state == 'DOWN':
1299
1300                nodedict[ node ] = { 'state' : 'down' }
1301
1302            elif name_state == 'DRAIN':
1303
1304                nodedict[ node ] = { 'state' : 'offline' }
1305
[837]1306        return nodedict
1307
1308    def getJobData( self ):
1309
1310        """Gather all data on current jobs"""
1311
1312        joblist            = {}
1313
1314        self.cur_time  = time.time()
1315
1316        slurm_type = pyslurm.job()
1317        joblist    = slurm_type.get()
1318
1319        jobs_processed    = [ ]
1320
1321        for name, attrs in joblist.items():
1322            display_queue = 1
1323            job_id        = name
1324
1325            name          = self.getAttr( attrs, 'name' )
1326            queue         = self.getAttr( attrs, 'partition' )
1327
1328            if QUEUE:
1329                for q in QUEUE:
1330                    if q == queue:
1331                        display_queue = 1
1332                        break
1333                    else:
1334                        display_queue = 0
1335                        continue
1336            if display_queue == 0:
1337                continue
1338
1339            owner_uid        = attrs[ 'user_id' ]
1340            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1341
1342            requested_time   = self.getAttr( attrs, 'time_limit' )
[852]1343            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
[837]1344
[852]1345            if min_memory == 0:
1346
1347                requested_memory = ''
1348
1349            else:
1350                requested_memory = min_memory
1351
[853]1352            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
[837]1353
[853]1354            if min_cpus == 0:
1355
1356                ppn = ''
1357
1358            else:
1359                ppn = min_cpus
1360
[837]1361            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1362
1363            status = 'Q'
1364
1365            if status_long == 'RUNNING':
1366
1367                status = 'R'
1368
1369            elif status_long == 'COMPLETED':
1370
1371                continue
1372
1373            jobs_processed.append( job_id )
1374
1375            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1376
1377            start_timestamp = ''
1378            nodeslist       = ''
1379
1380            if status == 'R':
1381
1382                start_timestamp = self.getAttr( attrs, 'start_time' )
[851]1383                nodes           = attrs[ 'nodes' ]
[837]1384
[851]1385                if not nodes:
[837]1386
[851]1387                    # This should not happen
1388
1389                    # Something wrong: running but 'nodes' returned empty by pyslurm
1390                    # Possible pyslurm bug: abort/quit/warning
1391
1392                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1393
1394                    print err_msg
1395                    debug_msg( 0, err_msg )
1396                    sys.exit(1)
1397
1398                my_nodelist = [ ]
1399
1400                slurm_hostlist  = pyslurm.hostlist()
1401                slurm_hostlist.create( nodes )
1402                slurm_hostlist.uniq()
1403
1404                while slurm_hostlist.count() > 0:
1405
1406                    my_nodelist.append( slurm_hostlist.pop() )
1407
1408                slurm_hostlist.destroy()
1409
1410                del slurm_hostlist
1411
1412                nodeslist       = do_nodelist( my_nodelist )
1413
[837]1414                if DETECT_TIME_DIFFS:
1415
1416                    # If a job start if later than our current date,
1417                    # that must mean the Torque server's time is later
1418                    # than our local time.
1419               
1420                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1421
1422                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1423
1424            elif status == 'Q':
1425
1426                nodeslist       = str( attrs[ 'num_nodes' ] )
1427
1428            else:
1429                start_timestamp = ''
1430                nodeslist       = ''
1431
1432            myAttrs                = { }
1433
1434            myAttrs[ 'name' ]             = str( name )
1435            myAttrs[ 'queue' ]            = str( queue )
1436            myAttrs[ 'owner' ]            = str( owner )
1437            myAttrs[ 'requested_time' ]   = str( requested_time )
1438            myAttrs[ 'requested_memory' ] = str( requested_memory )
1439            myAttrs[ 'ppn' ]              = str( ppn )
1440            myAttrs[ 'status' ]           = str( status )
1441            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1442            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1443            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1444            myAttrs[ 'nodes' ]            = nodeslist
1445            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1446            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1447
1448            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1449
1450                self.jobs[ job_id ] = myAttrs
1451
1452        for id, attrs in self.jobs.items():
1453
1454            if id not in jobs_processed:
1455
1456                # This one isn't there anymore; toedeledoki!
1457                #
1458                del self.jobs[ id ]
1459
[318]1460class SgeDataGatherer(DataGatherer):
1461
[691]1462    jobs = {}
[61]1463
[691]1464    def __init__( self ):
1465        self.jobs = {}
1466        self.timeoffset = 0
1467        self.dp = DataProcessor()
[318]1468
[691]1469    def getJobData( self ):
1470        """Gather all data on current jobs in SGE"""
[318]1471
[691]1472        import popen2
[318]1473
[691]1474        self.cur_time = 0
1475        queues = ""
1476        if QUEUE:    # only for specific queues
1477            # Fixme:  assumes queue names don't contain single
1478            # quote or comma.  Don't know what the SGE rules are.
1479            queues = " -q '" + string.join (QUEUE, ",") + "'"
1480        # Note the comment in SgeQstatXMLParser about scaling with
1481        # this method of getting data.  I haven't found better one.
1482        # Output with args `-xml -ext -f -r' is easier to parse
1483        # in some ways, harder in others, but it doesn't provide
1484        # the submission time (at least SGE 6.0).  The pipeline
1485        # into sed corrects bogus XML observed with a configuration
1486        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1487        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
[623]1488sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
[691]1489                           + queues, True)
1490        qstatparser = SgeQstatXMLParser()
1491        parse_err = 0
1492        try:
1493            xml.sax.parse(piping.fromchild, qstatparser)
1494        except NoJobs:
1495            pass
1496        except:
1497            parse_err = 1
[704]1498        if piping.wait():
1499            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
[691]1500            return None
1501        elif parse_err:
1502            debug_msg(10, "Bad XML output from qstat"())
1503            exit (1)
1504        for f in piping.fromchild, piping.tochild, piping.childerr:
1505            f.close()
1506        self.cur_time = time.time()
1507        jobs_processed = []
1508        for job in qstatparser.joblist:
1509            job_id = job["number"]
1510            if job["status"] in [ 'Q', 'R' ]:
1511                jobs_processed.append(job_id)
1512            if job["status"] == "R":
1513                job["nodes"] = do_nodelist (job["nodes"])
1514                # Fixme: why is job["nodes"] sometimes null?
1515                try:
1516                    # Fixme: Is this sensible?  The
1517                    # PBS-type PPN isn't something you use
1518                    # with SGE.
[704]1519                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
[691]1520                except:
1521                    job["ppn"] = 0
1522                if DETECT_TIME_DIFFS:
1523                    # If a job start is later than our
1524                    # current date, that must mean
1525                    # the SGE server's time is later
1526                    # than our local time.
[704]1527                    start_timestamp = int (job["start_timestamp"])
1528                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
[318]1529
[704]1530                        self.timeoffset    = start_timestamp - int(self.cur_time)
[691]1531            else:
1532                # fixme: Note sure what this should be:
1533                job["ppn"] = job["RN_max"]
1534                job["nodes"] = "1"
[318]1535
[691]1536            myAttrs = {}
1537            for attr in ["name", "queue", "owner",
1538                     "requested_time", "status",
1539                     "requested_memory", "ppn",
1540                     "start_timestamp", "queued_timestamp"]:
1541                myAttrs[attr] = str(job[attr])
1542            myAttrs["nodes"] = job["nodes"]
[704]1543            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
[691]1544            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1545            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
[318]1546
[704]1547            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
[691]1548                self.jobs[job_id] = myAttrs
1549        for id, attrs in self.jobs.items():
1550            if id not in jobs_processed:
1551                del self.jobs[id]
[318]1552
[524]1553# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1554# Requres LSFObject http://sourceforge.net/projects/lsfobject
1555#
1556class LsfDataGatherer(DataGatherer):
[525]1557
[691]1558    """This is the DataGatherer for LSf"""
[524]1559
[691]1560    global lsfObject
[524]1561
[691]1562    def __init__( self ):
[525]1563
[691]1564        self.jobs = { }
1565        self.timeoffset = 0
1566        self.dp = DataProcessor()
1567        self.initLsfQuery()
[524]1568
[691]1569    def _countDuplicatesInList( self, dupedList ):
[525]1570
[691]1571        countDupes    = { }
[525]1572
[691]1573        for item in dupedList:
[525]1574
[691]1575            if not countDupes.has_key( item ):
[525]1576
[691]1577                countDupes[ item ]    = 1
1578            else:
1579                countDupes[ item ]    = countDupes[ item ] + 1
[525]1580
[691]1581        dupeCountList    = [ ]
[525]1582
[691]1583        for item, count in countDupes.items():
[525]1584
[691]1585            dupeCountList.append( ( item, count ) )
[525]1586
[691]1587        return dupeCountList
[524]1588#
1589#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1590#print _countDuplicatesInList(lst)
1591#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1592########################
1593
[691]1594    def initLsfQuery( self ):
1595        self.pq = None
1596        self.pq = lsfObject.jobInfoEntObject()
[524]1597
[691]1598    def getJobData( self, known_jobs="" ):
1599        """Gather all data on current jobs in LSF"""
1600        if len( known_jobs ) > 0:
1601            jobs = known_jobs
1602        else:
1603            jobs = { }
1604        joblist = {}
1605        joblist = self.pq.getJobInfo()
1606        nodelist = ''
[524]1607
[691]1608        self.cur_time = time.time()
[524]1609
[691]1610        jobs_processed = [ ]
[524]1611
[691]1612        for name, attrs in joblist.items():
1613            job_id = str(name)
1614            jobs_processed.append( job_id )
1615            name = self.getAttr( attrs, 'jobName' )
1616            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1617            owner = self.getAttr( attrs, 'user' )
[524]1618
1619### THIS IS THE rLimit List index values
[691]1620#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1621#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1622#define LSF_RLIMIT_DATA     2        /* data size */
1623#define LSF_RLIMIT_STACK    3        /* stack size */
1624#define LSF_RLIMIT_CORE     4        /* core file size */
1625#define LSF_RLIMIT_RSS      5        /* resident set size */
1626#define LSF_RLIMIT_NOFILE   6        /* open files */
1627#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1628#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
[524]1629#define LSF_RLIMIT_SWAP     8
[691]1630#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1631#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1632#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1633#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
[524]1634
[691]1635            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1636            if requested_time == -1: 
1637                requested_time = ""
1638            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1639            if requested_memory == -1: 
1640                requested_memory = ""
[524]1641# This tries to get proc per node. We don't support this right now
[691]1642            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1643            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1644            if requested_cpus == None or requested_cpus == "":
1645                requested_cpus = 1
[524]1646
[691]1647            if QUEUE:
1648                for q in QUEUE:
1649                    if q == queue:
1650                        display_queue = 1
1651                        break
1652                    else:
1653                        display_queue = 0
1654                        continue
1655            if display_queue == 0:
1656                continue
[524]1657
[691]1658            runState = self.getAttr( attrs, 'status' )
1659            if runState == 4:
1660                status = 'R'
1661            else:
1662                status = 'Q'
1663            queued_timestamp = self.getAttr( attrs, 'submitTime' )
[524]1664
[691]1665            if status == 'R':
1666                start_timestamp = self.getAttr( attrs, 'startTime' )
1667                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1668                nodelist = nodesCpu.keys()
[524]1669
[691]1670                if DETECT_TIME_DIFFS:
[524]1671
[691]1672                    # If a job start if later than our current date,
1673                    # that must mean the Torque server's time is later
1674                    # than our local time.
[524]1675
[691]1676                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
[524]1677
[691]1678                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[524]1679
[691]1680            elif status == 'Q':
1681                start_timestamp = ''
1682                count_mynodes = 0
1683                numeric_node = 1
1684                nodelist = ''
[524]1685
[691]1686            myAttrs = { }
1687            if name == "":
1688                myAttrs['name'] = "none"
1689            else:
1690                myAttrs['name'] = name
[524]1691
[691]1692            myAttrs[ 'owner' ]        = owner
1693            myAttrs[ 'requested_time' ]    = str(requested_time)
1694            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1695            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1696            myAttrs[ 'ppn' ]        = str( ppn )
1697            myAttrs[ 'status' ]        = status
1698            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1699            myAttrs[ 'queue' ]        = str(queue)
1700            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1701            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1702            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1703            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1704            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
[524]1705
[691]1706            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1707                jobs[ job_id ] = myAttrs
[524]1708
[691]1709                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
[524]1710
[691]1711        for id, attrs in jobs.items():
1712            if id not in jobs_processed:
1713                # This one isn't there anymore
1714                #
1715                del jobs[ id ]
1716        self.jobs=jobs
[524]1717
1718
[355]1719class PbsDataGatherer( DataGatherer ):
[318]1720
[691]1721    """This is the DataGatherer for PBS and Torque"""
[318]1722
[691]1723    global PBSQuery, PBSError
[256]1724
[691]1725    def __init__( self ):
[354]1726
[691]1727        """Setup appropriate variables"""
[23]1728
[785]1729        self.jobs       = { }
1730        self.timeoffset = 0
1731        self.dp         = DataProcessor()
[354]1732
[691]1733        self.initPbsQuery()
[23]1734
[691]1735    def initPbsQuery( self ):
[91]1736
[785]1737        self.pq = None
[354]1738
[788]1739        try:
[354]1740
[788]1741            if( BATCH_SERVER ):
[91]1742
[788]1743                self.pq = PBSQuery( BATCH_SERVER )
1744            else:
1745                self.pq = PBSQuery()
1746
1747        except PBSError, details:
1748            print 'Cannot connect to pbs server'
1749            print details
1750            sys.exit( 1 )
1751
[691]1752        try:
[791]1753            # TODO: actually use new data structure
[691]1754            self.pq.old_data_structure()
[656]1755
[691]1756        except AttributeError:
[656]1757
[691]1758            # pbs_query is older
1759            #
1760            pass
[656]1761
[790]1762    def getNodeData( self ):
1763
1764        nodedict = { }
1765
1766        try:
1767            nodedict = self.pq.getnodes()
1768
1769        except PBSError, detail:
1770
1771            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1772
1773        return nodedict
1774
[691]1775    def getJobData( self ):
[354]1776
[691]1777        """Gather all data on current jobs in Torque"""
[26]1778
[785]1779        joblist            = {}
1780        self.cur_time      = 0
[349]1781
[691]1782        try:
1783            joblist        = self.pq.getjobs()
[785]1784            self.cur_time  = time.time()
[354]1785
[691]1786        except PBSError, detail:
[354]1787
[790]1788            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
[691]1789            return None
[354]1790
[691]1791        jobs_processed    = [ ]
[26]1792
[691]1793        for name, attrs in joblist.items():
[785]1794            display_queue = 1
1795            job_id        = name.split( '.' )[0]
[26]1796
[785]1797            name          = self.getAttr( attrs, 'Job_Name' )
1798            queue         = self.getAttr( attrs, 'queue' )
[317]1799
[691]1800            if QUEUE:
1801                for q in QUEUE:
1802                    if q == queue:
1803                        display_queue = 1
1804                        break
1805                    else:
1806                        display_queue = 0
1807                        continue
1808            if display_queue == 0:
1809                continue
[317]1810
1811
[691]1812            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
[785]1813            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1814            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
[95]1815
[785]1816            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
[95]1817
[785]1818            ppn = ''
[281]1819
[691]1820            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
[95]1821
[785]1822                mynoderequest_fields = mynoderequest.split( ':' )
[281]1823
[691]1824                for mynoderequest_field in mynoderequest_fields:
[281]1825
[691]1826                    if mynoderequest_field.find( 'ppn' ) != -1:
[281]1827
[785]1828                        ppn = mynoderequest_field.split( 'ppn=' )[1]
[281]1829
[785]1830            status = self.getAttr( attrs, 'job_state' )
[25]1831
[691]1832            if status in [ 'Q', 'R' ]:
[450]1833
[691]1834                jobs_processed.append( job_id )
[450]1835
[785]1836            queued_timestamp = self.getAttr( attrs, 'ctime' )
[243]1837
[691]1838            if status == 'R':
[133]1839
[785]1840                start_timestamp = self.getAttr( attrs, 'mtime' )
1841                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
[133]1842
[785]1843                nodeslist       = do_nodelist( nodes )
[354]1844
[691]1845                if DETECT_TIME_DIFFS:
[185]1846
[691]1847                    # If a job start if later than our current date,
1848                    # that must mean the Torque server's time is later
1849                    # than our local time.
1850               
1851                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
[185]1852
[785]1853                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[185]1854
[691]1855            elif status == 'Q':
[95]1856
[691]1857                # 'mynodequest' can be a string in the following syntax according to the
1858                # Torque Administator's manual:
1859                #
1860                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1861                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1862                # etc
1863                #
[451]1864
[691]1865                #
1866                # For now we only count the amount of nodes request and ignore properties
1867                #
[451]1868
[785]1869                start_timestamp = ''
1870                count_mynodes   = 0
[354]1871
[691]1872                for node in mynoderequest.split( '+' ):
[67]1873
[691]1874                    # Just grab the {node_count|hostname} part and ignore properties
1875                    #
[785]1876                    nodepart     = node.split( ':' )[0]
[67]1877
[691]1878                    # Let's assume a node_count value
1879                    #
[785]1880                    numeric_node = 1
[451]1881
[691]1882                    # Chop the value up into characters
1883                    #
1884                    for letter in nodepart:
[67]1885
[691]1886                        # If this char is not a digit (0-9), this must be a hostname
1887                        #
1888                        if letter not in string.digits:
[133]1889
[785]1890                            numeric_node = 0
[133]1891
[691]1892                    # If this is a hostname, just count this as one (1) node
1893                    #
1894                    if not numeric_node:
[354]1895
[785]1896                        count_mynodes = count_mynodes + 1
[691]1897                    else:
[451]1898
[691]1899                        # If this a number, it must be the node_count
1900                        # and increase our count with it's value
1901                        #
1902                        try:
[785]1903                            count_mynodes = count_mynodes + int( nodepart )
[354]1904
[691]1905                        except ValueError, detail:
[354]1906
[691]1907                            # When we arrive here I must be bugged or very confused
1908                            # THIS SHOULD NOT HAPPEN!
1909                            #
1910                            debug_msg( 10, str( detail ) )
1911                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1912                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1913                            debug_msg( 10, 'job = ' + str( name ) )
1914                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1915                       
[785]1916                nodeslist       = str( count_mynodes )
[691]1917            else:
[785]1918                start_timestamp = ''
1919                nodeslist       = ''
[133]1920
[691]1921            myAttrs                = { }
[26]1922
[785]1923            myAttrs[ 'name' ]             = str( name )
1924            myAttrs[ 'queue' ]            = str( queue )
1925            myAttrs[ 'owner' ]            = str( owner )
1926            myAttrs[ 'requested_time' ]   = str( requested_time )
1927            myAttrs[ 'requested_memory' ] = str( requested_memory )
1928            myAttrs[ 'ppn' ]              = str( ppn )
1929            myAttrs[ 'status' ]           = str( status )
1930            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1931            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1932            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1933            myAttrs[ 'nodes' ]            = nodeslist
1934            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
[691]1935            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
[354]1936
[691]1937            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
[61]1938
[785]1939                self.jobs[ job_id ] = myAttrs
[26]1940
[691]1941        for id, attrs in self.jobs.items():
[76]1942
[691]1943            if id not in jobs_processed:
[76]1944
[691]1945                # This one isn't there anymore; toedeledoki!
1946                #
1947                del self.jobs[ id ]
[76]1948
[362]1949GMETRIC_DEFAULT_TYPE    = 'string'
1950GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1951GMETRIC_DEFAULT_PORT    = '8649'
[700]1952GMETRIC_DEFAULT_UNITS   = ''
[362]1953
1954class Gmetric:
1955
[691]1956    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
[362]1957
[700]1958    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1959    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1960    protocol        = ( 'udp', 'multicast' )
[362]1961
[691]1962    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
[700]1963               
[691]1964        global GMETRIC_DEFAULT_TYPE
[362]1965
[691]1966        self.prot       = self.checkHostProtocol( host )
[700]1967        self.data_msg   = xdrlib.Packer()
1968        self.meta_msg   = xdrlib.Packer()
[691]1969        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
[362]1970
[691]1971        if self.prot not in self.protocol:
[362]1972
[691]1973            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
[362]1974
[691]1975        if self.prot == 'multicast':
[362]1976
[691]1977            # Set multicast options
1978            #
1979            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
[362]1980
[691]1981        self.hostport   = ( host, int( port ) )
1982        self.slopestr   = 'both'
1983        self.tmax       = 60
[362]1984
[691]1985    def checkHostProtocol( self, ip ):
[362]1986
[691]1987        """Detect if a ip adress is a multicast address"""
[471]1988
[691]1989        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1990        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
[362]1991
[700]1992        ip_fields               = ip.split( '.' )
[362]1993
[691]1994        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
[362]1995
[691]1996            return 'multicast'
1997        else:
1998            return 'udp'
[362]1999
[691]2000    def send( self, name, value, dmax, typestr = '', units = '' ):
[362]2001
[691]2002        if len( units ) == 0:
[700]2003            units       = GMETRIC_DEFAULT_UNITS
[471]2004
[691]2005        if len( typestr ) == 0:
[700]2006            typestr     = GMETRIC_DEFAULT_TYPE
[362]2007
[700]2008        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
[409]2009
[700]2010        meta_rt = self.socket.sendto( meta_msg, self.hostport )
2011        data_rt = self.socket.sendto( data_msg, self.hostport )
[362]2012
[700]2013        return ( meta_rt, data_rt )
[362]2014
[700]2015    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
2016
2017        hostname = "unset"
2018
[691]2019        if slopestr not in self.slope:
[362]2020
[691]2021            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
[362]2022
[691]2023        if typestr not in self.type:
[362]2024
[691]2025            raise ValueError( "Type must be one of: " + str( self.type ) )
[362]2026
[691]2027        if len( name ) == 0:
[362]2028
[691]2029            raise ValueError( "Name must be non-empty" )
[362]2030
[700]2031        self.meta_msg.reset()
2032        self.meta_msg.pack_int( 128 )
[362]2033
[700]2034        if not spoof:
2035            self.meta_msg.pack_string( hostname )
2036        else:
2037            self.meta_msg.pack_string( spoof )
[362]2038
[700]2039        self.meta_msg.pack_string( name )
2040
2041        if not spoof:
2042            self.meta_msg.pack_int( 0 )
2043        else:
2044            self.meta_msg.pack_int( 1 )
2045           
2046        self.meta_msg.pack_string( typestr )
2047        self.meta_msg.pack_string( name )
2048        self.meta_msg.pack_string( unitstr )
2049        self.meta_msg.pack_int( self.slope[ slopestr ] )
2050        self.meta_msg.pack_uint( int( tmax ) )
2051        self.meta_msg.pack_uint( int( dmax ) )
2052
2053        if not group:
2054            self.meta_msg.pack_int( 0 )
2055        else:
2056            self.meta_msg.pack_int( 1 )
2057            self.meta_msg.pack_string( "GROUP" )
2058            self.meta_msg.pack_string( group )
2059
2060        self.data_msg.reset()
2061        self.data_msg.pack_int( 128+5 )
2062
2063        if not spoof:
2064            self.data_msg.pack_string( hostname )
2065        else:
2066            self.data_msg.pack_string( spoof )
2067
2068        self.data_msg.pack_string( name )
2069
2070        if not spoof:
2071            self.data_msg.pack_int( 0 )
2072        else:
2073            self.data_msg.pack_int( 1 )
2074
2075        self.data_msg.pack_string( "%s" )
2076        self.data_msg.pack_string( str( value ) )
2077
2078        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2079
[26]2080def printTime( ):
[354]2081
[691]2082    """Print current time/date in human readable format for log/debug"""
[26]2083
[691]2084    return time.strftime("%a, %d %b %Y %H:%M:%S")
[26]2085
2086def debug_msg( level, msg ):
[354]2087
[691]2088    """Print msg if at or above current debug level"""
[26]2089
[691]2090    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
[377]2091
[691]2092    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2093        sys.stderr.write( msg + '\n' )
[26]2094
[691]2095    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2096        syslog.syslog( msg )
[373]2097
[307]2098def write_pidfile():
2099
[691]2100    # Write pidfile if PIDFILE is set
2101    #
2102    if PIDFILE:
[307]2103
[691]2104        pid    = os.getpid()
[354]2105
[691]2106        pidfile    = open( PIDFILE, 'w' )
[354]2107
[691]2108        pidfile.write( str( pid ) )
2109        pidfile.close()
[307]2110
[23]2111def main():
[354]2112
[691]2113    """Application start"""
[23]2114
[837]2115    global PBSQuery, PBSError, lsfObject, pyslurm
[854]2116    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
[256]2117
[691]2118    if not processArgs( sys.argv[1:] ):
[354]2119
[691]2120        sys.exit( 1 )
[212]2121
[691]2122    # Load appropriate DataGatherer depending on which BATCH_API is set
2123    # and any required modules for the Gatherer
2124    #
2125    if BATCH_API == 'pbs':
[256]2126
[691]2127        try:
2128            from PBSQuery import PBSQuery, PBSError
[256]2129
[787]2130        except ImportError, details:
[256]2131
[854]2132            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
[787]2133            print details
[691]2134            sys.exit( 1 )
[256]2135
[691]2136        gather = PbsDataGatherer()
[256]2137
[691]2138    elif BATCH_API == 'sge':
[256]2139
[854]2140        if BATCH_SERVER != 'localhost':
2141
2142            # Print and log, but continue execution
2143            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2144            print err_msg
2145            debug_msg( 0, err_msg )
2146
[691]2147        # Tested with SGE 6.0u11.
2148        #
2149        gather = SgeDataGatherer()
[368]2150
[691]2151    elif BATCH_API == 'lsf':
[368]2152
[854]2153        if BATCH_SERVER != 'localhost':
2154
2155            # Print and log, but continue execution
2156            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2157            print err_msg
2158            debug_msg( 0, err_msg )
2159
[691]2160        try:
2161            from lsfObject import lsfObject
2162        except:
[854]2163            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2164            sys.exit( 1 )
[256]2165
[691]2166        gather = LsfDataGatherer()
[524]2167
[837]2168    elif BATCH_API == 'slurm':
2169
[854]2170        if BATCH_SERVER != 'localhost':
2171
2172            # Print and log, but continue execution
2173            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2174            print err_msg
2175            debug_msg( 0, err_msg )
2176
[837]2177        try:
2178            import pyslurm
2179        except:
2180            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
[854]2181            sys.exit( 1 )
[837]2182
2183        gather = SLURMDataGatherer()
2184
[691]2185    else:
[786]2186        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
[354]2187
[691]2188        sys.exit( 1 )
[256]2189
[691]2190    if( DAEMONIZE and USE_SYSLOG ):
[373]2191
[691]2192        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[373]2193
[691]2194    if DAEMONIZE:
[354]2195
[691]2196        gather.daemon()
2197    else:
2198        gather.run()
[23]2199
[256]2200# wh00t? someone started me! :)
[65]2201#
[23]2202if __name__ == '__main__':
[691]2203    main()
Note: See TracBrowser for help on using the repository browser.