source: branches/1.1/jobmond/jobmond.py @ 943

Last change on this file since 943 was 943, checked in by ramonb, 10 years ago
  • typo fix
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 64.7 KB
RevLine 
[23]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[691]5# Copyright (C) 2006-2013  Ramon Bastiaans
[623]6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
[225]7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
[228]22# SVN $Id: jobmond.py 943 2013-12-19 14:12:43Z ramonb $
[227]23#
[23]24
[694]25# vi :set ts=4
26
[471]27import sys, getopt, ConfigParser, time, os, socket, string, re
[837]28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
[318]29from xml.sax.handler import feature_namespaces
[623]30from collections import deque
[699]31from glob import glob
[942]32import cgi
[318]33
[913]34VERSION='__VERSION__'
[307]35
[471]36def usage( ver ):
37
[691]38    print 'jobmond %s' %VERSION
[471]39
[691]40    if ver:
41        return 0
[471]42
[691]43    print
44    print 'Purpose:'
45    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
46    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
47    print
48    print 'Usage:    jobmond [OPTIONS]'
49    print
50    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
51    print '  -p, --pidfile=FILE    Use pid file to store the process id'
52    print '  -h, --help        Print help and exit'
53    print '  -v, --version      Print version and exit'
54    print
[307]55
[212]56def processArgs( args ):
[26]57
[701]58    SHORT_L      = 'p:hvc:'
59    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
[165]60
[704]61    global PIDFILE, JOBMOND_CONF
[701]62    PIDFILE      = None
[61]63
[701]64    JOBMOND_CONF = '/etc/jobmond.conf'
[354]65
[691]66    try:
[68]67
[691]68        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
[185]69
[691]70    except getopt.GetoptError, detail:
[212]71
[691]72        print detail
[851]73        usage( False )
[691]74        sys.exit( 1 )
[212]75
[691]76    for opt, value in opts:
[212]77
[691]78        if opt in [ '--config', '-c' ]:
79       
[701]80            JOBMOND_CONF = value
[212]81
[691]82        if opt in [ '--pidfile', '-p' ]:
[212]83
[701]84            PIDFILE      = value
[691]85       
86        if opt in [ '--help', '-h' ]:
[307]87 
[691]88            usage( False )
89            sys.exit( 0 )
[212]90
[691]91        if opt in [ '--version', '-v' ]:
[471]92
[691]93            usage( True )
94            sys.exit( 0 )
[471]95
[701]96    return loadConfig( JOBMOND_CONF )
[212]97
[520]98class GangliaConfigParser:
99
[699]100    def __init__( self, filename ):
[520]101
[699]102        self.conf_lijst   = [ ]
103        self.conf_dict    = { }
104        self.filename     = filename
105        self.file_pointer = file( filename, 'r' )
106        self.lexx         = shlex.shlex( self.file_pointer )
107        self.lexx.whitespace_split = True
[520]108
[699]109        self.parse()
[520]110
[699]111    def __del__( self ):
[520]112
[699]113        """
114        Cleanup: close file descriptor
115        """
116
117        self.file_pointer.close()
118        del self.lexx
119        del self.conf_lijst
120
[691]121    def removeQuotes( self, value ):
[520]122
[699]123        clean_value = value
124        clean_value = clean_value.replace( "'", "" )
125        clean_value = clean_value.replace( '"', '' )
126        clean_value = clean_value.strip()
[520]127
[691]128        return clean_value
[520]129
[699]130    def removeBraces( self, value ):
[520]131
[699]132        clean_value = value
133        clean_value = clean_value.replace( "(", "" )
134        clean_value = clean_value.replace( ')', '' )
135        clean_value = clean_value.strip()
[520]136
[699]137        return clean_value
[520]138
[699]139    def parse( self ):
[520]140
[699]141        """
142        Parse self.filename using shlex scanning.
143        - Removes /* comments */
144        - Traverses (recursively) through all include () statements
145        - Stores complete valid config tokens in self.conf_list
[520]146
[699]147        i.e.:
148            ['globals',
149             '{',
150             'daemonize',
151             '=',
152             'yes',
153             'setuid',
154             '=',
155             'yes',
156             'user',
157             '=',
158             'ganglia',
159             'debug_level',
160             '=',
161             '0',
162             <etc> ]
163        """
[520]164
[699]165        t = 'bogus'
166        c = False
167        i = False
[520]168
[699]169        while t != self.lexx.eof:
170            #print 'get token'
171            t = self.lexx.get_token()
[520]172
[699]173            if len( t ) >= 2:
[520]174
[699]175                if len( t ) >= 4:
[520]176
[699]177                    if t[:2] == '/*' and t[-2:] == '*/':
[520]178
[699]179                        #print 'comment line'
180                        #print 'skipping: %s' %t
181                        continue
[520]182
[699]183                if t == '/*' or t[:2] == '/*':
184                    c = True
185                    #print 'comment start'
186                    #print 'skipping: %s' %t
187                    continue
[520]188
[699]189                if t == '*/' or t[-2:] == '*/':
190                    c = False
191                    #print 'skipping: %s' %t
192                    #print 'comment end'
193                    continue
194
195            if c:
196                #print 'skipping: %s' %t
197                continue
198
199            if t == 'include':
200                i = True
201                #print 'include start'
202                #print 'skipping: %s' %t
203                continue
204
205            if i:
206
207                #print 'include start: %s' %t
208
209                t2 = self.removeQuotes( t )
210                t2 = self.removeBraces( t )
211
212                for in_file in glob( self.removeQuotes(t2) ):
213
214                    #print 'including file: %s' %in_file
215                    parse_infile = GangliaConfigParser( in_file )
216
217                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
218
219                    del parse_infile
220
221                i = False
222                #print 'include end'
223                #print 'skipping: %s' %t
224                continue
225
226            #print 'keep: %s' %t
227            self.conf_lijst.append( self.removeQuotes(t) )
228
229    def getConfLijst( self ):
230
231        return self.conf_lijst
232
233    def confListToDict( self, parent_list=None ):
234
235        """
236        Recursively traverses a conf_list and creates dictionary from it
237        """
238
239        new_dict = { }
240        count    = 0
241        skip     = 0
242
243        if not parent_list:
244            parent_list = self.conf_lijst
245
246        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
247
248        for n, c in enumerate( parent_list ):
249
250            count = count + 1
251
252            #print 'CL: n %d c %s' %(n, c)
253
254            if skip > 0:
255
256                #print '- skipped'
257                skip = skip - 1
258                continue
259
260            if (n+1) <= (len( parent_list )-1):
261
262                if parent_list[(n+1)] == '{':
263
264                    if not new_dict.has_key( c ):
265                        new_dict[ c ] = [ ]
266
267                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
268                    new_dict[ c ].append( temp_new_dict )
269
270                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
271
272                    if not new_dict.has_key( c ):
273                        new_dict[ c ] = [ ]
274
275                    new_dict[ c ].append( parent_list[ (n+2) ] )
276
277                    skip = 2
278
279                if parent_list[n] == '}':
280
281                    #print 'leaving confListToDict(): new dict = %s' %new_dict
282                    return (new_dict, count)
283
284    def makeConfDict( self ):
285
286        """
287        Walks through self.conf_list and creates a dictionary based upon config values
288
289        i.e.:
290            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
291                                                         'ip': ['"127.0.0.1"'],
292                                                         'mask': ['32']}]}],
293                                    'port': ['8649']}],
294            'udp_recv_channel': [{'port': ['8649']}],
295            'udp_send_channel': [{'host': ['145.101.32.3'],
296                                  'port': ['8649']},
297                                 {'host': ['145.101.32.207'],
298                                  'port': ['8649']}]}
299        """
300
301        new_dict = { }
302        skip     = 0
303
304        #print 'entering makeConfDict()'
305
306        for n, c in enumerate( self.conf_lijst ):
307
308            #print 'M: n %d c %s' %(n, c)
309
310            if skip > 0:
311
312                #print '- skipped'
313                skip = skip - 1
314                continue
315
316            if (n+1) <= (len( self.conf_lijst )-1):
317
318                if self.conf_lijst[(n+1)] == '{':
319
320                    if not new_dict.has_key( c ):
321                        new_dict[ c ] = [ ]
322
323                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
324                    new_dict[ c ].append( temp_new_dict )
325
326                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
327
328                    if not new_dict.has_key( c ):
329                        new_dict[ c ] = [ ]
330
331                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
332
333                    skip = 2
334
335        self.conf_dict = new_dict
336        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
337
338    def checkConfDict( self ):
339
340        if len( self.conf_lijst ) == 0:
341
342            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
343
344        if len( self.conf_dict ) == 0:
345
346            self.makeConfDict()
347
348    def getConfDict( self ):
349
350        self.checkConfDict()
351        return self.conf_dict
352
353    def getUdpSendChannels( self ):
354
355        self.checkConfDict()
356
[707]357        udp_send_channels = [ ] # IP:PORT
358
359        if not self.conf_dict.has_key( 'udp_send_channel' ):
360            return None
361
362        for u in self.conf_dict[ 'udp_send_channel' ]:
363
364            if u.has_key( 'mcast_join' ):
365
366                ip = u['mcast_join'][0]
367
368            elif u.has_key( 'host' ):
369
370                ip = u['host'][0]
371
372            port = u['port'][0]
373
374            udp_send_channels.append( ( ip, port ) )
375
376        if len( udp_send_channels ) == 0:
377            return None
378
379        return udp_send_channels
380
[699]381    def getSectionLastOption( self, section, option ):
382
383        """
384        Get last option set in a config section that could be set multiple times in multiple (include) files.
385
386        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
387        """
388
389        self.checkConfDict()
390        value = None
391
392        if not self.conf_dict.has_key( section ):
393
394            return None
395
396        # Could be set multiple times in multiple (include) files: get last one set
397        for c in self.conf_dict[ section ]:
398
399                if c.has_key( option ):
400
[705]401                    value = c[ option ][0]
[699]402
[705]403        return value
[699]404
405    def getClusterName( self ):
406
407        return self.getSectionLastOption( 'cluster', 'name' )
408
409    def getVal( self, section, option ):
410
411        return self.getSectionLastOption( section, option )
412
[691]413    def getInt( self, section, valname ):
[520]414
[691]415        value    = self.getVal( section, valname )
[520]416
[691]417        if not value:
[699]418            return None
[520]419
[691]420        return int( value )
[520]421
[691]422    def getStr( self, section, valname ):
[520]423
[691]424        value    = self.getVal( section, valname )
[520]425
[691]426        if not value:
[699]427            return None
[520]428
[691]429        return str( value )
[520]430
431def findGmetric():
432
[691]433    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
[520]434
[691]435        guess    = '%s/%s' %( dir, 'gmetric' )
[520]436
[691]437        if os.path.exists( guess ):
[520]438
[691]439            return guess
[520]440
[691]441    return False
[520]442
[212]443def loadConfig( filename ):
444
[691]445    def getlist( cfg_string ):
[215]446
[691]447        my_list = [ ]
[215]448
[691]449        for item_txt in cfg_string.split( ',' ):
[215]450
[691]451            sep_char = None
[215]452
[691]453            item_txt = item_txt.strip()
[215]454
[691]455            for s_char in [ "'", '"' ]:
[215]456
[691]457                if item_txt.find( s_char ) != -1:
[215]458
[691]459                    if item_txt.count( s_char ) != 2:
[215]460
[691]461                        print 'Missing quote: %s' %item_txt
462                        sys.exit( 1 )
[215]463
[691]464                    else:
[215]465
[691]466                        sep_char = s_char
467                        break
[215]468
[691]469            if sep_char:
[215]470
[691]471                item_txt = item_txt.split( sep_char )[1]
[215]472
[691]473            my_list.append( item_txt )
[215]474
[691]475        return my_list
[215]476
[784]477    if not os.path.isfile( JOBMOND_CONF ):
478
479        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
480        sys.exit( 1 )
481
482    try:
483        f = open( JOBMOND_CONF, 'r' )
484    except IOError, detail:
485        print "Cannot read config file: '%s'" %JOBMOND_CONF
486        sys.exit( 1 )
487    else:
488        f.close()
489
[691]490    cfg        = ConfigParser.ConfigParser()
[212]491
[691]492    cfg.read( filename )
[212]493
[691]494    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
495    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
496    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
497    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
[707]498    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
[212]499
[701]500    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[212]501
[701]502    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[212]503
[691]504    SYSLOG_LEVEL    = -1
[701]505    SYSLOG_FACILITY = None
[377]506
[691]507    try:
[701]508        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[212]509
[691]510    except ConfigParser.NoOptionError:
[373]511
[701]512        USE_SYSLOG  = True
[373]513
[691]514        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
[373]515
[691]516    if USE_SYSLOG:
[373]517
[691]518        try:
[701]519            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[373]520
[691]521        except ConfigParser.NoOptionError:
[373]522
[691]523            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
[701]524            SYSLOG_LEVEL = 0
[373]525
[691]526        try:
[373]527
[691]528            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[373]529
[691]530        except ConfigParser.NoOptionError:
[373]531
[691]532            SYSLOG_FACILITY = syslog.LOG_DAEMON
[373]533
[691]534            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
[373]535
[691]536    try:
[373]537
[701]538        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
[212]539
[691]540    except ConfigParser.NoOptionError:
[265]541
[864]542        # Not required for all API's: only pbs api allows remote connections
543        BATCH_SERVER = None
[265]544
[691]545    try:
546   
[701]547        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
[265]548
[691]549    except ConfigParser.NoOptionError:
[265]550
[691]551        # Backwards compatibility for old configs
552        #
[265]553
[701]554        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
555        api_guess           = 'pbs'
[691]556   
557    try:
[212]558
[701]559        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
[353]560
[691]561    except ConfigParser.NoOptionError:
[353]562
[703]563        # Not specified: assume /etc/ganglia/gmond.conf
[691]564        #
[703]565        GMOND_CONF          = '/etc/ganglia/gmond.conf'
[353]566
[707]567    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
568    GMETRIC_TARGET          = None
[449]569
[707]570    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
[449]571
[707]572    if not GMOND_UDP_SEND_CHANNELS:
[449]573
[701]574        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
[520]575
[691]576        # Couldn't figure it out: let's see if it's in our jobmond.conf
577        #
578        try:
[520]579
[691]580            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
[520]581
[691]582        # Guess not: now just give up
[701]583       
[691]584        except ConfigParser.NoOptionError:
[520]585
[691]586            GMETRIC_TARGET    = None
[520]587
[691]588            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
[520]589
[701]590            gmetric_bin    = findGmetric()
[520]591
[701]592            if gmetric_bin:
[520]593
[701]594                GMETRIC_BINARY     = gmetric_bin
595            else:
596                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
[520]597
[701]598                try:
[520]599
[701]600                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
[520]601
[701]602                except ConfigParser.NoOptionError:
[520]603
[786]604                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
[701]605                    sys.exit( 1 )
[520]606
[701]607    #TODO: is this really still needed or should be automatic
[691]608    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
[212]609
[927]610    try:
611        BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
[215]612
[927]613    except ConfigParser.NoOptionError:
614
615        BATCH_HOST_TRANSLATE = [ ]
616        pass
617
[691]618    try:
[256]619
[691]620        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
[266]621
[691]622    except ConfigParser.NoOptionError, detail:
[266]623
[864]624        print "FATAL ERROR: BATCH_API not set"
625        sys.exit( 1 )
[354]626
[691]627    try:
[317]628
[691]629        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
[317]630
[691]631    except ConfigParser.NoOptionError, detail:
[317]632
[691]633        QUEUE        = None
[353]634
[701]635    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
636
[691]637    return True
[212]638
[507]639def fqdn_parts (fqdn):
[520]640
[691]641    """Return pair of host and domain for fully-qualified domain name arg."""
[520]642
[691]643    parts = fqdn.split (".")
[520]644
[691]645    return (parts[0], string.join(parts[1:], "."))
[507]646
[61]647class DataProcessor:
[355]648
[691]649    """Class for processing of data"""
[61]650
[691]651    binary = None
[61]652
[691]653    def __init__( self, binary=None ):
[355]654
[691]655        """Remember alternate binary location if supplied"""
[61]656
[691]657        global GMETRIC_BINARY, GMOND_CONF
[449]658
[691]659        if binary:
660            self.binary = binary
[61]661
[707]662        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[691]663            self.binary = GMETRIC_BINARY
[449]664
[691]665        # Timeout for XML
666        #
667        # From ganglia's documentation:
668        #
669        # 'A metric will be deleted DMAX seconds after it is received, and
670        # DMAX=0 means eternal life.'
[61]671
[691]672        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
[80]673
[707]674        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
[354]675
[691]676            incompatible = self.checkGmetricVersion()
[61]677
[691]678            if incompatible:
[355]679
[903]680                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
[691]681                sys.exit( 1 )
[65]682
[691]683    def checkGmetricVersion( self ):
[355]684
[691]685        """
[903]686        Check version of gmetric is at least 3.3.8
[691]687        for the syntax we use
688        """
[65]689
[691]690        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
[255]691
[691]692        incompatible    = 0
[341]693
[691]694        gfp        = os.popen( self.binary + ' --version' )
[692]695        lines      = gfp.readlines()
[65]696
[691]697        gfp.close()
[355]698
[691]699        for line in lines:
[355]700
[691]701            line = line.split( ' ' )
[65]702
[691]703            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
704           
705                gmetric_version    = line[1].split( '\n' )[0]
[65]706
[691]707                version_major    = int( gmetric_version.split( '.' )[0] )
708                version_minor    = int( gmetric_version.split( '.' )[1] )
709                version_patch    = int( gmetric_version.split( '.' )[2] )
[65]710
[691]711                incompatible    = 0
[65]712
[691]713                if version_major < 3:
[65]714
[691]715                    incompatible = 1
716               
717                elif version_major == 3:
[65]718
[903]719                    if version_minor < 3:
[65]720
[692]721                        incompatible = 1
[65]722
[903]723                    elif version_patch < 8:
724
725                        incompatible = 1
726
[691]727        return incompatible
[65]728
[691]729    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
[355]730
[691]731        """Call gmetric binary and multicast"""
[65]732
[691]733        cmd = self.binary
[65]734
[707]735        if GMOND_UDP_SEND_CHANNELS:
[61]736
[707]737            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
738
739                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
740
741                debug_msg( 10, printTime() + ' ' + metric_debug)
742
743                gm = Gmetric( c_ip, c_port )
744
745                gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
746
747        elif GMETRIC_TARGET:
748
[691]749            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
750            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
[353]751
[691]752            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
[353]753
[691]754            debug_msg( 10, printTime() + ' ' + metric_debug)
[353]755
[691]756            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
[353]757
[691]758            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
[353]759
[691]760        else:
761            try:
762                cmd = cmd + ' -c' + GMOND_CONF
[353]763
[691]764            except NameError:
[353]765
[705]766                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
[353]767
[691]768            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
[353]769
[691]770            if len( units ) > 0:
[409]771
[691]772                cmd = cmd + ' -u"' + units + '"'
[409]773
[691]774            debug_msg( 10, printTime() + ' ' + cmd )
[353]775
[691]776            os.system( cmd )
[353]777
[318]778class DataGatherer:
[23]779
[691]780    """Skeleton class for batch system DataGatherer"""
[256]781
[691]782    def printJobs( self, jobs ):
[355]783
[691]784        """Print a jobinfo overview"""
[318]785
[691]786        for name, attrs in self.jobs.items():
[318]787
[691]788            print 'job %s' %(name)
[318]789
[691]790            for name, val in attrs.items():
[318]791
[691]792                print '\t%s = %s' %( name, val )
[318]793
[691]794    def printJob( self, jobs, job_id ):
[355]795
[691]796        """Print job with job_id from jobs"""
[318]797
[691]798        print 'job %s' %(job_id)
[318]799
[691]800        for name, val in jobs[ job_id ].items():
[318]801
[691]802            print '\t%s = %s' %( name, val )
[318]803
[691]804    def getAttr( self, attrs, name ):
[507]805
[691]806        """Return certain attribute from dictionary, if exists"""
[507]807
[691]808        if attrs.has_key( name ):
[507]809
[691]810            return attrs[ name ]
811        else:
812            return ''
[507]813
[691]814    def jobDataChanged( self, jobs, job_id, attrs ):
[507]815
[691]816        """Check if job with attrs and job_id in jobs has changed"""
[507]817
[691]818        if jobs.has_key( job_id ):
[507]819
[691]820            oldData = jobs[ job_id ]   
821        else:
822            return 1
[507]823
[691]824        for name, val in attrs.items():
[507]825
[691]826            if oldData.has_key( name ):
[507]827
[691]828                if oldData[ name ] != attrs[ name ]:
[507]829
[691]830                    return 1
[507]831
[691]832            else:
833                return 1
[507]834
[691]835        return 0
[507]836
[691]837    def submitJobData( self ):
[507]838
[691]839        """Submit job info list"""
[507]840
[691]841        global BATCH_API
[512]842
[728]843        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[507]844
[724]845        running_jobs = 0
846        queued_jobs  = 0
[507]847
[691]848        # Count how many running/queued jobs we found
849        #
850        for jobid, jobattrs in self.jobs.items():
[507]851
[691]852            if jobattrs[ 'status' ] == 'Q':
[507]853
[691]854                queued_jobs += 1
[507]855
[691]856            elif jobattrs[ 'status' ] == 'R':
[507]857
[691]858                running_jobs += 1
[507]859
[691]860        # Report running/queued jobs as seperate metric for a nice RRD graph
861        #
[728]862        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
863        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
[507]864
[691]865        # Report down/offline nodes in batch (PBS only ATM)
866        #
[866]867        if BATCH_API in [ 'pbs', 'slurm' ]:
[512]868
[691]869            domain        = fqdn_parts( socket.getfqdn() )[1]
[514]870
[728]871            downed_nodes  = list()
872            offline_nodes = list()
[691]873       
874            l        = ['state']
[512]875
[790]876            nodelist = self.getNodeData()
877
878            for name, node in nodelist.items():
879
[691]880                if ( node[ 'state' ].find( "down" ) != -1 ):
[512]881
[691]882                    downed_nodes.append( name )
[512]883
[691]884                if ( node[ 'state' ].find( "offline" ) != -1 ):
[512]885
[691]886                    offline_nodes.append( name )
[512]887
[728]888            downnodeslist    = do_nodelist( downed_nodes )
889            offlinenodeslist = do_nodelist( offline_nodes )
[512]890
[691]891            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
892            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
[728]893            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
894            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
[514]895
[691]896        # Now let's spread the knowledge
897        #
898        for jobid, jobattrs in self.jobs.items():
[507]899
[691]900            # Make gmetric values for each job: respect max gmetric value length
901            #
902            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
903            metric_increment    = 0
[507]904
[691]905            # If we have more job info than max gmetric value length allows, split it up
906            # amongst multiple metrics
907            #
908            for val in gmetric_val:
[507]909
[728]910                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
911                self.dp.multicastGmetric( metric_name, val )
[507]912
[691]913                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
914                #
915                metric_increment    = metric_increment + 1
[507]916
[691]917    def compileGmetricVal( self, jobid, jobattrs ):
[507]918
[691]919        """Create a val string for gmetric of jobinfo"""
[507]920
[691]921        gval_lists    = [ ]
922        val_list    = { }
[507]923
[691]924        for val_name, val_value in jobattrs.items():
[507]925
[691]926            # These are our own metric names, i.e.: status, start_timestamp, etc
927            #
928            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
[507]929
[691]930            # These are their corresponding values
931            #
932            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
[507]933
[691]934            if val_name == 'nodes' and jobattrs['status'] == 'R':
[507]935
[691]936                node_str = None
[507]937
[691]938                for node in val_value:
[507]939
[691]940                    if node_str:
[507]941
[691]942                        node_str = node_str + ';' + node
943                    else:
944                        node_str = node
[507]945
[691]946                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
947                    #
948                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
[507]949
[691]950                        # It's too big, we need to make a new gmetric for the additional info
951                        #
952                        val_list[ val_name ]    = node_str
[507]953
[691]954                        gval_lists.append( val_list )
[507]955
[691]956                        val_list        = { }
957                        node_str        = None
[507]958
[691]959                val_list[ val_name ]    = node_str
[507]960
[691]961                gval_lists.append( val_list )
[507]962
[691]963                val_list        = { }
[507]964
[691]965            elif val_value != '':
[507]966
[691]967                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
968                #
969                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
[507]970
[691]971                    # It's too big, we need to make a new gmetric for the additional info
972                    #
973                    gval_lists.append( val_list )
[507]974
[691]975                    val_list        = { }
[507]976
[691]977                val_list[ val_name ]    = val_value
[507]978
[691]979        if len( val_list ) > 0:
[507]980
[691]981            gval_lists.append( val_list )
[507]982
[691]983        str_list    = [ ]
[507]984
[691]985        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
986        #
987        for val_list in gval_lists:
[507]988
[691]989            my_val_str    = None
[507]990
[691]991            for val_name, val_value in val_list.items():
[507]992
[691]993                if type(val_value) == list:
[579]994
[691]995                    val_value    = val_value.join( ',' )
[579]996
[691]997                if my_val_str:
[507]998
[691]999                    try:
1000                        # fixme: It's getting
1001                        # ('nodes', None) items
[943]1002                        my_val_str = my_val_str + ' ' + val_name + '=' + cgi.escape(val_value)
[691]1003                    except:
1004                        pass
[623]1005
[691]1006                else:
[943]1007                    my_val_str = val_name + '=' + cgi.escape(val_value)
[507]1008
[691]1009            str_list.append( my_val_str )
[507]1010
[691]1011        return str_list
[507]1012
[691]1013    def daemon( self ):
[355]1014
[691]1015        """Run as daemon forever"""
[256]1016
[691]1017        # Fork the first child
1018        #
1019        pid = os.fork()
1020        if pid > 0:
1021            sys.exit(0)  # end parent
[256]1022
[691]1023        # creates a session and sets the process group ID
1024        #
1025        os.setsid()
[318]1026
[691]1027        # Fork the second child
1028        #
1029        pid = os.fork()
1030        if pid > 0:
1031            sys.exit(0)  # end parent
[318]1032
[691]1033        write_pidfile()
[318]1034
[691]1035        # Go to the root directory and set the umask
1036        #
1037        os.chdir('/')
1038        os.umask(0)
[318]1039
[691]1040        sys.stdin.close()
1041        sys.stdout.close()
1042        sys.stderr.close()
[318]1043
[691]1044        os.open('/dev/null', os.O_RDWR)
1045        os.dup2(0, 1)
1046        os.dup2(0, 2)
[318]1047
[691]1048        self.run()
[318]1049
[691]1050    def run( self ):
[355]1051
[691]1052        """Main thread"""
[256]1053
[691]1054        while ( 1 ):
1055       
1056            self.getJobData()
1057            self.submitJobData()
1058            time.sleep( BATCH_POLL_INTERVAL )   
[256]1059
[623]1060# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1061# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1062# with 6.2.  See also the fixmes.
[256]1063
[507]1064class NoJobs (Exception):
[691]1065    """Exception raised by empty job list in qstat output."""
1066    pass
[256]1067
[507]1068class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
[691]1069    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
[318]1070
[691]1071    def __init__(self):
1072        self.value = ""
1073        self.joblist = []
1074        self.job = {}
1075        self.queue = ""
1076        self.in_joblist = False
1077        self.lrequest = False
1078        self.eltq = deque()
1079        xml.sax.handler.ContentHandler.__init__(self)
[318]1080
[691]1081    # The structure of the output is as follows (for SGE 6.0).  It's
1082    # similar for 6.1, but radically different for 6.2, and is
1083    # undocumented generally.  Unfortunately it's voluminous, and probably
1084    # doesn't scale to large clusters/queues.
[318]1085
[691]1086    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1087    #   <djob_info>
1088    #     <qmaster_response>  <!-- job -->
1089    #       ...
1090    #       <JB_ja_template> 
1091    #     <ulong_sublist>
1092    #     ...         <!-- start_time, state ... -->
1093    #     </ulong_sublist>
1094    #       </JB_ja_template> 
1095    #       <JB_ja_tasks>
1096    #     <ulong_sublist>
1097    #       ...       <!-- task info
1098    #     </ulong_sublist>
1099    #     ...
1100    #       </JB_ja_tasks>
1101    #       ...
1102    #     </qmaster_response>
1103    #   </djob_info>
1104    #   <messages>
1105    #   ...
[318]1106
[691]1107    # NB.  We might treat each task as a separate job, like
1108    # straight qstat output, but the web interface expects jobs to
1109    # be identified by integers, not, say, <job number>.<task>.
[318]1110
[691]1111    # So, I lied.  If the job list is empty, we get invalid XML
1112    # like this, which we need to defend against:
[318]1113
[691]1114    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1115    #   <>
1116    #     <ST_name>*</ST_name>
1117    #   </>
1118    # </unknown_jobs>
[318]1119
[691]1120    def startElement(self, name, attrs):
1121        self.value = ""
1122        if name == "djob_info":    # job list
1123            self.in_joblist = True
1124        # The job container is "qmaster_response" in SGE 6.0
1125        # and 6.1, but "element" in 6.2.  This is only the very
1126        # start of what's necessary for 6.2, though (sigh).
1127        elif (name == "qmaster_response" or name == "element") \
1128                and self.eltq[-1] == "djob_info": # job
1129            self.job = {"job_state": "U", "slots": 0,
1130                    "nodes": [], "queued_timestamp": "",
1131                    "queued_timestamp": "", "queue": "",
1132                    "ppn": "0", "RN_max": 0,
1133                    # fixme in endElement
1134                    "requested_memory": 0, "requested_time": 0
1135                    }
1136            self.joblist.append(self.job)
1137        elif name == "qstat_l_requests": # resource request
1138            self.lrequest = True
1139        elif name == "unknown_jobs":
1140            raise NoJobs
1141        self.eltq.append (name)
[318]1142
[691]1143    def characters(self, ch):
1144        self.value += ch
[318]1145
[691]1146    def endElement(self, name): 
1147        """Snarf job elements contents into job dictionary.
1148           Translate keys if appropriate."""
[318]1149
[691]1150        name_trans = {
1151          "JB_job_number": "number",
1152          "JB_job_name": "name", "JB_owner": "owner",
1153          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1154          "JB_submission_time": "queued_timestamp"
1155          }
1156        value = self.value
1157        self.eltq.pop ()
[318]1158
[691]1159        if name == "djob_info":
1160            self.in_joblist = False
1161            self.job = {}
1162        elif name == "JAT_master_queue":
1163            self.job["queue"] = value.split("@")[0]
1164        elif name == "JG_qhostname":
1165            if not (value in self.job["nodes"]):
1166                self.job["nodes"].append(value)
1167        elif name == "JG_slots": # slots in use
1168            self.job["slots"] += int(value)
1169        elif name == "RN_max": # requested slots (tasks or parallel)
1170            self.job["RN_max"] = max (self.job["RN_max"],
1171                          int(value))
1172        elif name == "JAT_state": # job state (bitwise or)
1173            value = int (value)
1174            # Status values from sge_jobL.h
1175            #define JIDLE           0x00000000
1176            #define JHELD           0x00000010
1177            #define JMIGRATING          0x00000020
1178            #define JQUEUED         0x00000040
1179            #define JRUNNING        0x00000080
1180            #define JSUSPENDED          0x00000100
1181            #define JTRANSFERING        0x00000200
1182            #define JDELETED        0x00000400
1183            #define JWAITING        0x00000800
1184            #define JEXITING        0x00001000
1185            #define JWRITTEN        0x00002000
1186            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1187            #define JFINISHED           0x00010000
1188            if value & 0x80:
1189                self.job["status"] = "R"
1190            elif value & 0x40:
1191                self.job["status"] = "Q"
1192            else:
1193                self.job["status"] = "O" # `other'
1194        elif name == "CE_name" and self.lrequest and self.value in \
1195                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1196            # We're in a container for an interesting resource
1197            # request; record which type.
1198            self.lrequest = self.value
1199        elif name == "CE_doubleval" and self.lrequest:
1200            # if we're in a container for an interesting
1201            # resource request, use the maxmimum of the hard
1202            # and soft requests to record the requested CPU
1203            # or core.  Fixme:  I'm not sure if this logic is
1204            # right.
1205            if self.lrequest in ("h_core", "s_core"):
1206                self.job["requested_memory"] = \
1207                    max (float (value),
1208                     self.job["requested_memory"])
1209            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1210            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1211                self.job["requested_time"] = \
1212                    max (float (value),
1213                     self.job["requested_time"])
1214        elif name == "qstat_l_requests":
1215            self.lrequest = False
1216        elif self.job and self.in_joblist:
1217            if name in name_trans:
1218                name = name_trans[name]
1219                self.job[name] = value
[318]1220
[507]1221# Abstracted from PBS original.
1222# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
[520]1223#
1224def do_nodelist( nodes ):
1225
[691]1226    """Translate node list as appropriate."""
[520]1227
[691]1228    nodeslist        = [ ]
1229    my_domain        = fqdn_parts( socket.getfqdn() )[1]
[520]1230
[691]1231    for node in nodes:
[520]1232
[691]1233        host        = node.split( '/' )[0] # not relevant for SGE
1234        h, host_domain    = fqdn_parts(host)
[520]1235
[691]1236        if host_domain == my_domain:
[520]1237
[691]1238            host    = h
[520]1239
[691]1240        if nodeslist.count( host ) == 0:
[520]1241
[691]1242            for translate_pattern in BATCH_HOST_TRANSLATE:
[520]1243
[691]1244                if translate_pattern.find( '/' ) != -1:
[520]1245
[691]1246                    translate_orig    = \
1247                        translate_pattern.split( '/' )[1]
1248                    translate_new    = \
1249                        translate_pattern.split( '/' )[2]
1250                    host = re.sub( translate_orig,
1251                               translate_new, host )
1252            if not host in nodeslist:
1253                nodeslist.append( host )
1254    return nodeslist
[318]1255
[837]1256class SLURMDataGatherer( DataGatherer ):
1257
1258    global pyslurm
1259
1260    """This is the DataGatherer for SLURM"""
1261
1262    def __init__( self ):
1263
1264        """Setup appropriate variables"""
1265
1266        self.jobs       = { }
1267        self.timeoffset = 0
1268        self.dp         = DataProcessor()
1269
1270    def getNodeData( self ):
1271
[866]1272        slurm_type  = pyslurm.node()
[837]1273
[866]1274        slurm_nodes = slurm_type.get()
[837]1275
[866]1276        nodedict    = { }
1277
1278        for node, attrs in slurm_nodes.items():
1279
1280            ( num_state, name_state ) = attrs['node_state'] 
1281
1282            if name_state == 'DOWN':
1283
1284                nodedict[ node ] = { 'state' : 'down' }
1285
1286            elif name_state == 'DRAIN':
1287
1288                nodedict[ node ] = { 'state' : 'offline' }
1289
[837]1290        return nodedict
1291
1292    def getJobData( self ):
1293
1294        """Gather all data on current jobs"""
1295
1296        joblist            = {}
1297
1298        self.cur_time  = time.time()
1299
1300        slurm_type = pyslurm.job()
1301        joblist    = slurm_type.get()
1302
1303        jobs_processed    = [ ]
1304
1305        for name, attrs in joblist.items():
1306            display_queue = 1
1307            job_id        = name
1308
1309            name          = self.getAttr( attrs, 'name' )
1310            queue         = self.getAttr( attrs, 'partition' )
1311
1312            if QUEUE:
1313                for q in QUEUE:
1314                    if q == queue:
1315                        display_queue = 1
1316                        break
1317                    else:
1318                        display_queue = 0
1319                        continue
1320            if display_queue == 0:
1321                continue
1322
1323            owner_uid        = attrs[ 'user_id' ]
1324            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1325
1326            requested_time   = self.getAttr( attrs, 'time_limit' )
[852]1327            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
[837]1328
[852]1329            if min_memory == 0:
1330
1331                requested_memory = ''
1332
1333            else:
1334                requested_memory = min_memory
1335
[853]1336            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
[837]1337
[853]1338            if min_cpus == 0:
1339
1340                ppn = ''
1341
1342            else:
1343                ppn = min_cpus
1344
[837]1345            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1346
1347            status = 'Q'
1348
1349            if status_long == 'RUNNING':
1350
1351                status = 'R'
1352
1353            elif status_long == 'COMPLETED':
1354
1355                continue
1356
1357            jobs_processed.append( job_id )
1358
1359            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1360
1361            start_timestamp = ''
1362            nodeslist       = ''
1363
1364            if status == 'R':
1365
1366                start_timestamp = self.getAttr( attrs, 'start_time' )
[851]1367                nodes           = attrs[ 'nodes' ]
[837]1368
[851]1369                if not nodes:
[837]1370
[851]1371                    # This should not happen
1372
1373                    # Something wrong: running but 'nodes' returned empty by pyslurm
1374                    # Possible pyslurm bug: abort/quit/warning
1375
1376                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1377
1378                    print err_msg
1379                    debug_msg( 0, err_msg )
1380                    sys.exit(1)
1381
1382                my_nodelist = [ ]
1383
1384                slurm_hostlist  = pyslurm.hostlist()
1385                slurm_hostlist.create( nodes )
1386                slurm_hostlist.uniq()
1387
1388                while slurm_hostlist.count() > 0:
1389
1390                    my_nodelist.append( slurm_hostlist.pop() )
1391
1392                slurm_hostlist.destroy()
1393
1394                del slurm_hostlist
1395
1396                nodeslist       = do_nodelist( my_nodelist )
1397
[837]1398                if DETECT_TIME_DIFFS:
1399
1400                    # If a job start if later than our current date,
1401                    # that must mean the Torque server's time is later
1402                    # than our local time.
1403               
1404                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1405
1406                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1407
1408            elif status == 'Q':
1409
1410                nodeslist       = str( attrs[ 'num_nodes' ] )
1411
1412            else:
1413                start_timestamp = ''
1414                nodeslist       = ''
1415
1416            myAttrs                = { }
1417
1418            myAttrs[ 'name' ]             = str( name )
1419            myAttrs[ 'queue' ]            = str( queue )
1420            myAttrs[ 'owner' ]            = str( owner )
1421            myAttrs[ 'requested_time' ]   = str( requested_time )
1422            myAttrs[ 'requested_memory' ] = str( requested_memory )
1423            myAttrs[ 'ppn' ]              = str( ppn )
1424            myAttrs[ 'status' ]           = str( status )
1425            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1426            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1427            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1428            myAttrs[ 'nodes' ]            = nodeslist
1429            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1430            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1431
1432            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1433
1434                self.jobs[ job_id ] = myAttrs
1435
1436        for id, attrs in self.jobs.items():
1437
1438            if id not in jobs_processed:
1439
1440                # This one isn't there anymore; toedeledoki!
1441                #
1442                del self.jobs[ id ]
1443
[318]1444class SgeDataGatherer(DataGatherer):
1445
[691]1446    jobs = {}
[61]1447
[691]1448    def __init__( self ):
1449        self.jobs = {}
1450        self.timeoffset = 0
1451        self.dp = DataProcessor()
[318]1452
[691]1453    def getJobData( self ):
1454        """Gather all data on current jobs in SGE"""
[318]1455
[691]1456        import popen2
[318]1457
[691]1458        self.cur_time = 0
1459        queues = ""
1460        if QUEUE:    # only for specific queues
1461            # Fixme:  assumes queue names don't contain single
1462            # quote or comma.  Don't know what the SGE rules are.
1463            queues = " -q '" + string.join (QUEUE, ",") + "'"
1464        # Note the comment in SgeQstatXMLParser about scaling with
1465        # this method of getting data.  I haven't found better one.
1466        # Output with args `-xml -ext -f -r' is easier to parse
1467        # in some ways, harder in others, but it doesn't provide
1468        # the submission time (at least SGE 6.0).  The pipeline
1469        # into sed corrects bogus XML observed with a configuration
1470        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1471        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
[623]1472sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
[691]1473                           + queues, True)
1474        qstatparser = SgeQstatXMLParser()
1475        parse_err = 0
1476        try:
1477            xml.sax.parse(piping.fromchild, qstatparser)
1478        except NoJobs:
1479            pass
1480        except:
1481            parse_err = 1
[704]1482        if piping.wait():
1483            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
[691]1484            return None
1485        elif parse_err:
1486            debug_msg(10, "Bad XML output from qstat"())
1487            exit (1)
1488        for f in piping.fromchild, piping.tochild, piping.childerr:
1489            f.close()
1490        self.cur_time = time.time()
1491        jobs_processed = []
1492        for job in qstatparser.joblist:
1493            job_id = job["number"]
1494            if job["status"] in [ 'Q', 'R' ]:
1495                jobs_processed.append(job_id)
1496            if job["status"] == "R":
1497                job["nodes"] = do_nodelist (job["nodes"])
1498                # Fixme: why is job["nodes"] sometimes null?
1499                try:
1500                    # Fixme: Is this sensible?  The
1501                    # PBS-type PPN isn't something you use
1502                    # with SGE.
[704]1503                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
[691]1504                except:
1505                    job["ppn"] = 0
1506                if DETECT_TIME_DIFFS:
1507                    # If a job start is later than our
1508                    # current date, that must mean
1509                    # the SGE server's time is later
1510                    # than our local time.
[704]1511                    start_timestamp = int (job["start_timestamp"])
1512                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
[318]1513
[704]1514                        self.timeoffset    = start_timestamp - int(self.cur_time)
[691]1515            else:
1516                # fixme: Note sure what this should be:
1517                job["ppn"] = job["RN_max"]
1518                job["nodes"] = "1"
[318]1519
[691]1520            myAttrs = {}
1521            for attr in ["name", "queue", "owner",
1522                     "requested_time", "status",
1523                     "requested_memory", "ppn",
1524                     "start_timestamp", "queued_timestamp"]:
1525                myAttrs[attr] = str(job[attr])
1526            myAttrs["nodes"] = job["nodes"]
[704]1527            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
[691]1528            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1529            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
[318]1530
[704]1531            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
[691]1532                self.jobs[job_id] = myAttrs
1533        for id, attrs in self.jobs.items():
1534            if id not in jobs_processed:
1535                del self.jobs[id]
[318]1536
[524]1537# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1538# Requres LSFObject http://sourceforge.net/projects/lsfobject
1539#
1540class LsfDataGatherer(DataGatherer):
[525]1541
[691]1542    """This is the DataGatherer for LSf"""
[524]1543
[691]1544    global lsfObject
[524]1545
[691]1546    def __init__( self ):
[525]1547
[691]1548        self.jobs = { }
1549        self.timeoffset = 0
1550        self.dp = DataProcessor()
1551        self.initLsfQuery()
[524]1552
[691]1553    def _countDuplicatesInList( self, dupedList ):
[525]1554
[691]1555        countDupes    = { }
[525]1556
[691]1557        for item in dupedList:
[525]1558
[691]1559            if not countDupes.has_key( item ):
[525]1560
[691]1561                countDupes[ item ]    = 1
1562            else:
1563                countDupes[ item ]    = countDupes[ item ] + 1
[525]1564
[691]1565        dupeCountList    = [ ]
[525]1566
[691]1567        for item, count in countDupes.items():
[525]1568
[691]1569            dupeCountList.append( ( item, count ) )
[525]1570
[691]1571        return dupeCountList
[524]1572#
1573#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1574#print _countDuplicatesInList(lst)
1575#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1576########################
1577
[691]1578    def initLsfQuery( self ):
1579        self.pq = None
1580        self.pq = lsfObject.jobInfoEntObject()
[524]1581
[691]1582    def getJobData( self, known_jobs="" ):
1583        """Gather all data on current jobs in LSF"""
1584        if len( known_jobs ) > 0:
1585            jobs = known_jobs
1586        else:
1587            jobs = { }
1588        joblist = {}
1589        joblist = self.pq.getJobInfo()
1590        nodelist = ''
[524]1591
[691]1592        self.cur_time = time.time()
[524]1593
[691]1594        jobs_processed = [ ]
[524]1595
[691]1596        for name, attrs in joblist.items():
1597            job_id = str(name)
1598            jobs_processed.append( job_id )
1599            name = self.getAttr( attrs, 'jobName' )
1600            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1601            owner = self.getAttr( attrs, 'user' )
[524]1602
1603### THIS IS THE rLimit List index values
[691]1604#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1605#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1606#define LSF_RLIMIT_DATA     2        /* data size */
1607#define LSF_RLIMIT_STACK    3        /* stack size */
1608#define LSF_RLIMIT_CORE     4        /* core file size */
1609#define LSF_RLIMIT_RSS      5        /* resident set size */
1610#define LSF_RLIMIT_NOFILE   6        /* open files */
1611#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1612#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
[524]1613#define LSF_RLIMIT_SWAP     8
[691]1614#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1615#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1616#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1617#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
[524]1618
[691]1619            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1620            if requested_time == -1: 
1621                requested_time = ""
1622            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1623            if requested_memory == -1: 
1624                requested_memory = ""
[524]1625# This tries to get proc per node. We don't support this right now
[691]1626            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1627            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1628            if requested_cpus == None or requested_cpus == "":
1629                requested_cpus = 1
[524]1630
[691]1631            if QUEUE:
1632                for q in QUEUE:
1633                    if q == queue:
1634                        display_queue = 1
1635                        break
1636                    else:
1637                        display_queue = 0
1638                        continue
1639            if display_queue == 0:
1640                continue
[524]1641
[691]1642            runState = self.getAttr( attrs, 'status' )
1643            if runState == 4:
1644                status = 'R'
1645            else:
1646                status = 'Q'
1647            queued_timestamp = self.getAttr( attrs, 'submitTime' )
[524]1648
[691]1649            if status == 'R':
1650                start_timestamp = self.getAttr( attrs, 'startTime' )
1651                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1652                nodelist = nodesCpu.keys()
[524]1653
[691]1654                if DETECT_TIME_DIFFS:
[524]1655
[691]1656                    # If a job start if later than our current date,
1657                    # that must mean the Torque server's time is later
1658                    # than our local time.
[524]1659
[691]1660                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
[524]1661
[691]1662                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[524]1663
[691]1664            elif status == 'Q':
1665                start_timestamp = ''
1666                count_mynodes = 0
1667                numeric_node = 1
1668                nodelist = ''
[524]1669
[691]1670            myAttrs = { }
1671            if name == "":
1672                myAttrs['name'] = "none"
1673            else:
1674                myAttrs['name'] = name
[524]1675
[691]1676            myAttrs[ 'owner' ]        = owner
1677            myAttrs[ 'requested_time' ]    = str(requested_time)
1678            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1679            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1680            myAttrs[ 'ppn' ]        = str( ppn )
1681            myAttrs[ 'status' ]        = status
1682            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1683            myAttrs[ 'queue' ]        = str(queue)
1684            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1685            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1686            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1687            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1688            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
[524]1689
[691]1690            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1691                jobs[ job_id ] = myAttrs
[524]1692
[691]1693                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
[524]1694
[691]1695        for id, attrs in jobs.items():
1696            if id not in jobs_processed:
1697                # This one isn't there anymore
1698                #
1699                del jobs[ id ]
1700        self.jobs=jobs
[524]1701
1702
[355]1703class PbsDataGatherer( DataGatherer ):
[318]1704
[691]1705    """This is the DataGatherer for PBS and Torque"""
[318]1706
[691]1707    global PBSQuery, PBSError
[256]1708
[691]1709    def __init__( self ):
[354]1710
[691]1711        """Setup appropriate variables"""
[23]1712
[785]1713        self.jobs       = { }
1714        self.timeoffset = 0
1715        self.dp         = DataProcessor()
[354]1716
[691]1717        self.initPbsQuery()
[23]1718
[691]1719    def initPbsQuery( self ):
[91]1720
[785]1721        self.pq = None
[354]1722
[788]1723        try:
[354]1724
[788]1725            if( BATCH_SERVER ):
[91]1726
[788]1727                self.pq = PBSQuery( BATCH_SERVER )
1728            else:
1729                self.pq = PBSQuery()
1730
1731        except PBSError, details:
1732            print 'Cannot connect to pbs server'
1733            print details
1734            sys.exit( 1 )
1735
[691]1736        try:
[791]1737            # TODO: actually use new data structure
[691]1738            self.pq.old_data_structure()
[656]1739
[691]1740        except AttributeError:
[656]1741
[691]1742            # pbs_query is older
1743            #
1744            pass
[656]1745
[790]1746    def getNodeData( self ):
1747
1748        nodedict = { }
1749
1750        try:
1751            nodedict = self.pq.getnodes()
1752
1753        except PBSError, detail:
1754
1755            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1756
1757        return nodedict
1758
[691]1759    def getJobData( self ):
[354]1760
[691]1761        """Gather all data on current jobs in Torque"""
[26]1762
[785]1763        joblist            = {}
1764        self.cur_time      = 0
[349]1765
[691]1766        try:
1767            joblist        = self.pq.getjobs()
[785]1768            self.cur_time  = time.time()
[354]1769
[691]1770        except PBSError, detail:
[354]1771
[790]1772            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
[691]1773            return None
[354]1774
[691]1775        jobs_processed    = [ ]
[26]1776
[691]1777        for name, attrs in joblist.items():
[785]1778            display_queue = 1
1779            job_id        = name.split( '.' )[0]
[26]1780
[785]1781            name          = self.getAttr( attrs, 'Job_Name' )
1782            queue         = self.getAttr( attrs, 'queue' )
[317]1783
[691]1784            if QUEUE:
1785                for q in QUEUE:
1786                    if q == queue:
1787                        display_queue = 1
1788                        break
1789                    else:
1790                        display_queue = 0
1791                        continue
1792            if display_queue == 0:
1793                continue
[317]1794
1795
[691]1796            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
[785]1797            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1798            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
[95]1799
[785]1800            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
[95]1801
[785]1802            ppn = ''
[281]1803
[691]1804            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
[95]1805
[785]1806                mynoderequest_fields = mynoderequest.split( ':' )
[281]1807
[691]1808                for mynoderequest_field in mynoderequest_fields:
[281]1809
[691]1810                    if mynoderequest_field.find( 'ppn' ) != -1:
[281]1811
[785]1812                        ppn = mynoderequest_field.split( 'ppn=' )[1]
[281]1813
[785]1814            status = self.getAttr( attrs, 'job_state' )
[25]1815
[691]1816            if status in [ 'Q', 'R' ]:
[450]1817
[691]1818                jobs_processed.append( job_id )
[450]1819
[785]1820            queued_timestamp = self.getAttr( attrs, 'ctime' )
[243]1821
[691]1822            if status == 'R':
[133]1823
[785]1824                start_timestamp = self.getAttr( attrs, 'mtime' )
1825                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
[133]1826
[785]1827                nodeslist       = do_nodelist( nodes )
[354]1828
[691]1829                if DETECT_TIME_DIFFS:
[185]1830
[691]1831                    # If a job start if later than our current date,
1832                    # that must mean the Torque server's time is later
1833                    # than our local time.
1834               
1835                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
[185]1836
[785]1837                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
[185]1838
[691]1839            elif status == 'Q':
[95]1840
[691]1841                # 'mynodequest' can be a string in the following syntax according to the
1842                # Torque Administator's manual:
1843                #
1844                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1845                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1846                # etc
1847                #
[451]1848
[691]1849                #
1850                # For now we only count the amount of nodes request and ignore properties
1851                #
[451]1852
[785]1853                start_timestamp = ''
1854                count_mynodes   = 0
[354]1855
[691]1856                for node in mynoderequest.split( '+' ):
[67]1857
[691]1858                    # Just grab the {node_count|hostname} part and ignore properties
1859                    #
[785]1860                    nodepart     = node.split( ':' )[0]
[67]1861
[691]1862                    # Let's assume a node_count value
1863                    #
[785]1864                    numeric_node = 1
[451]1865
[691]1866                    # Chop the value up into characters
1867                    #
1868                    for letter in nodepart:
[67]1869
[691]1870                        # If this char is not a digit (0-9), this must be a hostname
1871                        #
1872                        if letter not in string.digits:
[133]1873
[785]1874                            numeric_node = 0
[133]1875
[691]1876                    # If this is a hostname, just count this as one (1) node
1877                    #
1878                    if not numeric_node:
[354]1879
[785]1880                        count_mynodes = count_mynodes + 1
[691]1881                    else:
[451]1882
[691]1883                        # If this a number, it must be the node_count
1884                        # and increase our count with it's value
1885                        #
1886                        try:
[785]1887                            count_mynodes = count_mynodes + int( nodepart )
[354]1888
[691]1889                        except ValueError, detail:
[354]1890
[691]1891                            # When we arrive here I must be bugged or very confused
1892                            # THIS SHOULD NOT HAPPEN!
1893                            #
1894                            debug_msg( 10, str( detail ) )
1895                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1896                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1897                            debug_msg( 10, 'job = ' + str( name ) )
1898                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1899                       
[785]1900                nodeslist       = str( count_mynodes )
[691]1901            else:
[785]1902                start_timestamp = ''
1903                nodeslist       = ''
[133]1904
[691]1905            myAttrs                = { }
[26]1906
[785]1907            myAttrs[ 'name' ]             = str( name )
1908            myAttrs[ 'queue' ]            = str( queue )
1909            myAttrs[ 'owner' ]            = str( owner )
1910            myAttrs[ 'requested_time' ]   = str( requested_time )
1911            myAttrs[ 'requested_memory' ] = str( requested_memory )
1912            myAttrs[ 'ppn' ]              = str( ppn )
1913            myAttrs[ 'status' ]           = str( status )
1914            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1915            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1916            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1917            myAttrs[ 'nodes' ]            = nodeslist
1918            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
[691]1919            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
[354]1920
[691]1921            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
[61]1922
[785]1923                self.jobs[ job_id ] = myAttrs
[26]1924
[691]1925        for id, attrs in self.jobs.items():
[76]1926
[691]1927            if id not in jobs_processed:
[76]1928
[691]1929                # This one isn't there anymore; toedeledoki!
1930                #
1931                del self.jobs[ id ]
[76]1932
[362]1933GMETRIC_DEFAULT_TYPE    = 'string'
1934GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1935GMETRIC_DEFAULT_PORT    = '8649'
[700]1936GMETRIC_DEFAULT_UNITS   = ''
[362]1937
1938class Gmetric:
1939
[691]1940    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
[362]1941
[700]1942    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1943    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1944    protocol        = ( 'udp', 'multicast' )
[362]1945
[691]1946    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
[700]1947               
[691]1948        global GMETRIC_DEFAULT_TYPE
[362]1949
[691]1950        self.prot       = self.checkHostProtocol( host )
[700]1951        self.data_msg   = xdrlib.Packer()
1952        self.meta_msg   = xdrlib.Packer()
[691]1953        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
[362]1954
[691]1955        if self.prot not in self.protocol:
[362]1956
[691]1957            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
[362]1958
[691]1959        if self.prot == 'multicast':
[362]1960
[691]1961            # Set multicast options
1962            #
1963            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
[362]1964
[691]1965        self.hostport   = ( host, int( port ) )
1966        self.slopestr   = 'both'
1967        self.tmax       = 60
[362]1968
[691]1969    def checkHostProtocol( self, ip ):
[362]1970
[691]1971        """Detect if a ip adress is a multicast address"""
[471]1972
[691]1973        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1974        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
[362]1975
[700]1976        ip_fields               = ip.split( '.' )
[362]1977
[691]1978        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
[362]1979
[691]1980            return 'multicast'
1981        else:
1982            return 'udp'
[362]1983
[691]1984    def send( self, name, value, dmax, typestr = '', units = '' ):
[362]1985
[691]1986        if len( units ) == 0:
[700]1987            units       = GMETRIC_DEFAULT_UNITS
[471]1988
[691]1989        if len( typestr ) == 0:
[700]1990            typestr     = GMETRIC_DEFAULT_TYPE
[362]1991
[700]1992        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
[409]1993
[700]1994        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1995        data_rt = self.socket.sendto( data_msg, self.hostport )
[362]1996
[700]1997        return ( meta_rt, data_rt )
[362]1998
[700]1999    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
2000
2001        hostname = "unset"
2002
[691]2003        if slopestr not in self.slope:
[362]2004
[691]2005            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
[362]2006
[691]2007        if typestr not in self.type:
[362]2008
[691]2009            raise ValueError( "Type must be one of: " + str( self.type ) )
[362]2010
[691]2011        if len( name ) == 0:
[362]2012
[691]2013            raise ValueError( "Name must be non-empty" )
[362]2014
[700]2015        self.meta_msg.reset()
2016        self.meta_msg.pack_int( 128 )
[362]2017
[700]2018        if not spoof:
2019            self.meta_msg.pack_string( hostname )
2020        else:
2021            self.meta_msg.pack_string( spoof )
[362]2022
[700]2023        self.meta_msg.pack_string( name )
2024
2025        if not spoof:
2026            self.meta_msg.pack_int( 0 )
2027        else:
2028            self.meta_msg.pack_int( 1 )
2029           
2030        self.meta_msg.pack_string( typestr )
2031        self.meta_msg.pack_string( name )
2032        self.meta_msg.pack_string( unitstr )
2033        self.meta_msg.pack_int( self.slope[ slopestr ] )
2034        self.meta_msg.pack_uint( int( tmax ) )
2035        self.meta_msg.pack_uint( int( dmax ) )
2036
2037        if not group:
2038            self.meta_msg.pack_int( 0 )
2039        else:
2040            self.meta_msg.pack_int( 1 )
2041            self.meta_msg.pack_string( "GROUP" )
2042            self.meta_msg.pack_string( group )
2043
2044        self.data_msg.reset()
2045        self.data_msg.pack_int( 128+5 )
2046
2047        if not spoof:
2048            self.data_msg.pack_string( hostname )
2049        else:
2050            self.data_msg.pack_string( spoof )
2051
2052        self.data_msg.pack_string( name )
2053
2054        if not spoof:
2055            self.data_msg.pack_int( 0 )
2056        else:
2057            self.data_msg.pack_int( 1 )
2058
2059        self.data_msg.pack_string( "%s" )
2060        self.data_msg.pack_string( str( value ) )
2061
2062        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2063
[26]2064def printTime( ):
[354]2065
[691]2066    """Print current time/date in human readable format for log/debug"""
[26]2067
[691]2068    return time.strftime("%a, %d %b %Y %H:%M:%S")
[26]2069
2070def debug_msg( level, msg ):
[354]2071
[691]2072    """Print msg if at or above current debug level"""
[26]2073
[691]2074    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
[377]2075
[691]2076    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2077        sys.stderr.write( msg + '\n' )
[26]2078
[691]2079    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2080        syslog.syslog( msg )
[373]2081
[307]2082def write_pidfile():
2083
[691]2084    # Write pidfile if PIDFILE is set
2085    #
2086    if PIDFILE:
[307]2087
[691]2088        pid    = os.getpid()
[354]2089
[691]2090        pidfile    = open( PIDFILE, 'w' )
[354]2091
[691]2092        pidfile.write( str( pid ) )
2093        pidfile.close()
[307]2094
[23]2095def main():
[354]2096
[691]2097    """Application start"""
[23]2098
[837]2099    global PBSQuery, PBSError, lsfObject, pyslurm
[854]2100    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
[256]2101
[691]2102    if not processArgs( sys.argv[1:] ):
[354]2103
[691]2104        sys.exit( 1 )
[212]2105
[691]2106    # Load appropriate DataGatherer depending on which BATCH_API is set
2107    # and any required modules for the Gatherer
2108    #
2109    if BATCH_API == 'pbs':
[256]2110
[691]2111        try:
2112            from PBSQuery import PBSQuery, PBSError
[256]2113
[787]2114        except ImportError, details:
[256]2115
[854]2116            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
[787]2117            print details
[691]2118            sys.exit( 1 )
[256]2119
[691]2120        gather = PbsDataGatherer()
[256]2121
[691]2122    elif BATCH_API == 'sge':
[256]2123
[854]2124        if BATCH_SERVER != 'localhost':
2125
2126            # Print and log, but continue execution
2127            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2128            print err_msg
2129            debug_msg( 0, err_msg )
2130
[691]2131        # Tested with SGE 6.0u11.
2132        #
2133        gather = SgeDataGatherer()
[368]2134
[691]2135    elif BATCH_API == 'lsf':
[368]2136
[854]2137        if BATCH_SERVER != 'localhost':
2138
2139            # Print and log, but continue execution
2140            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2141            print err_msg
2142            debug_msg( 0, err_msg )
2143
[691]2144        try:
2145            from lsfObject import lsfObject
2146        except:
[854]2147            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2148            sys.exit( 1 )
[256]2149
[691]2150        gather = LsfDataGatherer()
[524]2151
[837]2152    elif BATCH_API == 'slurm':
2153
[854]2154        if BATCH_SERVER != 'localhost':
2155
2156            # Print and log, but continue execution
2157            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2158            print err_msg
2159            debug_msg( 0, err_msg )
2160
[837]2161        try:
2162            import pyslurm
2163        except:
2164            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
[854]2165            sys.exit( 1 )
[837]2166
2167        gather = SLURMDataGatherer()
2168
[691]2169    else:
[786]2170        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
[354]2171
[691]2172        sys.exit( 1 )
[256]2173
[691]2174    if( DAEMONIZE and USE_SYSLOG ):
[373]2175
[691]2176        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[373]2177
[691]2178    if DAEMONIZE:
[354]2179
[691]2180        gather.daemon()
2181    else:
2182        gather.run()
[23]2183
[256]2184# wh00t? someone started me! :)
[65]2185#
[23]2186if __name__ == '__main__':
[691]2187    main()
Note: See TracBrowser for help on using the repository browser.