source: branches/1.1/jobmond/jobmond.py @ 951

Last change on this file since 951 was 947, checked in by ramonb, 10 years ago
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 65.7 KB
RevLine 
[23]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[691]5# Copyright (C) 2006-2013  Ramon Bastiaans
[623]6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
[225]7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
[228]22# SVN $Id: jobmond.py 947 2014-01-20 13:23:53Z ramonb $
[227]23#
[23]24
[694]25# vi :set ts=4
26
[471]27import sys, getopt, ConfigParser, time, os, socket, string, re
[837]28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
[318]29from xml.sax.handler import feature_namespaces
[623]30from collections import deque
[699]31from glob import glob
[318]32
[913]33VERSION='__VERSION__'
[307]34
[471]35def usage( ver ):
36
[691]37    print 'jobmond %s' %VERSION
[471]38
[691]39    if ver:
40        return 0
[471]41
[691]42    print
43    print 'Purpose:'
44    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
45    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
46    print
47    print 'Usage:    jobmond [OPTIONS]'
48    print
49    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
50    print '  -p, --pidfile=FILE    Use pid file to store the process id'
51    print '  -h, --help        Print help and exit'
52    print '  -v, --version      Print version and exit'
53    print
[307]54
[212]55def processArgs( args ):
[26]56
[701]57    SHORT_L      = 'p:hvc:'
58    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
[165]59
[704]60    global PIDFILE, JOBMOND_CONF
[701]61    PIDFILE      = None
[61]62
[701]63    JOBMOND_CONF = '/etc/jobmond.conf'
[354]64
[691]65    try:
[68]66
[691]67        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
[185]68
[691]69    except getopt.GetoptError, detail:
[212]70
[691]71        print detail
[851]72        usage( False )
[691]73        sys.exit( 1 )
[212]74
[691]75    for opt, value in opts:
[212]76
[691]77        if opt in [ '--config', '-c' ]:
78       
[701]79            JOBMOND_CONF = value
[212]80
[691]81        if opt in [ '--pidfile', '-p' ]:
[212]82
[701]83            PIDFILE      = value
[691]84       
85        if opt in [ '--help', '-h' ]:
[307]86 
[691]87            usage( False )
88            sys.exit( 0 )
[212]89
[691]90        if opt in [ '--version', '-v' ]:
[471]91
[691]92            usage( True )
93            sys.exit( 0 )
[471]94
[701]95    return loadConfig( JOBMOND_CONF )
[212]96
[944]97html_escape_table = {
98    "&": "_",
99    '"': "_",
100    "'": "_",
101    ">": "_",
102    "<": "_",
103}
104
105# If this is too slow:
106# - re.sub(r'[\&|\>\<\'\"]','_', variable )
107# - re.sub(r'[\&\>\<\'\"]','_', variable )
108# - and even faster: compile regex
109
110def html_escape(text):
111    """Produce entities within text."""
112    return string.join((html_escape_table.get(c,c) for c in text), '')
113
[520]114class GangliaConfigParser:
115
[699]116    def __init__( self, filename ):
[520]117
[699]118        self.conf_lijst   = [ ]
119        self.conf_dict    = { }
120        self.filename     = filename
121        self.file_pointer = file( filename, 'r' )
122        self.lexx         = shlex.shlex( self.file_pointer )
123        self.lexx.whitespace_split = True
[520]124
[699]125        self.parse()
[520]126
[699]127    def __del__( self ):
[520]128
[699]129        """
130        Cleanup: close file descriptor
131        """
132
133        self.file_pointer.close()
134        del self.lexx
135        del self.conf_lijst
136
[691]137    def removeQuotes( self, value ):
[520]138
[699]139        clean_value = value
140        clean_value = clean_value.replace( "'", "" )
141        clean_value = clean_value.replace( '"', '' )
142        clean_value = clean_value.strip()
[520]143
[691]144        return clean_value
[520]145
[699]146    def removeBraces( self, value ):
[520]147
[699]148        clean_value = value
149        clean_value = clean_value.replace( "(", "" )
150        clean_value = clean_value.replace( ')', '' )
151        clean_value = clean_value.strip()
[520]152
[699]153        return clean_value
[520]154
[699]155    def parse( self ):
[520]156
[699]157        """
158        Parse self.filename using shlex scanning.
159        - Removes /* comments */
160        - Traverses (recursively) through all include () statements
161        - Stores complete valid config tokens in self.conf_list
[520]162
[699]163        i.e.:
164            ['globals',
165             '{',
166             'daemonize',
167             '=',
168             'yes',
169             'setuid',
170             '=',
171             'yes',
172             'user',
173             '=',
174             'ganglia',
175             'debug_level',
176             '=',
177             '0',
178             <etc> ]
179        """
[520]180
[699]181        t = 'bogus'
182        c = False
183        i = False
[520]184
[699]185        while t != self.lexx.eof:
186            #print 'get token'
187            t = self.lexx.get_token()
[520]188
[699]189            if len( t ) >= 2:
[520]190
[699]191                if len( t ) >= 4:
[520]192
[699]193                    if t[:2] == '/*' and t[-2:] == '*/':
[520]194
[699]195                        #print 'comment line'
196                        #print 'skipping: %s' %t
197                        continue
[520]198
[699]199                if t == '/*' or t[:2] == '/*':
200                    c = True
201                    #print 'comment start'
202                    #print 'skipping: %s' %t
203                    continue
[520]204
[699]205                if t == '*/' or t[-2:] == '*/':
206                    c = False
207                    #print 'skipping: %s' %t
208                    #print 'comment end'
209                    continue
210
211            if c:
212                #print 'skipping: %s' %t
213                continue
214
215            if t == 'include':
216                i = True
217                #print 'include start'
218                #print 'skipping: %s' %t
219                continue
220
221            if i:
222
223                #print 'include start: %s' %t
224
225                t2 = self.removeQuotes( t )
226                t2 = self.removeBraces( t )
227
228                for in_file in glob( self.removeQuotes(t2) ):
229
230                    #print 'including file: %s' %in_file
231                    parse_infile = GangliaConfigParser( in_file )
232
233                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
234
235                    del parse_infile
236
237                i = False
238                #print 'include end'
239                #print 'skipping: %s' %t
240                continue
241
242            #print 'keep: %s' %t
243            self.conf_lijst.append( self.removeQuotes(t) )
244
245    def getConfLijst( self ):
246
247        return self.conf_lijst
248
249    def confListToDict( self, parent_list=None ):
250
251        """
252        Recursively traverses a conf_list and creates dictionary from it
253        """
254
255        new_dict = { }
256        count    = 0
257        skip     = 0
258
259        if not parent_list:
260            parent_list = self.conf_lijst
261
262        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
263
264        for n, c in enumerate( parent_list ):
265
266            count = count + 1
267
268            #print 'CL: n %d c %s' %(n, c)
269
270            if skip > 0:
271
272                #print '- skipped'
273                skip = skip - 1
274                continue
275
276            if (n+1) <= (len( parent_list )-1):
277
278                if parent_list[(n+1)] == '{':
279
280                    if not new_dict.has_key( c ):
281                        new_dict[ c ] = [ ]
282
283                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
284                    new_dict[ c ].append( temp_new_dict )
285
286                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
287
288                    if not new_dict.has_key( c ):
289                        new_dict[ c ] = [ ]
290
291                    new_dict[ c ].append( parent_list[ (n+2) ] )
292
293                    skip = 2
294
295                if parent_list[n] == '}':
296
297                    #print 'leaving confListToDict(): new dict = %s' %new_dict
298                    return (new_dict, count)
299
300    def makeConfDict( self ):
301
302        """
303        Walks through self.conf_list and creates a dictionary based upon config values
304
305        i.e.:
306            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
307                                                         'ip': ['"127.0.0.1"'],
308                                                         'mask': ['32']}]}],
309                                    'port': ['8649']}],
310            'udp_recv_channel': [{'port': ['8649']}],
311            'udp_send_channel': [{'host': ['145.101.32.3'],
312                                  'port': ['8649']},
313                                 {'host': ['145.101.32.207'],
314                                  'port': ['8649']}]}
315        """
316
317        new_dict = { }
318        skip     = 0
319
320        #print 'entering makeConfDict()'
321
322        for n, c in enumerate( self.conf_lijst ):
323
324            #print 'M: n %d c %s' %(n, c)
325
326            if skip > 0:
327
328                #print '- skipped'
329                skip = skip - 1
330                continue
331
332            if (n+1) <= (len( self.conf_lijst )-1):
333
334                if self.conf_lijst[(n+1)] == '{':
335
336                    if not new_dict.has_key( c ):
337                        new_dict[ c ] = [ ]
338
339                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
340                    new_dict[ c ].append( temp_new_dict )
341
342                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
343
344                    if not new_dict.has_key( c ):
345                        new_dict[ c ] = [ ]
346
347                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
348
349                    skip = 2
350
351        self.conf_dict = new_dict
352        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
353
354    def checkConfDict( self ):
355
356        if len( self.conf_lijst ) == 0:
357
358            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
359
360        if len( self.conf_dict ) == 0:
361
362            self.makeConfDict()
363
364    def getConfDict( self ):
365
366        self.checkConfDict()
367        return self.conf_dict
368
369    def getUdpSendChannels( self ):
370
371        self.checkConfDict()
372
[707]373        udp_send_channels = [ ] # IP:PORT
374
375        if not self.conf_dict.has_key( 'udp_send_channel' ):
376            return None
377
378        for u in self.conf_dict[ 'udp_send_channel' ]:
379
380            if u.has_key( 'mcast_join' ):
381
382                ip = u['mcast_join'][0]
383
384            elif u.has_key( 'host' ):
385
386                ip = u['host'][0]
387
388            port = u['port'][0]
389
390            udp_send_channels.append( ( ip, port ) )
391
392        if len( udp_send_channels ) == 0:
393            return None
394
395        return udp_send_channels
396
[699]397    def getSectionLastOption( self, section, option ):
398
399        """
400        Get last option set in a config section that could be set multiple times in multiple (include) files.
401
402        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
403        """
404
405        self.checkConfDict()
406        value = None
407
408        if not self.conf_dict.has_key( section ):
409
410            return None
411
412        # Could be set multiple times in multiple (include) files: get last one set
413        for c in self.conf_dict[ section ]:
414
415                if c.has_key( option ):
416
[705]417                    value = c[ option ][0]
[699]418
[705]419        return value
[699]420
421    def getClusterName( self ):
422
423        return self.getSectionLastOption( 'cluster', 'name' )
424
425    def getVal( self, section, option ):
426
427        return self.getSectionLastOption( section, option )
428
[691]429    def getInt( self, section, valname ):
[520]430
[691]431        value    = self.getVal( section, valname )
[520]432
[691]433        if not value:
[699]434            return None
[520]435
[691]436        return int( value )
[520]437
[691]438    def getStr( self, section, valname ):
[520]439
[691]440        value    = self.getVal( section, valname )
[520]441
[691]442        if not value:
[699]443            return None
[520]444
[691]445        return str( value )
[520]446
447def findGmetric():
448
[691]449    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
[520]450
[691]451        guess    = '%s/%s' %( dir, 'gmetric' )
[520]452
[691]453        if os.path.exists( guess ):
[520]454
[691]455            return guess
[520]456
[691]457    return False
[520]458
[212]459def loadConfig( filename ):
460
[691]461    def getlist( cfg_string ):
[215]462
[691]463        my_list = [ ]
[215]464
[691]465        for item_txt in cfg_string.split( ',' ):
[215]466
[691]467            sep_char = None
[215]468
[691]469            item_txt = item_txt.strip()
[215]470
[691]471            for s_char in [ "'", '"' ]:
[215]472
[691]473                if item_txt.find( s_char ) != -1:
[215]474
[691]475                    if item_txt.count( s_char ) != 2:
[215]476
[691]477                        print 'Missing quote: %s' %item_txt
478                        sys.exit( 1 )
[215]479
[691]480                    else:
[215]481
[691]482                        sep_char = s_char
483                        break
[215]484
[691]485            if sep_char:
[215]486
[691]487                item_txt = item_txt.split( sep_char )[1]
[215]488
[691]489            my_list.append( item_txt )
[215]490
[691]491        return my_list
[215]492
[784]493    if not os.path.isfile( JOBMOND_CONF ):
494
495        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
496        sys.exit( 1 )
497
498    try:
499        f = open( JOBMOND_CONF, 'r' )
500    except IOError, detail:
501        print "Cannot read config file: '%s'" %JOBMOND_CONF
502        sys.exit( 1 )
503    else:
504        f.close()
505
[691]506    cfg        = ConfigParser.ConfigParser()
[212]507
[691]508    cfg.read( filename )
[212]509
[691]510    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
511    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
512    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
513    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
[707]514    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
[212]515
[701]516    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[212]517
[701]518    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[212]519
[691]520    SYSLOG_LEVEL    = -1
[701]521    SYSLOG_FACILITY = None
[377]522
[691]523    try:
[701]524        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[212]525
[691]526    except ConfigParser.NoOptionError:
[373]527
[701]528        USE_SYSLOG  = True
[373]529
[691]530        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
[373]531
[691]532    if USE_SYSLOG:
[373]533
[691]534        try:
[701]535            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[373]536
[691]537        except ConfigParser.NoOptionError:
[373]538
[691]539            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
[701]540            SYSLOG_LEVEL = 0
[373]541
[691]542        try:
[373]543
[691]544            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[373]545
[691]546        except ConfigParser.NoOptionError:
[373]547
[691]548            SYSLOG_FACILITY = syslog.LOG_DAEMON
[373]549
[691]550            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
[373]551
[691]552    try:
[373]553
[701]554        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
[212]555
[691]556    except ConfigParser.NoOptionError:
[265]557
[864]558        # Not required for all API's: only pbs api allows remote connections
559        BATCH_SERVER = None
[265]560
[691]561    try:
562   
[701]563        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
[265]564
[691]565    except ConfigParser.NoOptionError:
[265]566
[691]567        # Backwards compatibility for old configs
568        #
[265]569
[701]570        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
571        api_guess           = 'pbs'
[691]572   
573    try:
[212]574
[701]575        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
[353]576
[691]577    except ConfigParser.NoOptionError:
[353]578
[703]579        # Not specified: assume /etc/ganglia/gmond.conf
[691]580        #
[703]581        GMOND_CONF          = '/etc/ganglia/gmond.conf'
[353]582
[707]583    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
584    GMETRIC_TARGET          = None
[449]585
[707]586    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
[449]587
[707]588    if not GMOND_UDP_SEND_CHANNELS:
[449]589
[701]590        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
[520]591
[691]592        # Couldn't figure it out: let's see if it's in our jobmond.conf
593        #
594        try:
[520]595
[691]596            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
[520]597
[691]598        # Guess not: now just give up
[701]599       
[691]600        except ConfigParser.NoOptionError:
[520]601
[691]602            GMETRIC_TARGET    = None
[520]603
[691]604            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
[520]605
[701]606            gmetric_bin    = findGmetric()
[520]607
[701]608            if gmetric_bin:
[520]609
[701]610                GMETRIC_BINARY     = gmetric_bin
611            else:
612                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
[520]613
[701]614                try:
[520]615
[701]616                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
[520]617
[701]618                except ConfigParser.NoOptionError:
[520]619
[786]620                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
[701]621                    sys.exit( 1 )
[520]622
[701]623    #TODO: is this really still needed or should be automatic
[691]624    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
[212]625
[927]626    try:
627        BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
[215]628
[927]629    except ConfigParser.NoOptionError:
630
631        BATCH_HOST_TRANSLATE = [ ]
632        pass
633
[691]634    try:
[256]635
[691]636        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
[266]637
[691]638    except ConfigParser.NoOptionError, detail:
[266]639
[864]640        print "FATAL ERROR: BATCH_API not set"
641        sys.exit( 1 )
[354]642
[691]643    try:
[317]644
[691]645        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
[317]646
[691]647    except ConfigParser.NoOptionError, detail:
[317]648
[691]649        QUEUE        = None
[353]650
[701]651    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
652
[691]653    return True
[212]654
[507]655def fqdn_parts (fqdn):
[520]656
[691]657    """Return pair of host and domain for fully-qualified domain name arg."""
[520]658
[691]659    parts = fqdn.split (".")
[520]660
[691]661    return (parts[0], string.join(parts[1:], "."))
[507]662
[61]663class DataProcessor:
[355]664
[691]665    """Class for processing of data"""
[61]666
[691]667    binary = None
[61]668
[691]669    def __init__( self, binary=None ):
[355]670
[691]671        """Remember alternate binary location if supplied"""
[61]672
[691]673        global GMETRIC_BINARY, GMOND_CONF
[449]674
[691]675        if binary:
676            self.binary = binary
[61]677
[707]678        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[691]679            self.binary = GMETRIC_BINARY
[449]680
[691]681        # Timeout for XML
682        #
683        # From ganglia's documentation:
684        #
685        # 'A metric will be deleted DMAX seconds after it is received, and
686        # DMAX=0 means eternal life.'
[61]687
[691]688        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
[80]689
[707]690        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[354]691
[691]692            incompatible = self.checkGmetricVersion()
[61]693
[691]694            if incompatible:
[355]695
[903]696                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
[691]697                sys.exit( 1 )
[65]698
[946]699        self.gmetric_send_instances = { }
700
701
702    def __del__( self ):
703
704        for send_instance in self.gmetric_send_instances:
705
706            del send_instance
707
[691]708    def checkGmetricVersion( self ):
[355]709
[691]710        """
[903]711        Check version of gmetric is at least 3.3.8
[691]712        for the syntax we use
713        """
[65]714
[691]715        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
[255]716
[691]717        incompatible    = 0
[341]718
[691]719        gfp        = os.popen( self.binary + ' --version' )
[692]720        lines      = gfp.readlines()
[65]721
[691]722        gfp.close()
[355]723
[691]724        for line in lines:
[355]725
[691]726            line = line.split( ' ' )
[65]727
[691]728            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
729           
730                gmetric_version    = line[1].split( '\n' )[0]
[65]731
[691]732                version_major    = int( gmetric_version.split( '.' )[0] )
733                version_minor    = int( gmetric_version.split( '.' )[1] )
734                version_patch    = int( gmetric_version.split( '.' )[2] )
[65]735
[691]736                incompatible    = 0
[65]737
[691]738                if version_major < 3:
[65]739
[691]740                    incompatible = 1
741               
742                elif version_major == 3:
[65]743
[903]744                    if version_minor < 3:
[65]745
[692]746                        incompatible = 1
[65]747
[903]748                    elif version_patch < 8:
749
750                        incompatible = 1
751
[691]752        return incompatible
[65]753
[691]754    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
[355]755
[691]756        """Call gmetric binary and multicast"""
[65]757
[691]758        cmd = self.binary
[65]759
[707]760        if GMOND_UDP_SEND_CHANNELS:
[61]761
[707]762            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
763
764                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
765
766                debug_msg( 10, printTime() + ' ' + metric_debug)
767
[946]768                if not self.gmetric_send_instances.has_key( (c_ip, c_port) ):
[707]769
[946]770                    self.gmetric_send_instances[ (c_ip, c_port) ] = Gmetric( c_ip, c_port )
[707]771
[946]772                self.gmetric_send_instances[ (c_ip, c_port) ].send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
773
[707]774        elif GMETRIC_TARGET:
775
[691]776            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
777            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
[353]778
[691]779            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
[353]780
[691]781            debug_msg( 10, printTime() + ' ' + metric_debug)
[353]782
[946]783            if not self.gmetric_send_instances.has_key( (GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT) ):
[353]784
[946]785                self.gmetric_send_instances[ (GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT) ] = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
[353]786
[946]787            self.gmetric_send_instances[ (GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT) ].send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
788
[691]789        else:
790            try:
791                cmd = cmd + ' -c' + GMOND_CONF
[353]792
[691]793            except NameError:
[353]794
[705]795                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
[353]796
[691]797            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[353]798
[691]799            if len( units ) > 0:
[409]800
[691]801                cmd = cmd + ' -u"' + units + '"'
[409]802
[691]803            debug_msg( 10, printTime() + ' ' + cmd )
[353]804
[691]805            os.system( cmd )
[353]806
[318]807class DataGatherer:
[23]808
[691]809    """Skeleton class for batch system DataGatherer"""
[256]810
[691]811    def printJobs( self, jobs ):
[355]812
[691]813        """Print a jobinfo overview"""
[318]814
[691]815        for name, attrs in self.jobs.items():
[318]816
[691]817            print 'job %s' %(name)
[318]818
[691]819            for name, val in attrs.items():
[318]820
[691]821                print '\t%s = %s' %( name, val )
[318]822
[691]823    def printJob( self, jobs, job_id ):
[355]824
[691]825        """Print job with job_id from jobs"""
[318]826
[691]827        print 'job %s' %(job_id)
[318]828
[691]829        for name, val in jobs[ job_id ].items():
[318]830
[691]831            print '\t%s = %s' %( name, val )
[318]832
[691]833    def getAttr( self, attrs, name ):
[507]834
[691]835        """Return certain attribute from dictionary, if exists"""
[507]836
[691]837        if attrs.has_key( name ):
[507]838
[691]839            return attrs[ name ]
840        else:
841            return ''
[507]842
[691]843    def jobDataChanged( self, jobs, job_id, attrs ):
[507]844
[691]845        """Check if job with attrs and job_id in jobs has changed"""
[507]846
[691]847        if jobs.has_key( job_id ):
[507]848
[691]849            oldData = jobs[ job_id ]   
850        else:
851            return 1
[507]852
[691]853        for name, val in attrs.items():
[507]854
[691]855            if oldData.has_key( name ):
[507]856
[691]857                if oldData[ name ] != attrs[ name ]:
[507]858
[691]859                    return 1
[507]860
[691]861            else:
862                return 1
[507]863
[691]864        return 0
[507]865
[691]866    def submitJobData( self ):
[507]867
[691]868        """Submit job info list"""
[507]869
[691]870        global BATCH_API
[512]871
[728]872        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[507]873
[724]874        running_jobs = 0
875        queued_jobs  = 0
[507]876
[691]877        # Count how many running/queued jobs we found
878        #
879        for jobid, jobattrs in self.jobs.items():
[507]880
[691]881            if jobattrs[ 'status' ] == 'Q':
[507]882
[691]883                queued_jobs += 1
[507]884
[691]885            elif jobattrs[ 'status' ] == 'R':
[507]886
[691]887                running_jobs += 1
[507]888
[691]889        # Report running/queued jobs as seperate metric for a nice RRD graph
890        #
[728]891        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
892        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
[507]893
[691]894        # Report down/offline nodes in batch (PBS only ATM)
895        #
[866]896        if BATCH_API in [ 'pbs', 'slurm' ]:
[512]897
[691]898            domain        = fqdn_parts( socket.getfqdn() )[1]
[514]899
[728]900            downed_nodes  = list()
901            offline_nodes = list()
[691]902       
903            l        = ['state']
[512]904
[790]905            nodelist = self.getNodeData()
906
907            for name, node in nodelist.items():
908
[691]909                if ( node[ 'state' ].find( "down" ) != -1 ):
[512]910
[691]911                    downed_nodes.append( name )
[512]912
[691]913                if ( node[ 'state' ].find( "offline" ) != -1 ):
[512]914
[691]915                    offline_nodes.append( name )
[512]916
[728]917            downnodeslist    = do_nodelist( downed_nodes )
918            offlinenodeslist = do_nodelist( offline_nodes )
[512]919
[691]920            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
921            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[728]922            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
923            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
[514]924
[691]925        # Now let's spread the knowledge
926        #
927        for jobid, jobattrs in self.jobs.items():
[507]928
[691]929            # Make gmetric values for each job: respect max gmetric value length
930            #
931            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
932            metric_increment    = 0
[507]933
[691]934            # If we have more job info than max gmetric value length allows, split it up
935            # amongst multiple metrics
936            #
937            for val in gmetric_val:
[507]938
[728]939                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
940                self.dp.multicastGmetric( metric_name, val )
[507]941
[691]942                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
943                #
944                metric_increment    = metric_increment + 1
[507]945
[691]946    def compileGmetricVal( self, jobid, jobattrs ):
[507]947
[691]948        """Create a val string for gmetric of jobinfo"""
[507]949
[691]950        gval_lists    = [ ]
951        val_list    = { }
[507]952
[691]953        for val_name, val_value in jobattrs.items():
[507]954
[691]955            # These are our own metric names, i.e.: status, start_timestamp, etc
956            #
957            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
[507]958
[691]959            # These are their corresponding values
960            #
961            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
[507]962
[691]963            if val_name == 'nodes' and jobattrs['status'] == 'R':
[507]964
[691]965                node_str = None
[507]966
[691]967                for node in val_value:
[507]968
[691]969                    if node_str:
[507]970
[691]971                        node_str = node_str + ';' + node
972                    else:
973                        node_str = node
[507]974
[691]975                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
976                    #
977                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
[507]978
[691]979                        # It's too big, we need to make a new gmetric for the additional info
980                        #
981                        val_list[ val_name ]    = node_str
[507]982
[691]983                        gval_lists.append( val_list )
[507]984
[691]985                        val_list        = { }
986                        node_str        = None
[507]987
[691]988                val_list[ val_name ]    = node_str
[507]989
[691]990                gval_lists.append( val_list )
[507]991
[691]992                val_list        = { }
[507]993
[691]994            elif val_value != '':
[507]995
[691]996                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
997                #
998                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
[507]999
[691]1000                    # It's too big, we need to make a new gmetric for the additional info
1001                    #
1002                    gval_lists.append( val_list )
[507]1003
[691]1004                    val_list        = { }
[507]1005
[691]1006                val_list[ val_name ]    = val_value
[507]1007
[691]1008        if len( val_list ) > 0:
[507]1009
[691]1010            gval_lists.append( val_list )
[507]1011
[691]1012        str_list    = [ ]
[507]1013
[691]1014        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
1015        #
1016        for val_list in gval_lists:
[507]1017
[691]1018            my_val_str    = None
[507]1019
[691]1020            for val_name, val_value in val_list.items():
[507]1021
[691]1022                if type(val_value) == list:
[579]1023
[691]1024                    val_value    = val_value.join( ',' )
[579]1025
[691]1026                if my_val_str:
[507]1027
[691]1028                    try:
1029                        # fixme: It's getting
1030                        # ('nodes', None) items
[944]1031                        my_val_str = my_val_str + ' ' + val_name + '=' + html_escape(val_value)
[691]1032                    except:
1033                        pass
[623]1034
[691]1035                else:
[944]1036                    my_val_str = val_name + '=' + html_escape(val_value)
[507]1037
[691]1038            str_list.append( my_val_str )
[507]1039
[691]1040        return str_list
[507]1041
[691]1042    def daemon( self ):
[355]1043
[691]1044        """Run as daemon forever"""
[256]1045
[691]1046        # Fork the first child
1047        #
1048        pid = os.fork()
1049        if pid > 0:
1050            sys.exit(0)  # end parent
[256]1051
[691]1052        # creates a session and sets the process group ID
1053        #
1054        os.setsid()
[318]1055
[691]1056        # Fork the second child
1057        #
1058        pid = os.fork()
1059        if pid > 0:
1060            sys.exit(0)  # end parent
[318]1061
[691]1062        write_pidfile()
[318]1063
[691]1064        # Go to the root directory and set the umask
1065        #
1066        os.chdir('/')
1067        os.umask(0)
[318]1068
[691]1069        sys.stdin.close()
1070        sys.stdout.close()
1071        sys.stderr.close()
[318]1072
[691]1073        os.open('/dev/null', os.O_RDWR)
1074        os.dup2(0, 1)
1075        os.dup2(0, 2)
[318]1076
[691]1077        self.run()
[318]1078
[691]1079    def run( self ):
[355]1080
[691]1081        """Main thread"""
[256]1082
[691]1083        while ( 1 ):
1084       
1085            self.getJobData()
1086            self.submitJobData()
1087            time.sleep( BATCH_POLL_INTERVAL )   
[256]1088
[623]1089# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1090# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1091# with 6.2.  See also the fixmes.
[256]1092
[507]1093class NoJobs (Exception):
[691]1094    """Exception raised by empty job list in qstat output."""
1095    pass
[256]1096
[507]1097class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
[691]1098    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
[318]1099
[691]1100    def __init__(self):
1101        self.value = ""
1102        self.joblist = []
1103        self.job = {}
1104        self.queue = ""
1105        self.in_joblist = False
1106        self.lrequest = False
1107        self.eltq = deque()
1108        xml.sax.handler.ContentHandler.__init__(self)
[318]1109
[691]1110    # The structure of the output is as follows (for SGE 6.0).  It's
1111    # similar for 6.1, but radically different for 6.2, and is
1112    # undocumented generally.  Unfortunately it's voluminous, and probably
1113    # doesn't scale to large clusters/queues.
[318]1114
[691]1115    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1116    #   <djob_info>
1117    #     <qmaster_response>  <!-- job -->
1118    #       ...
1119    #       <JB_ja_template> 
1120    #     <ulong_sublist>
1121    #     ...         <!-- start_time, state ... -->
1122    #     </ulong_sublist>
1123    #       </JB_ja_template> 
1124    #       <JB_ja_tasks>
1125    #     <ulong_sublist>
1126    #       ...       <!-- task info
1127    #     </ulong_sublist>
1128    #     ...
1129    #       </JB_ja_tasks>
1130    #       ...
1131    #     </qmaster_response>
1132    #   </djob_info>
1133    #   <messages>
1134    #   ...
[318]1135
[691]1136    # NB.  We might treat each task as a separate job, like
1137    # straight qstat output, but the web interface expects jobs to
1138    # be identified by integers, not, say, <job number>.<task>.
[318]1139
[691]1140    # So, I lied.  If the job list is empty, we get invalid XML
1141    # like this, which we need to defend against:
[318]1142
[691]1143    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1144    #   <>
1145    #     <ST_name>*</ST_name>
1146    #   </>
1147    # </unknown_jobs>
[318]1148
[691]1149    def startElement(self, name, attrs):
1150        self.value = ""
1151        if name == "djob_info":    # job list
1152            self.in_joblist = True
1153        # The job container is "qmaster_response" in SGE 6.0
1154        # and 6.1, but "element" in 6.2.  This is only the very
1155        # start of what's necessary for 6.2, though (sigh).
1156        elif (name == "qmaster_response" or name == "element") \
1157                and self.eltq[-1] == "djob_info": # job
1158            self.job = {"job_state": "U", "slots": 0,
1159                    "nodes": [], "queued_timestamp": "",
1160                    "queued_timestamp": "", "queue": "",
1161                    "ppn": "0", "RN_max": 0,
1162                    # fixme in endElement
1163                    "requested_memory": 0, "requested_time": 0
1164                    }
1165            self.joblist.append(self.job)
1166        elif name == "qstat_l_requests": # resource request
1167            self.lrequest = True
1168        elif name == "unknown_jobs":
1169            raise NoJobs
1170        self.eltq.append (name)
[318]1171
[691]1172    def characters(self, ch):
1173        self.value += ch
[318]1174
[691]1175    def endElement(self, name): 
1176        """Snarf job elements contents into job dictionary.
1177           Translate keys if appropriate."""
[318]1178
[691]1179        name_trans = {
1180          "JB_job_number": "number",
1181          "JB_job_name": "name", "JB_owner": "owner",
1182          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1183          "JB_submission_time": "queued_timestamp"
1184          }
1185        value = self.value
1186        self.eltq.pop ()
[318]1187
[691]1188        if name == "djob_info":
1189            self.in_joblist = False
1190            self.job = {}
1191        elif name == "JAT_master_queue":
1192            self.job["queue"] = value.split("@")[0]
1193        elif name == "JG_qhostname":
1194            if not (value in self.job["nodes"]):
1195                self.job["nodes"].append(value)
1196        elif name == "JG_slots": # slots in use
1197            self.job["slots"] += int(value)
1198        elif name == "RN_max": # requested slots (tasks or parallel)
1199            self.job["RN_max"] = max (self.job["RN_max"],
1200                          int(value))
1201        elif name == "JAT_state": # job state (bitwise or)
1202            value = int (value)
1203            # Status values from sge_jobL.h
1204            #define JIDLE           0x00000000
1205            #define JHELD           0x00000010
1206            #define JMIGRATING          0x00000020
1207            #define JQUEUED         0x00000040
1208            #define JRUNNING        0x00000080
1209            #define JSUSPENDED          0x00000100
1210            #define JTRANSFERING        0x00000200
1211            #define JDELETED        0x00000400
1212            #define JWAITING        0x00000800
1213            #define JEXITING        0x00001000
1214            #define JWRITTEN        0x00002000
1215            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1216            #define JFINISHED           0x00010000
1217            if value & 0x80:
1218                self.job["status"] = "R"
1219            elif value & 0x40:
1220                self.job["status"] = "Q"
1221            else:
1222                self.job["status"] = "O" # `other'
1223        elif name == "CE_name" and self.lrequest and self.value in \
1224                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1225            # We're in a container for an interesting resource
1226            # request; record which type.
1227            self.lrequest = self.value
1228        elif name == "CE_doubleval" and self.lrequest:
1229            # if we're in a container for an interesting
1230            # resource request, use the maxmimum of the hard
1231            # and soft requests to record the requested CPU
1232            # or core.  Fixme:  I'm not sure if this logic is
1233            # right.
1234            if self.lrequest in ("h_core", "s_core"):
1235                self.job["requested_memory"] = \
1236                    max (float (value),
1237                     self.job["requested_memory"])
1238            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1239            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1240                self.job["requested_time"] = \
1241                    max (float (value),
1242                     self.job["requested_time"])
1243        elif name == "qstat_l_requests":
1244            self.lrequest = False
1245        elif self.job and self.in_joblist:
1246            if name in name_trans:
1247                name = name_trans[name]
1248                self.job[name] = value
[318]1249
[507]1250# Abstracted from PBS original.
1251# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
[520]1252#
1253def do_nodelist( nodes ):
1254
[691]1255    """Translate node list as appropriate."""
[520]1256
[691]1257    nodeslist        = [ ]
1258    my_domain        = fqdn_parts( socket.getfqdn() )[1]
[520]1259
[691]1260    for node in nodes:
[520]1261
[691]1262        host        = node.split( '/' )[0] # not relevant for SGE
1263        h, host_domain    = fqdn_parts(host)
[520]1264
[691]1265        if host_domain == my_domain:
[520]1266
[691]1267            host    = h
[520]1268
[691]1269        if nodeslist.count( host ) == 0:
[520]1270
[691]1271            for translate_pattern in BATCH_HOST_TRANSLATE:
[520]1272
[691]1273                if translate_pattern.find( '/' ) != -1:
[520]1274
[691]1275                    translate_orig    = \
1276                        translate_pattern.split( '/' )[1]
1277                    translate_new    = \
1278                        translate_pattern.split( '/' )[2]
1279                    host = re.sub( translate_orig,
1280                               translate_new, host )
1281            if not host in nodeslist:
1282                nodeslist.append( host )
1283    return nodeslist
[318]1284
[837]1285class SLURMDataGatherer( DataGatherer ):
1286
1287    global pyslurm
1288
1289    """This is the DataGatherer for SLURM"""
1290
1291    def __init__( self ):
1292
1293        """Setup appropriate variables"""
1294
1295        self.jobs       = { }
1296        self.timeoffset = 0
1297        self.dp         = DataProcessor()
1298
1299    def getNodeData( self ):
1300
[866]1301        slurm_type  = pyslurm.node()
[837]1302
[866]1303        slurm_nodes = slurm_type.get()
[837]1304
[866]1305        nodedict    = { }
1306
1307        for node, attrs in slurm_nodes.items():
1308
1309            ( num_state, name_state ) = attrs['node_state'] 
1310
1311            if name_state == 'DOWN':
1312
1313                nodedict[ node ] = { 'state' : 'down' }
1314
1315            elif name_state == 'DRAIN':
1316
1317                nodedict[ node ] = { 'state' : 'offline' }
1318
[837]1319        return nodedict
1320
1321    def getJobData( self ):
1322
1323        """Gather all data on current jobs"""
1324
1325        joblist            = {}
1326
1327        self.cur_time  = time.time()
1328
1329        slurm_type = pyslurm.job()
1330        joblist    = slurm_type.get()
1331
1332        jobs_processed    = [ ]
1333
1334        for name, attrs in joblist.items():
1335            display_queue = 1
1336            job_id        = name
1337
1338            name          = self.getAttr( attrs, 'name' )
1339            queue         = self.getAttr( attrs, 'partition' )
1340
1341            if QUEUE:
1342                for q in QUEUE:
1343                    if q == queue:
1344                        display_queue = 1
1345                        break
1346                    else:
1347                        display_queue = 0
1348                        continue
1349            if display_queue == 0:
1350                continue
1351
1352            owner_uid        = attrs[ 'user_id' ]
1353            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1354
1355            requested_time   = self.getAttr( attrs, 'time_limit' )
[852]1356            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
[837]1357
[852]1358            if min_memory == 0:
1359
1360                requested_memory = ''
1361
1362            else:
1363                requested_memory = min_memory
1364
[853]1365            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
[837]1366
[853]1367            if min_cpus == 0:
1368
1369                ppn = ''
1370
1371            else:
1372                ppn = min_cpus
1373
[837]1374            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1375
1376            status = 'Q'
1377
1378            if status_long == 'RUNNING':
1379
1380                status = 'R'
1381
1382            elif status_long == 'COMPLETED':
1383
1384                continue
1385
1386            jobs_processed.append( job_id )
1387
1388            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1389
1390            start_timestamp = ''
1391            nodeslist       = ''
1392
1393            if status == 'R':
1394
1395                start_timestamp = self.getAttr( attrs, 'start_time' )
[851]1396                nodes           = attrs[ 'nodes' ]
[837]1397
[851]1398                if not nodes:
[837]1399
[851]1400                    # This should not happen
1401
1402                    # Something wrong: running but 'nodes' returned empty by pyslurm
1403                    # Possible pyslurm bug: abort/quit/warning
1404
1405                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1406
1407                    print err_msg
1408                    debug_msg( 0, err_msg )
1409                    sys.exit(1)
1410
1411                my_nodelist = [ ]
1412
1413                slurm_hostlist  = pyslurm.hostlist()
1414                slurm_hostlist.create( nodes )
1415                slurm_hostlist.uniq()
1416
1417                while slurm_hostlist.count() > 0:
1418
1419                    my_nodelist.append( slurm_hostlist.pop() )
1420
1421                slurm_hostlist.destroy()
1422
1423                del slurm_hostlist
1424
1425                nodeslist       = do_nodelist( my_nodelist )
1426
[837]1427                if DETECT_TIME_DIFFS:
1428
1429                    # If a job start if later than our current date,
1430                    # that must mean the Torque server's time is later
1431                    # than our local time.
1432               
1433                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1434
1435                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1436
1437            elif status == 'Q':
1438
1439                nodeslist       = str( attrs[ 'num_nodes' ] )
1440
1441            else:
1442                start_timestamp = ''
1443                nodeslist       = ''
1444
1445            myAttrs                = { }
1446
1447            myAttrs[ 'name' ]             = str( name )
1448            myAttrs[ 'queue' ]            = str( queue )
1449            myAttrs[ 'owner' ]            = str( owner )
1450            myAttrs[ 'requested_time' ]   = str( requested_time )
1451            myAttrs[ 'requested_memory' ] = str( requested_memory )
1452            myAttrs[ 'ppn' ]              = str( ppn )
1453            myAttrs[ 'status' ]           = str( status )
1454            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1455            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1456            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1457            myAttrs[ 'nodes' ]            = nodeslist
1458            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1459            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1460
1461            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1462
1463                self.jobs[ job_id ] = myAttrs
1464
1465        for id, attrs in self.jobs.items():
1466
1467            if id not in jobs_processed:
1468
1469                # This one isn't there anymore; toedeledoki!
1470                #
1471                del self.jobs[ id ]
1472
[318]1473class SgeDataGatherer(DataGatherer):
1474
[691]1475    jobs = {}
[61]1476
[691]1477    def __init__( self ):
1478        self.jobs = {}
1479        self.timeoffset = 0
1480        self.dp = DataProcessor()
[318]1481
[691]1482    def getJobData( self ):
1483        """Gather all data on current jobs in SGE"""
[318]1484
[691]1485        import popen2
[318]1486
[691]1487        self.cur_time = 0
1488        queues = ""
1489        if QUEUE:    # only for specific queues
1490            # Fixme:  assumes queue names don't contain single
1491            # quote or comma.  Don't know what the SGE rules are.
1492            queues = " -q '" + string.join (QUEUE, ",") + "'"
1493        # Note the comment in SgeQstatXMLParser about scaling with
1494        # this method of getting data.  I haven't found better one.
1495        # Output with args `-xml -ext -f -r' is easier to parse
1496        # in some ways, harder in others, but it doesn't provide
1497        # the submission time (at least SGE 6.0).  The pipeline
1498        # into sed corrects bogus XML observed with a configuration
1499        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1500        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
[623]1501sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
[691]1502                           + queues, True)
1503        qstatparser = SgeQstatXMLParser()
1504        parse_err = 0
1505        try:
1506            xml.sax.parse(piping.fromchild, qstatparser)
1507        except NoJobs:
1508            pass
1509        except:
1510            parse_err = 1
[704]1511        if piping.wait():
1512            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
[691]1513            return None
1514        elif parse_err:
1515            debug_msg(10, "Bad XML output from qstat"())
1516            exit (1)
1517        for f in piping.fromchild, piping.tochild, piping.childerr:
1518            f.close()
1519        self.cur_time = time.time()
1520        jobs_processed = []
1521        for job in qstatparser.joblist:
1522            job_id = job["number"]
1523            if job["status"] in [ 'Q', 'R' ]:
1524                jobs_processed.append(job_id)
1525            if job["status"] == "R":
1526                job["nodes"] = do_nodelist (job["nodes"])
1527                # Fixme: why is job["nodes"] sometimes null?
1528                try:
1529                    # Fixme: Is this sensible?  The
1530                    # PBS-type PPN isn't something you use
1531                    # with SGE.
[704]1532                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
[691]1533                except:
1534                    job["ppn"] = 0
1535                if DETECT_TIME_DIFFS:
1536                    # If a job start is later than our
1537                    # current date, that must mean
1538                    # the SGE server's time is later
1539                    # than our local time.
[704]1540                    start_timestamp = int (job["start_timestamp"])
1541                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
[318]1542
[704]1543                        self.timeoffset    = start_timestamp - int(self.cur_time)
[691]1544            else:
1545                # fixme: Note sure what this should be:
1546                job["ppn"] = job["RN_max"]
1547                job["nodes"] = "1"
[318]1548
[691]1549            myAttrs = {}
1550            for attr in ["name", "queue", "owner",
1551                     "requested_time", "status",
1552                     "requested_memory", "ppn",
1553                     "start_timestamp", "queued_timestamp"]:
1554                myAttrs[attr] = str(job[attr])
1555            myAttrs["nodes"] = job["nodes"]
[704]1556            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
[691]1557            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1558            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
[318]1559
[704]1560            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
[691]1561                self.jobs[job_id] = myAttrs
1562        for id, attrs in self.jobs.items():
1563            if id not in jobs_processed:
1564                del self.jobs[id]
[318]1565
[524]1566# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1567# Requres LSFObject http://sourceforge.net/projects/lsfobject
1568#
1569class LsfDataGatherer(DataGatherer):
[525]1570
[691]1571    """This is the DataGatherer for LSf"""
[524]1572
[691]1573    global lsfObject
[524]1574
[691]1575    def __init__( self ):
[525]1576
[691]1577        self.jobs = { }
1578        self.timeoffset = 0
1579        self.dp = DataProcessor()
1580        self.initLsfQuery()
[524]1581
[691]1582    def _countDuplicatesInList( self, dupedList ):
[525]1583
[691]1584        countDupes    = { }
[525]1585
[691]1586        for item in dupedList:
[525]1587
[691]1588            if not countDupes.has_key( item ):
[525]1589
[691]1590                countDupes[ item ]    = 1
1591            else:
1592                countDupes[ item ]    = countDupes[ item ] + 1
[525]1593
[691]1594        dupeCountList    = [ ]
[525]1595
[691]1596        for item, count in countDupes.items():
[525]1597
[691]1598            dupeCountList.append( ( item, count ) )
[525]1599
[691]1600        return dupeCountList
[524]1601#
1602#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1603#print _countDuplicatesInList(lst)
1604#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1605########################
1606
[691]1607    def initLsfQuery( self ):
1608        self.pq = None
1609        self.pq = lsfObject.jobInfoEntObject()
[524]1610
[691]1611    def getJobData( self, known_jobs="" ):
1612        """Gather all data on current jobs in LSF"""
1613        if len( known_jobs ) > 0:
1614            jobs = known_jobs
1615        else:
1616            jobs = { }
1617        joblist = {}
1618        joblist = self.pq.getJobInfo()
1619        nodelist = ''
[524]1620
[691]1621        self.cur_time = time.time()
[524]1622
[691]1623        jobs_processed = [ ]
[524]1624
[691]1625        for name, attrs in joblist.items():
1626            job_id = str(name)
1627            jobs_processed.append( job_id )
1628            name = self.getAttr( attrs, 'jobName' )
1629            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1630            owner = self.getAttr( attrs, 'user' )
[524]1631
1632### THIS IS THE rLimit List index values
[691]1633#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1634#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1635#define LSF_RLIMIT_DATA     2        /* data size */
1636#define LSF_RLIMIT_STACK    3        /* stack size */
1637#define LSF_RLIMIT_CORE     4        /* core file size */
1638#define LSF_RLIMIT_RSS      5        /* resident set size */
1639#define LSF_RLIMIT_NOFILE   6        /* open files */
1640#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1641#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
[524]1642#define LSF_RLIMIT_SWAP     8
[691]1643#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1644#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1645#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1646#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
[524]1647
[691]1648            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1649            if requested_time == -1: 
1650                requested_time = ""
1651            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1652            if requested_memory == -1: 
1653                requested_memory = ""
[524]1654# This tries to get proc per node. We don't support this right now
[691]1655            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1656            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1657            if requested_cpus == None or requested_cpus == "":
1658                requested_cpus = 1
[524]1659
[691]1660            if QUEUE:
1661                for q in QUEUE:
1662                    if q == queue:
1663                        display_queue = 1
1664                        break
1665                    else:
1666                        display_queue = 0
1667                        continue
1668            if display_queue == 0:
1669                continue
[524]1670
[691]1671            runState = self.getAttr( attrs, 'status' )
1672            if runState == 4:
1673                status = 'R'
1674            else:
1675                status = 'Q'
1676            queued_timestamp = self.getAttr( attrs, 'submitTime' )
[524]1677
[691]1678            if status == 'R':
1679                start_timestamp = self.getAttr( attrs, 'startTime' )
1680                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1681                nodelist = nodesCpu.keys()
[524]1682
[691]1683                if DETECT_TIME_DIFFS:
[524]1684
[691]1685                    # If a job start if later than our current date,
1686                    # that must mean the Torque server's time is later
1687                    # than our local time.
[524]1688
[691]1689                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
[524]1690
[691]1691                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[524]1692
[691]1693            elif status == 'Q':
1694                start_timestamp = ''
1695                count_mynodes = 0
1696                numeric_node = 1
1697                nodelist = ''
[524]1698
[691]1699            myAttrs = { }
1700            if name == "":
1701                myAttrs['name'] = "none"
1702            else:
1703                myAttrs['name'] = name
[524]1704
[691]1705            myAttrs[ 'owner' ]        = owner
1706            myAttrs[ 'requested_time' ]    = str(requested_time)
1707            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1708            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1709            myAttrs[ 'ppn' ]        = str( ppn )
1710            myAttrs[ 'status' ]        = status
1711            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1712            myAttrs[ 'queue' ]        = str(queue)
1713            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1714            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1715            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1716            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1717            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
[524]1718
[691]1719            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1720                jobs[ job_id ] = myAttrs
[524]1721
[691]1722                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
[524]1723
[691]1724        for id, attrs in jobs.items():
1725            if id not in jobs_processed:
1726                # This one isn't there anymore
1727                #
1728                del jobs[ id ]
1729        self.jobs=jobs
[524]1730
1731
[355]1732class PbsDataGatherer( DataGatherer ):
[318]1733
[691]1734    """This is the DataGatherer for PBS and Torque"""
[318]1735
[691]1736    global PBSQuery, PBSError
[256]1737
[691]1738    def __init__( self ):
[354]1739
[691]1740        """Setup appropriate variables"""
[23]1741
[785]1742        self.jobs       = { }
1743        self.timeoffset = 0
1744        self.dp         = DataProcessor()
[354]1745
[691]1746        self.initPbsQuery()
[23]1747
[691]1748    def initPbsQuery( self ):
[91]1749
[785]1750        self.pq = None
[354]1751
[788]1752        try:
[354]1753
[788]1754            if( BATCH_SERVER ):
[91]1755
[788]1756                self.pq = PBSQuery( BATCH_SERVER )
1757            else:
1758                self.pq = PBSQuery()
1759
1760        except PBSError, details:
1761            print 'Cannot connect to pbs server'
1762            print details
1763            sys.exit( 1 )
1764
[691]1765        try:
[791]1766            # TODO: actually use new data structure
[691]1767            self.pq.old_data_structure()
[656]1768
[691]1769        except AttributeError:
[656]1770
[691]1771            # pbs_query is older
1772            #
1773            pass
[656]1774
[790]1775    def getNodeData( self ):
1776
1777        nodedict = { }
1778
1779        try:
1780            nodedict = self.pq.getnodes()
1781
1782        except PBSError, detail:
1783
1784            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1785
1786        return nodedict
1787
[691]1788    def getJobData( self ):
[354]1789
[691]1790        """Gather all data on current jobs in Torque"""
[26]1791
[785]1792        joblist            = {}
1793        self.cur_time      = 0
[349]1794
[691]1795        try:
1796            joblist        = self.pq.getjobs()
[785]1797            self.cur_time  = time.time()
[354]1798
[947]1799        except (PBSError, TypeError), detail:
[354]1800
[790]1801            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
[691]1802            return None
[354]1803
[691]1804        jobs_processed    = [ ]
[26]1805
[691]1806        for name, attrs in joblist.items():
[785]1807            display_queue = 1
1808            job_id        = name.split( '.' )[0]
[26]1809
[785]1810            name          = self.getAttr( attrs, 'Job_Name' )
1811            queue         = self.getAttr( attrs, 'queue' )
[317]1812
[691]1813            if QUEUE:
1814                for q in QUEUE:
1815                    if q == queue:
1816                        display_queue = 1
1817                        break
1818                    else:
1819                        display_queue = 0
1820                        continue
1821            if display_queue == 0:
1822                continue
[317]1823
1824
[691]1825            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
[785]1826            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1827            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
[95]1828
[785]1829            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
[95]1830
[785]1831            ppn = ''
[281]1832
[691]1833            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
[95]1834
[785]1835                mynoderequest_fields = mynoderequest.split( ':' )
[281]1836
[691]1837                for mynoderequest_field in mynoderequest_fields:
[281]1838
[691]1839                    if mynoderequest_field.find( 'ppn' ) != -1:
[281]1840
[785]1841                        ppn = mynoderequest_field.split( 'ppn=' )[1]
[281]1842
[785]1843            status = self.getAttr( attrs, 'job_state' )
[25]1844
[691]1845            if status in [ 'Q', 'R' ]:
[450]1846
[691]1847                jobs_processed.append( job_id )
[450]1848
[785]1849            queued_timestamp = self.getAttr( attrs, 'ctime' )
[243]1850
[691]1851            if status == 'R':
[133]1852
[785]1853                start_timestamp = self.getAttr( attrs, 'mtime' )
1854                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
[133]1855
[785]1856                nodeslist       = do_nodelist( nodes )
[354]1857
[691]1858                if DETECT_TIME_DIFFS:
[185]1859
[691]1860                    # If a job start if later than our current date,
1861                    # that must mean the Torque server's time is later
1862                    # than our local time.
1863               
1864                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
[185]1865
[785]1866                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[185]1867
[691]1868            elif status == 'Q':
[95]1869
[691]1870                # 'mynodequest' can be a string in the following syntax according to the
1871                # Torque Administator's manual:
1872                #
1873                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1874                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1875                # etc
1876                #
[451]1877
[691]1878                #
1879                # For now we only count the amount of nodes request and ignore properties
1880                #
[451]1881
[785]1882                start_timestamp = ''
1883                count_mynodes   = 0
[354]1884
[691]1885                for node in mynoderequest.split( '+' ):
[67]1886
[691]1887                    # Just grab the {node_count|hostname} part and ignore properties
1888                    #
[785]1889                    nodepart     = node.split( ':' )[0]
[67]1890
[691]1891                    # Let's assume a node_count value
1892                    #
[785]1893                    numeric_node = 1
[451]1894
[691]1895                    # Chop the value up into characters
1896                    #
1897                    for letter in nodepart:
[67]1898
[691]1899                        # If this char is not a digit (0-9), this must be a hostname
1900                        #
1901                        if letter not in string.digits:
[133]1902
[785]1903                            numeric_node = 0
[133]1904
[691]1905                    # If this is a hostname, just count this as one (1) node
1906                    #
1907                    if not numeric_node:
[354]1908
[785]1909                        count_mynodes = count_mynodes + 1
[691]1910                    else:
[451]1911
[691]1912                        # If this a number, it must be the node_count
1913                        # and increase our count with it's value
1914                        #
1915                        try:
[785]1916                            count_mynodes = count_mynodes + int( nodepart )
[354]1917
[691]1918                        except ValueError, detail:
[354]1919
[691]1920                            # When we arrive here I must be bugged or very confused
1921                            # THIS SHOULD NOT HAPPEN!
1922                            #
1923                            debug_msg( 10, str( detail ) )
1924                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1925                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1926                            debug_msg( 10, 'job = ' + str( name ) )
1927                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1928                       
[785]1929                nodeslist       = str( count_mynodes )
[691]1930            else:
[785]1931                start_timestamp = ''
1932                nodeslist       = ''
[133]1933
[691]1934            myAttrs                = { }
[26]1935
[785]1936            myAttrs[ 'name' ]             = str( name )
1937            myAttrs[ 'queue' ]            = str( queue )
1938            myAttrs[ 'owner' ]            = str( owner )
1939            myAttrs[ 'requested_time' ]   = str( requested_time )
1940            myAttrs[ 'requested_memory' ] = str( requested_memory )
1941            myAttrs[ 'ppn' ]              = str( ppn )
1942            myAttrs[ 'status' ]           = str( status )
1943            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1944            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1945            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1946            myAttrs[ 'nodes' ]            = nodeslist
1947            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
[691]1948            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
[354]1949
[691]1950            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
[61]1951
[785]1952                self.jobs[ job_id ] = myAttrs
[26]1953
[691]1954        for id, attrs in self.jobs.items():
[76]1955
[691]1956            if id not in jobs_processed:
[76]1957
[691]1958                # This one isn't there anymore; toedeledoki!
1959                #
1960                del self.jobs[ id ]
[76]1961
[362]1962GMETRIC_DEFAULT_TYPE    = 'string'
1963GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1964GMETRIC_DEFAULT_PORT    = '8649'
[700]1965GMETRIC_DEFAULT_UNITS   = ''
[362]1966
1967class Gmetric:
1968
[691]1969    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
[362]1970
[700]1971    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1972    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1973    protocol        = ( 'udp', 'multicast' )
[362]1974
[691]1975    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
[700]1976               
[691]1977        global GMETRIC_DEFAULT_TYPE
[362]1978
[691]1979        self.prot       = self.checkHostProtocol( host )
[700]1980        self.data_msg   = xdrlib.Packer()
1981        self.meta_msg   = xdrlib.Packer()
[691]1982        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
[362]1983
[691]1984        if self.prot not in self.protocol:
[362]1985
[691]1986            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
[362]1987
[691]1988        if self.prot == 'multicast':
[362]1989
[691]1990            # Set multicast options
1991            #
1992            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
[362]1993
[691]1994        self.hostport   = ( host, int( port ) )
1995        self.slopestr   = 'both'
1996        self.tmax       = 60
[362]1997
[946]1998    def __del__( self ):
1999
2000        self.socket.close()
2001
[691]2002    def checkHostProtocol( self, ip ):
[362]2003
[691]2004        """Detect if a ip adress is a multicast address"""
[471]2005
[691]2006        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
2007        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
[362]2008
[700]2009        ip_fields               = ip.split( '.' )
[362]2010
[691]2011        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
[362]2012
[691]2013            return 'multicast'
2014        else:
2015            return 'udp'
[362]2016
[691]2017    def send( self, name, value, dmax, typestr = '', units = '' ):
[362]2018
[691]2019        if len( units ) == 0:
[700]2020            units       = GMETRIC_DEFAULT_UNITS
[471]2021
[691]2022        if len( typestr ) == 0:
[700]2023            typestr     = GMETRIC_DEFAULT_TYPE
[362]2024
[700]2025        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
[409]2026
[700]2027        meta_rt = self.socket.sendto( meta_msg, self.hostport )
2028        data_rt = self.socket.sendto( data_msg, self.hostport )
[362]2029
[700]2030        return ( meta_rt, data_rt )
[362]2031
[700]2032    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
2033
2034        hostname = "unset"
2035
[691]2036        if slopestr not in self.slope:
[362]2037
[691]2038            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
[362]2039
[691]2040        if typestr not in self.type:
[362]2041
[691]2042            raise ValueError( "Type must be one of: " + str( self.type ) )
[362]2043
[691]2044        if len( name ) == 0:
[362]2045
[691]2046            raise ValueError( "Name must be non-empty" )
[362]2047
[700]2048        self.meta_msg.reset()
2049        self.meta_msg.pack_int( 128 )
[362]2050
[700]2051        if not spoof:
2052            self.meta_msg.pack_string( hostname )
2053        else:
2054            self.meta_msg.pack_string( spoof )
[362]2055
[700]2056        self.meta_msg.pack_string( name )
2057
2058        if not spoof:
2059            self.meta_msg.pack_int( 0 )
2060        else:
2061            self.meta_msg.pack_int( 1 )
2062           
2063        self.meta_msg.pack_string( typestr )
2064        self.meta_msg.pack_string( name )
2065        self.meta_msg.pack_string( unitstr )
2066        self.meta_msg.pack_int( self.slope[ slopestr ] )
2067        self.meta_msg.pack_uint( int( tmax ) )
2068        self.meta_msg.pack_uint( int( dmax ) )
2069
2070        if not group:
2071            self.meta_msg.pack_int( 0 )
2072        else:
2073            self.meta_msg.pack_int( 1 )
2074            self.meta_msg.pack_string( "GROUP" )
2075            self.meta_msg.pack_string( group )
2076
2077        self.data_msg.reset()
2078        self.data_msg.pack_int( 128+5 )
2079
2080        if not spoof:
2081            self.data_msg.pack_string( hostname )
2082        else:
2083            self.data_msg.pack_string( spoof )
2084
2085        self.data_msg.pack_string( name )
2086
2087        if not spoof:
2088            self.data_msg.pack_int( 0 )
2089        else:
2090            self.data_msg.pack_int( 1 )
2091
2092        self.data_msg.pack_string( "%s" )
2093        self.data_msg.pack_string( str( value ) )
2094
2095        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2096
[26]2097def printTime( ):
[354]2098
[691]2099    """Print current time/date in human readable format for log/debug"""
[26]2100
[691]2101    return time.strftime("%a, %d %b %Y %H:%M:%S")
[26]2102
2103def debug_msg( level, msg ):
[354]2104
[691]2105    """Print msg if at or above current debug level"""
[26]2106
[691]2107    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
[377]2108
[691]2109    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2110        sys.stderr.write( msg + '\n' )
[26]2111
[691]2112    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2113        syslog.syslog( msg )
[373]2114
[307]2115def write_pidfile():
2116
[691]2117    # Write pidfile if PIDFILE is set
2118    #
2119    if PIDFILE:
[307]2120
[691]2121        pid    = os.getpid()
[354]2122
[691]2123        pidfile    = open( PIDFILE, 'w' )
[354]2124
[691]2125        pidfile.write( str( pid ) )
2126        pidfile.close()
[307]2127
[23]2128def main():
[354]2129
[691]2130    """Application start"""
[23]2131
[837]2132    global PBSQuery, PBSError, lsfObject, pyslurm
[854]2133    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
[256]2134
[691]2135    if not processArgs( sys.argv[1:] ):
[354]2136
[691]2137        sys.exit( 1 )
[212]2138
[691]2139    # Load appropriate DataGatherer depending on which BATCH_API is set
2140    # and any required modules for the Gatherer
2141    #
2142    if BATCH_API == 'pbs':
[256]2143
[691]2144        try:
2145            from PBSQuery import PBSQuery, PBSError
[256]2146
[787]2147        except ImportError, details:
[256]2148
[854]2149            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
[787]2150            print details
[691]2151            sys.exit( 1 )
[256]2152
[691]2153        gather = PbsDataGatherer()
[256]2154
[691]2155    elif BATCH_API == 'sge':
[256]2156
[854]2157        if BATCH_SERVER != 'localhost':
2158
2159            # Print and log, but continue execution
2160            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2161            print err_msg
2162            debug_msg( 0, err_msg )
2163
[691]2164        # Tested with SGE 6.0u11.
2165        #
2166        gather = SgeDataGatherer()
[368]2167
[691]2168    elif BATCH_API == 'lsf':
[368]2169
[854]2170        if BATCH_SERVER != 'localhost':
2171
2172            # Print and log, but continue execution
2173            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2174            print err_msg
2175            debug_msg( 0, err_msg )
2176
[691]2177        try:
2178            from lsfObject import lsfObject
2179        except:
[854]2180            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2181            sys.exit( 1 )
[256]2182
[691]2183        gather = LsfDataGatherer()
[524]2184
[837]2185    elif BATCH_API == 'slurm':
2186
[854]2187        if BATCH_SERVER != 'localhost':
2188
2189            # Print and log, but continue execution
2190            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2191            print err_msg
2192            debug_msg( 0, err_msg )
2193
[837]2194        try:
2195            import pyslurm
2196        except:
2197            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
[854]2198            sys.exit( 1 )
[837]2199
2200        gather = SLURMDataGatherer()
2201
[691]2202    else:
[786]2203        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
[354]2204
[691]2205        sys.exit( 1 )
[256]2206
[691]2207    if( DAEMONIZE and USE_SYSLOG ):
[373]2208
[691]2209        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[373]2210
[691]2211    if DAEMONIZE:
[354]2212
[691]2213        gather.daemon()
2214    else:
2215        gather.run()
[23]2216
[256]2217# wh00t? someone started me! :)
[65]2218#
[23]2219if __name__ == '__main__':
[691]2220    main()
Note: See TracBrowser for help on using the repository browser.