Changeset 659 for trunk/jobmond


Ignore:
Timestamp:
09/03/12 12:08:10 (9 years ago)
Author:
ramonb
Message:
  • tab spacing fixed
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobmond/jobmond.py

    r622 r659  
    3232def usage( ver ):
    3333
    34         print 'jobmond %s' %VERSION
    35 
    36         if ver:
    37                 return 0
    38 
    39         print
    40         print 'Purpose:'
    41         print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
    42         print '  to Ganglia, which can be viewed with Job Monarch web frontend'
    43         print
    44         print 'Usage:   jobmond [OPTIONS]'
    45         print
    46         print '  -c, --config=FILE      The configuration file to use (default: /etc/jobmond.conf)'
    47         print '  -p, --pidfile=FILE     Use pid file to store the process id'
    48         print '  -h, --help             Print help and exit'
    49         print '  -v, --version          Print version and exit'
    50         print
     34    print 'jobmond %s' %VERSION
     35
     36    if ver:
     37        return 0
     38
     39    print
     40    print 'Purpose:'
     41    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
     42    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
     43    print
     44    print 'Usage:   jobmond [OPTIONS]'
     45    print
     46    print '  -c, --config=FILE  The configuration file to use (default: /etc/jobmond.conf)'
     47    print '  -p, --pidfile=FILE Use pid file to store the process id'
     48    print '  -h, --help     Print help and exit'
     49    print '  -v, --version          Print version and exit'
     50    print
    5151
    5252def processArgs( args ):
    5353
    54         SHORT_L         = 'p:hvc:'
    55         LONG_L          = [ 'help', 'config=', 'pidfile=', 'version' ]
    56 
    57         global PIDFILE
    58         PIDFILE         = None
    59 
    60         config_filename = '/etc/jobmond.conf'
    61 
    62         try:
    63 
    64                 opts, args      = getopt.getopt( args, SHORT_L, LONG_L )
    65 
    66         except getopt.GetoptError, detail:
    67 
    68                 print detail
    69                 usage()
    70                 sys.exit( 1 )
    71 
    72         for opt, value in opts:
    73 
    74                 if opt in [ '--config', '-c' ]:
    75                
    76                         config_filename = value
    77 
    78                 if opt in [ '--pidfile', '-p' ]:
    79 
    80                         PIDFILE         = value
    81                
    82                 if opt in [ '--help', '-h' ]:
     54    SHORT_L     = 'p:hvc:'
     55    LONG_L      = [ 'help', 'config=', 'pidfile=', 'version' ]
     56
     57    global PIDFILE
     58    PIDFILE     = None
     59
     60    config_filename = '/etc/jobmond.conf'
     61
     62    try:
     63
     64        opts, args  = getopt.getopt( args, SHORT_L, LONG_L )
     65
     66    except getopt.GetoptError, detail:
     67
     68        print detail
     69        usage()
     70        sys.exit( 1 )
     71
     72    for opt, value in opts:
     73
     74        if opt in [ '--config', '-c' ]:
     75       
     76            config_filename = value
     77
     78        if opt in [ '--pidfile', '-p' ]:
     79
     80            PIDFILE     = value
     81       
     82        if opt in [ '--help', '-h' ]:
    8383 
    84                         usage( False )
    85                         sys.exit( 0 )
    86 
    87                 if opt in [ '--version', '-v' ]:
    88 
    89                         usage( True )
    90                         sys.exit( 0 )
    91 
    92         return loadConfig( config_filename )
     84            usage( False )
     85            sys.exit( 0 )
     86
     87        if opt in [ '--version', '-v' ]:
     88
     89            usage( True )
     90            sys.exit( 0 )
     91
     92    return loadConfig( config_filename )
    9393
    9494# Fixme:  This doesn't DTRT with commented-out bits of the file.  E.g.
     
    9797class GangliaConfigParser:
    9898
    99         def __init__( self, config_file ):
    100 
    101                 self.config_file        = config_file
    102 
    103                 if not os.path.exists( self.config_file ):
    104 
    105                         debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
    106                         sys.exit( 1 )
    107 
    108         def removeQuotes( self, value ):
    109 
    110                 clean_value     = value
    111                 clean_value     = clean_value.replace( "'", "" )
    112                 clean_value     = clean_value.replace( '"', '' )
    113                 clean_value     = clean_value.strip()
    114 
    115                 return clean_value
    116 
    117         def getVal( self, section, valname ):
    118 
    119                 cfg_fp          = open( self.config_file )
    120                 section_start   = False
    121                 section_found   = False
    122                 value           = None
    123 
    124                 for line in cfg_fp.readlines():
    125 
    126                         if line.find( section ) != -1:
    127 
    128                                 section_found   = True
    129 
    130                         if line.find( '{' ) != -1 and section_found:
    131 
    132                                 section_start   = True
    133 
    134                         if line.find( '}' ) != -1 and section_found:
    135 
    136                                 section_start   = False
    137                                 section_found   = False
    138 
    139                         if line.find( valname ) != -1 and section_start:
    140 
    141                                 value           = string.join( line.split( '=' )[1:], '' ).strip()
    142 
    143                 cfg_fp.close()
    144 
    145                 return value
    146 
    147         def getInt( self, section, valname ):
    148 
    149                 value   = self.getVal( section, valname )
    150 
    151                 if not value:
    152                         return False
    153 
    154                 value   = self.removeQuotes( value )
    155 
    156                 return int( value )
    157 
    158         def getStr( self, section, valname ):
    159 
    160                 value   = self.getVal( section, valname )
    161 
    162                 if not value:
    163                         return False
    164 
    165                 value   = self.removeQuotes( value )
    166 
    167                 return str( value )
     99    def __init__( self, config_file ):
     100
     101        self.config_file    = config_file
     102
     103        if not os.path.exists( self.config_file ):
     104
     105            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
     106            sys.exit( 1 )
     107
     108    def removeQuotes( self, value ):
     109
     110        clean_value = value
     111        clean_value = clean_value.replace( "'", "" )
     112        clean_value = clean_value.replace( '"', '' )
     113        clean_value = clean_value.strip()
     114
     115        return clean_value
     116
     117    def getVal( self, section, valname ):
     118
     119        cfg_fp      = open( self.config_file )
     120        section_start   = False
     121        section_found   = False
     122        value       = None
     123
     124        for line in cfg_fp.readlines():
     125
     126            if line.find( section ) != -1:
     127
     128                section_found   = True
     129
     130            if line.find( '{' ) != -1 and section_found:
     131
     132                section_start   = True
     133
     134            if line.find( '}' ) != -1 and section_found:
     135
     136                section_start   = False
     137                section_found   = False
     138
     139            if line.find( valname ) != -1 and section_start:
     140
     141                value       = string.join( line.split( '=' )[1:], '' ).strip()
     142
     143        cfg_fp.close()
     144
     145        return value
     146
     147    def getInt( self, section, valname ):
     148
     149        value   = self.getVal( section, valname )
     150
     151        if not value:
     152            return False
     153
     154        value   = self.removeQuotes( value )
     155
     156        return int( value )
     157
     158    def getStr( self, section, valname ):
     159
     160        value   = self.getVal( section, valname )
     161
     162        if not value:
     163            return False
     164
     165        value   = self.removeQuotes( value )
     166
     167        return str( value )
    168168
    169169def findGmetric():
    170170
    171         for dir in os.path.expandvars( '$PATH' ).split( ':' ):
    172 
    173                 guess   = '%s/%s' %( dir, 'gmetric' )
    174 
    175                 if os.path.exists( guess ):
    176 
    177                         return guess
    178 
    179         return False
     171    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
     172
     173        guess   = '%s/%s' %( dir, 'gmetric' )
     174
     175        if os.path.exists( guess ):
     176
     177            return guess
     178
     179    return False
    180180
    181181def loadConfig( filename ):
     
    213213                return my_list
    214214
    215         cfg             = ConfigParser.ConfigParser()
    216 
    217         cfg.read( filename )
    218 
    219         global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
    220         global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
    221         global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
    222         global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
    223 
    224         DEBUG_LEVEL     = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
    225 
    226         DAEMONIZE       = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
    227 
    228         SYSLOG_LEVEL    = -1
    229         SYSLOG_FACILITY = None
    230 
    231         try:
    232                 USE_SYSLOG      = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
    233 
    234         except ConfigParser.NoOptionError:
    235 
    236                 USE_SYSLOG      = True
    237 
    238                 debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
    239 
    240         if USE_SYSLOG:
    241 
    242                 try:
    243                         SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
    244 
    245                 except ConfigParser.NoOptionError:
    246 
    247                         debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
    248                         SYSLOG_LEVEL    = 0
    249 
    250                 try:
    251 
    252                         SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
    253 
    254                 except ConfigParser.NoOptionError:
    255 
    256                         SYSLOG_FACILITY = syslog.LOG_DAEMON
    257 
    258                         debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
    259 
    260         try:
    261 
    262                 BATCH_SERVER            = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
    263 
    264         except ConfigParser.NoOptionError:
    265 
    266                 # Backwards compatibility for old configs
    267                 #
    268 
    269                 BATCH_SERVER            = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
    270                 api_guess               = 'pbs'
    271        
    272         try:
    273        
    274                 BATCH_POLL_INTERVAL     = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
    275 
    276         except ConfigParser.NoOptionError:
    277 
    278                 # Backwards compatibility for old configs
    279                 #
    280 
    281                 BATCH_POLL_INTERVAL     = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
    282                 api_guess               = 'pbs'
    283        
    284         try:
    285 
    286                 GMOND_CONF              = cfg.get( 'DEFAULT', 'GMOND_CONF' )
    287 
    288         except ConfigParser.NoOptionError:
    289 
    290                 # Not specified: assume /etc/gmond.conf
    291                 #
    292                 GMOND_CONF              = '/etc/gmond.conf'
    293 
    294         ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
    295 
    296         # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
    297         #
    298         gmetric_dest_ip         = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
    299 
    300         if not gmetric_dest_ip:
    301 
    302                 # Maybe unicast target then
    303                 #
    304                 gmetric_dest_ip         = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
    305 
    306         gmetric_dest_port       = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
    307 
    308         if gmetric_dest_ip and gmetric_dest_port:
    309 
    310                 GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
    311         else:
    312 
    313                 debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
    314 
    315                 # Couldn't figure it out: let's see if it's in our jobmond.conf
    316                 #
    317                 try:
    318 
    319                         GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
    320 
    321                 # Guess not: now just give up
    322                 #
    323                 except ConfigParser.NoOptionError:
    324 
    325                         GMETRIC_TARGET  = None
    326 
    327                         debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
    328 
    329         gmetric_bin     = findGmetric()
    330 
    331         if gmetric_bin:
    332 
    333                 GMETRIC_BINARY          = gmetric_bin
    334         else:
    335                 debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
    336 
    337                 try:
    338 
    339                         GMETRIC_BINARY          = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
    340 
    341                 except ConfigParser.NoOptionError:
    342 
    343                         debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
    344                         sys.exit( 1 )
    345 
    346         DETECT_TIME_DIFFS       = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
    347 
    348         BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
    349 
    350         try:
    351 
    352                 BATCH_API       = cfg.get( 'DEFAULT', 'BATCH_API' )
    353 
    354         except ConfigParser.NoOptionError, detail:
    355 
    356                 if BATCH_SERVER and api_guess:
    357 
    358                         BATCH_API       = api_guess
    359                 else:
    360                         debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
    361                         sys.exit( 1 )
    362 
    363         try:
    364 
    365                 QUEUE           = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
    366 
    367         except ConfigParser.NoOptionError, detail:
    368 
    369                 QUEUE           = None
    370 
    371         return True
     215    cfg     = ConfigParser.ConfigParser()
     216
     217    cfg.read( filename )
     218
     219    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
     220    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
     221    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
     222    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
     223
     224    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
     225
     226    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
     227
     228    SYSLOG_LEVEL    = -1
     229    SYSLOG_FACILITY = None
     230
     231    try:
     232        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
     233
     234    except ConfigParser.NoOptionError:
     235
     236        USE_SYSLOG  = True
     237
     238        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
     239
     240    if USE_SYSLOG:
     241
     242        try:
     243            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
     244
     245        except ConfigParser.NoOptionError:
     246
     247            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
     248            SYSLOG_LEVEL    = 0
     249
     250        try:
     251
     252            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
     253
     254        except ConfigParser.NoOptionError:
     255
     256            SYSLOG_FACILITY = syslog.LOG_DAEMON
     257
     258            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
     259
     260    try:
     261
     262        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
     263
     264    except ConfigParser.NoOptionError:
     265
     266        # Backwards compatibility for old configs
     267        #
     268
     269        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
     270        api_guess       = 'pbs'
     271   
     272    try:
     273   
     274        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
     275
     276    except ConfigParser.NoOptionError:
     277
     278        # Backwards compatibility for old configs
     279        #
     280
     281        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
     282        api_guess       = 'pbs'
     283   
     284    try:
     285
     286        GMOND_CONF      = cfg.get( 'DEFAULT', 'GMOND_CONF' )
     287
     288    except ConfigParser.NoOptionError:
     289
     290        # Not specified: assume /etc/gmond.conf
     291        #
     292        GMOND_CONF      = '/etc/gmond.conf'
     293
     294    ganglia_cfg     = GangliaConfigParser( GMOND_CONF )
     295
     296    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
     297    #
     298    gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
     299
     300    if not gmetric_dest_ip:
     301
     302        # Maybe unicast target then
     303        #
     304        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
     305
     306    gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
     307
     308    if gmetric_dest_ip and gmetric_dest_port:
     309
     310        GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
     311    else:
     312
     313        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
     314
     315        # Couldn't figure it out: let's see if it's in our jobmond.conf
     316        #
     317        try:
     318
     319            GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
     320
     321        # Guess not: now just give up
     322        #
     323        except ConfigParser.NoOptionError:
     324
     325            GMETRIC_TARGET  = None
     326
     327            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
     328
     329    gmetric_bin = findGmetric()
     330
     331    if gmetric_bin:
     332
     333        GMETRIC_BINARY      = gmetric_bin
     334    else:
     335        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
     336
     337        try:
     338
     339            GMETRIC_BINARY      = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
     340
     341        except ConfigParser.NoOptionError:
     342
     343            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
     344            sys.exit( 1 )
     345
     346    DETECT_TIME_DIFFS   = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
     347
     348    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
     349
     350    try:
     351
     352        BATCH_API   = cfg.get( 'DEFAULT', 'BATCH_API' )
     353
     354    except ConfigParser.NoOptionError, detail:
     355
     356        if BATCH_SERVER and api_guess:
     357
     358            BATCH_API   = api_guess
     359        else:
     360            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
     361            sys.exit( 1 )
     362
     363    try:
     364
     365        QUEUE       = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
     366
     367    except ConfigParser.NoOptionError, detail:
     368
     369        QUEUE       = None
     370
     371    return True
    372372
    373373def fqdn_parts (fqdn):
    374374
    375         """Return pair of host and domain for fully-qualified domain name arg."""
    376 
    377         parts = fqdn.split (".")
    378 
    379         return (parts[0], string.join(parts[1:], "."))
     375    """Return pair of host and domain for fully-qualified domain name arg."""
     376
     377    parts = fqdn.split (".")
     378
     379    return (parts[0], string.join(parts[1:], "."))
    380380
    381381METRIC_MAX_VAL_LEN = 900
     
    383383class DataProcessor:
    384384
    385         """Class for processing of data"""
    386 
    387         binary = None
    388 
    389         def __init__( self, binary=None ):
    390 
    391                 """Remember alternate binary location if supplied"""
    392 
    393                 global GMETRIC_BINARY
    394 
    395                 if binary:
    396                         self.binary = binary
    397 
    398                 if not self.binary:
    399                         self.binary = GMETRIC_BINARY
    400 
    401                 # Timeout for XML
    402                 #
    403                 # From ganglia's documentation:
    404                 #
    405                 # 'A metric will be deleted DMAX seconds after it is received, and
    406                 # DMAX=0 means eternal life.'
    407 
    408                 self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
    409 
    410                 if GMOND_CONF:
    411 
    412                         incompatible = self.checkGmetricVersion()
    413 
    414                         if incompatible:
    415 
    416                                 debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
    417                                 sys.exit( 1 )
    418 
    419         def checkGmetricVersion( self ):
    420 
    421                 """
    422                 Check version of gmetric is at least 3.0.1
    423                 for the syntax we use
    424                 """
    425 
    426                 global METRIC_MAX_VAL_LEN
    427 
    428                 incompatible    = 0
    429 
    430                 gfp             = os.popen( self.binary + ' --version' )
    431                 lines           = gfp.readlines()
    432 
    433                 gfp.close()
    434 
    435                 for line in lines:
    436 
    437                         line = line.split( ' ' )
    438 
    439                         if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
    440                        
    441                                 gmetric_version = line[1].split( '\n' )[0]
    442 
    443                                 version_major   = int( gmetric_version.split( '.' )[0] )
    444                                 version_minor   = int( gmetric_version.split( '.' )[1] )
    445                                 version_patch   = int( gmetric_version.split( '.' )[2] )
    446 
    447                                 incompatible    = 0
    448 
    449                                 if version_major < 3:
    450 
    451                                         incompatible = 1
    452                                
    453                                 elif version_major == 3:
    454 
    455                                         if version_minor == 0:
    456 
    457                                                 if version_patch < 1:
    458                                                
    459                                                         incompatible = 1
    460 
    461                                                 # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
    462                                                 #
    463                                                 if version_patch < 3:
    464 
    465                                                         METRIC_MAX_VAL_LEN = 900
    466 
    467                                                 elif version_patch >= 3:
    468 
    469                                                         METRIC_MAX_VAL_LEN = 1400
    470 
    471                 return incompatible
    472 
    473         def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
    474 
    475                 """Call gmetric binary and multicast"""
    476 
    477                 cmd = self.binary
    478 
    479                 if GMETRIC_TARGET:
    480 
    481                         GMETRIC_TARGET_HOST     = GMETRIC_TARGET.split( ':' )[0]
    482                         GMETRIC_TARGET_PORT     = GMETRIC_TARGET.split( ':' )[1]
    483 
    484                         metric_debug            = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
    485 
    486                         debug_msg( 10, printTime() + ' ' + metric_debug)
    487 
    488                         gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
    489 
    490                         gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
    491 
    492                 else:
    493                         try:
    494                                 cmd = cmd + ' -c' + GMOND_CONF
    495 
    496                         except NameError:
    497 
    498                                 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
    499 
    500                         cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
    501 
    502                         if len( units ) > 0:
    503 
    504                                 cmd = cmd + ' -u"' + units + '"'
    505 
    506                         debug_msg( 10, printTime() + ' ' + cmd )
    507 
    508                         os.system( cmd )
     385    """Class for processing of data"""
     386
     387    binary = None
     388
     389    def __init__( self, binary=None ):
     390
     391        """Remember alternate binary location if supplied"""
     392
     393        global GMETRIC_BINARY
     394
     395        if binary:
     396            self.binary = binary
     397
     398        if not self.binary:
     399            self.binary = GMETRIC_BINARY
     400
     401        # Timeout for XML
     402        #
     403        # From ganglia's documentation:
     404        #
     405        # 'A metric will be deleted DMAX seconds after it is received, and
     406            # DMAX=0 means eternal life.'
     407
     408        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
     409
     410        if GMOND_CONF:
     411
     412            incompatible = self.checkGmetricVersion()
     413
     414            if incompatible:
     415
     416                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
     417                sys.exit( 1 )
     418
     419    def checkGmetricVersion( self ):
     420
     421        """
     422        Check version of gmetric is at least 3.0.1
     423        for the syntax we use
     424        """
     425
     426        global METRIC_MAX_VAL_LEN
     427
     428        incompatible    = 0
     429
     430        gfp     = os.popen( self.binary + ' --version' )
     431        lines       = gfp.readlines()
     432
     433        gfp.close()
     434
     435        for line in lines:
     436
     437            line = line.split( ' ' )
     438
     439            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
     440           
     441                gmetric_version = line[1].split( '\n' )[0]
     442
     443                version_major   = int( gmetric_version.split( '.' )[0] )
     444                version_minor   = int( gmetric_version.split( '.' )[1] )
     445                version_patch   = int( gmetric_version.split( '.' )[2] )
     446
     447                incompatible    = 0
     448
     449                if version_major < 3:
     450
     451                    incompatible = 1
     452               
     453                elif version_major == 3:
     454
     455                    if version_minor == 0:
     456
     457                        if version_patch < 1:
     458                       
     459                            incompatible = 1
     460
     461                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
     462                        #
     463                        if version_patch < 3:
     464
     465                            METRIC_MAX_VAL_LEN = 900
     466
     467                        elif version_patch >= 3:
     468
     469                            METRIC_MAX_VAL_LEN = 1400
     470
     471        return incompatible
     472
     473    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
     474
     475        """Call gmetric binary and multicast"""
     476
     477        cmd = self.binary
     478
     479        if GMETRIC_TARGET:
     480
     481            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
     482            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]
     483
     484            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
     485
     486            debug_msg( 10, printTime() + ' ' + metric_debug)
     487
     488            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
     489
     490            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
     491
     492        else:
     493            try:
     494                cmd = cmd + ' -c' + GMOND_CONF
     495
     496            except NameError:
     497
     498                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
     499
     500            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
     501
     502            if len( units ) > 0:
     503
     504                cmd = cmd + ' -u"' + units + '"'
     505
     506            debug_msg( 10, printTime() + ' ' + cmd )
     507
     508            os.system( cmd )
    509509
    510510class DataGatherer:
    511511
    512         """Skeleton class for batch system DataGatherer"""
    513 
    514         def printJobs( self, jobs ):
    515 
    516                 """Print a jobinfo overview"""
    517 
    518                 for name, attrs in self.jobs.items():
    519 
    520                         print 'job %s' %(name)
    521 
    522                         for name, val in attrs.items():
    523 
    524                                 print '\t%s = %s' %( name, val )
    525 
    526         def printJob( self, jobs, job_id ):
    527 
    528                 """Print job with job_id from jobs"""
    529 
    530                 print 'job %s' %(job_id)
    531 
    532                 for name, val in jobs[ job_id ].items():
    533 
    534                         print '\t%s = %s' %( name, val )
    535 
    536         def getAttr( self, attrs, name ):
    537 
    538                 """Return certain attribute from dictionary, if exists"""
    539 
    540                 if attrs.has_key( name ):
    541 
    542                         return attrs[ name ]
    543                 else:
    544                         return ''
    545 
    546         def jobDataChanged( self, jobs, job_id, attrs ):
    547 
    548                 """Check if job with attrs and job_id in jobs has changed"""
    549 
    550                 if jobs.has_key( job_id ):
    551 
    552                         oldData = jobs[ job_id ]       
    553                 else:
    554                         return 1
    555 
    556                 for name, val in attrs.items():
    557 
    558                         if oldData.has_key( name ):
    559 
    560                                 if oldData[ name ] != attrs[ name ]:
    561 
    562                                         return 1
    563 
    564                         else:
    565                                 return 1
    566 
    567                 return 0
    568 
    569         def submitJobData( self ):
    570 
    571                 """Submit job info list"""
    572 
    573                 global BATCH_API
    574 
    575                 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
    576 
    577                 running_jobs    = 0
    578                 queued_jobs     = 0
    579 
    580                 # Count how many running/queued jobs we found
     512    """Skeleton class for batch system DataGatherer"""
     513
     514    def printJobs( self, jobs ):
     515
     516        """Print a jobinfo overview"""
     517
     518        for name, attrs in self.jobs.items():
     519
     520            print 'job %s' %(name)
     521
     522            for name, val in attrs.items():
     523
     524                print '\t%s = %s' %( name, val )
     525
     526    def printJob( self, jobs, job_id ):
     527
     528        """Print job with job_id from jobs"""
     529
     530        print 'job %s' %(job_id)
     531
     532        for name, val in jobs[ job_id ].items():
     533
     534            print '\t%s = %s' %( name, val )
     535
     536    def getAttr( self, attrs, name ):
     537
     538        """Return certain attribute from dictionary, if exists"""
     539
     540        if attrs.has_key( name ):
     541
     542            return attrs[ name ]
     543        else:
     544            return ''
     545
     546    def jobDataChanged( self, jobs, job_id, attrs ):
     547
     548        """Check if job with attrs and job_id in jobs has changed"""
     549
     550        if jobs.has_key( job_id ):
     551
     552            oldData = jobs[ job_id ]   
     553        else:
     554            return 1
     555
     556        for name, val in attrs.items():
     557
     558            if oldData.has_key( name ):
     559
     560                if oldData[ name ] != attrs[ name ]:
     561
     562                    return 1
     563
     564            else:
     565                return 1
     566
     567        return 0
     568
     569    def submitJobData( self ):
     570
     571        """Submit job info list"""
     572
     573        global BATCH_API
     574
     575        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
     576
     577        running_jobs    = 0
     578        queued_jobs = 0
     579
     580        # Count how many running/queued jobs we found
    581581                #
    582                 for jobid, jobattrs in self.jobs.items():
    583 
    584                         if jobattrs[ 'status' ] == 'Q':
    585 
    586                                 queued_jobs += 1
    587 
    588                         elif jobattrs[ 'status' ] == 'R':
    589 
    590                                 running_jobs += 1
    591 
    592                 # Report running/queued jobs as seperate metric for a nice RRD graph
     582        for jobid, jobattrs in self.jobs.items():
     583
     584            if jobattrs[ 'status' ] == 'Q':
     585
     586                queued_jobs += 1
     587
     588            elif jobattrs[ 'status' ] == 'R':
     589
     590                running_jobs += 1
     591
     592        # Report running/queued jobs as seperate metric for a nice RRD graph
    593593                #
    594                 self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
    595                 self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
    596 
    597                 # Report down/offline nodes in batch (PBS only ATM)
    598                 #
    599                 if BATCH_API == 'pbs':
    600 
    601                         domain          = fqdn_parts( socket.getfqdn() )[1]
    602 
    603                         downed_nodes    = list()
    604                         offline_nodes   = list()
    605                
    606                         l               = ['state']
    607                
    608                         for name, node in self.pq.getnodes().items():
    609 
    610                                 if ( node[ 'state' ].find( "down" ) != -1 ):
    611 
    612                                         downed_nodes.append( name )
    613 
    614                                 if ( node[ 'state' ].find( "offline" ) != -1 ):
    615 
    616                                         offline_nodes.append( name )
    617 
    618                         downnodeslist           = do_nodelist( downed_nodes )
    619                         offlinenodeslist        = do_nodelist( offline_nodes )
    620 
    621                         down_str        = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
    622                         offl_str        = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
    623                         self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
    624                         self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
    625 
    626                 # Now let's spread the knowledge
    627                 #
    628                 for jobid, jobattrs in self.jobs.items():
    629 
    630                         # Make gmetric values for each job: respect max gmetric value length
     594        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
     595        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
     596
     597        # Report down/offline nodes in batch (PBS only ATM)
     598        #
     599        if BATCH_API == 'pbs':
     600
     601            domain      = fqdn_parts( socket.getfqdn() )[1]
     602
     603            downed_nodes    = list()
     604            offline_nodes   = list()
     605       
     606            l       = ['state']
     607       
     608            for name, node in self.pq.getnodes().items():
     609
     610                if ( node[ 'state' ].find( "down" ) != -1 ):
     611
     612                    downed_nodes.append( name )
     613
     614                if ( node[ 'state' ].find( "offline" ) != -1 ):
     615
     616                    offline_nodes.append( name )
     617
     618            downnodeslist       = do_nodelist( downed_nodes )
     619            offlinenodeslist    = do_nodelist( offline_nodes )
     620
     621            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
     622            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
     623            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
     624            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
     625
     626        # Now let's spread the knowledge
     627        #
     628        for jobid, jobattrs in self.jobs.items():
     629
     630            # Make gmetric values for each job: respect max gmetric value length
    631631                        #
    632                         gmetric_val             = self.compileGmetricVal( jobid, jobattrs )
    633                         metric_increment        = 0
    634 
    635                         # If we have more job info than max gmetric value length allows, split it up
     632            gmetric_val     = self.compileGmetricVal( jobid, jobattrs )
     633            metric_increment    = 0
     634
     635            # If we have more job info than max gmetric value length allows, split it up
    636636                        # amongst multiple metrics
    637                         #
    638                         for val in gmetric_val:
    639 
    640                                 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )
    641 
    642                                 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
     637            #
     638            for val in gmetric_val:
     639
     640                self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )
     641
     642                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
    643643                                #
    644                                 metric_increment        = metric_increment + 1
    645 
    646         def compileGmetricVal( self, jobid, jobattrs ):
    647 
    648                 """Create a val string for gmetric of jobinfo"""
    649 
    650                 gval_lists      = [ ]
    651                 val_list        = { }
    652 
    653                 for val_name, val_value in jobattrs.items():
    654 
    655                         # These are our own metric names, i.e.: status, start_timestamp, etc
     644                metric_increment    = metric_increment + 1
     645
     646    def compileGmetricVal( self, jobid, jobattrs ):
     647
     648        """Create a val string for gmetric of jobinfo"""
     649
     650        gval_lists  = [ ]
     651        val_list    = { }
     652
     653        for val_name, val_value in jobattrs.items():
     654
     655            # These are our own metric names, i.e.: status, start_timestamp, etc
    656656                        #
    657                         val_list_names_len      = len( string.join( val_list.keys() ) ) + len(val_list.keys())
    658 
    659                         # These are their corresponding values
     657            val_list_names_len  = len( string.join( val_list.keys() ) ) + len(val_list.keys())
     658
     659            # These are their corresponding values
    660660                        #
    661                         val_list_vals_len       = len( string.join( val_list.values() ) ) + len(val_list.values())
    662 
    663                         if val_name == 'nodes' and jobattrs['status'] == 'R':
    664 
    665                                 node_str = None
    666 
    667                                 for node in val_value:
    668 
    669                                         if node_str:
    670 
    671                                                 node_str = node_str + ';' + node
    672                                         else:
    673                                                 node_str = node
    674 
    675                                         # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
     661            val_list_vals_len   = len( string.join( val_list.values() ) ) + len(val_list.values())
     662
     663            if val_name == 'nodes' and jobattrs['status'] == 'R':
     664
     665                node_str = None
     666
     667                for node in val_value:
     668
     669                    if node_str:
     670
     671                        node_str = node_str + ';' + node
     672                    else:
     673                        node_str = node
     674
     675                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
    676676                                        #
    677                                         if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
    678 
    679                                                 # It's too big, we need to make a new gmetric for the additional info
     677                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
     678
     679                        # It's too big, we need to make a new gmetric for the additional info
    680680                                                #
    681                                                 val_list[ val_name ]    = node_str
    682 
    683                                                 gval_lists.append( val_list )
    684 
    685                                                 val_list                = { }
    686                                                 node_str                = None
    687 
    688                                 val_list[ val_name ]    = node_str
    689 
    690                                 gval_lists.append( val_list )
    691 
    692                                 val_list                = { }
    693 
    694                         elif val_value != '':
    695 
    696                                 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
     681                        val_list[ val_name ]    = node_str
     682
     683                        gval_lists.append( val_list )
     684
     685                        val_list        = { }
     686                        node_str        = None
     687
     688                val_list[ val_name ]    = node_str
     689
     690                gval_lists.append( val_list )
     691
     692                val_list        = { }
     693
     694            elif val_value != '':
     695
     696                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
    697697                                #
    698                                 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
    699 
    700                                         # It's too big, we need to make a new gmetric for the additional info
     698                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
     699
     700                    # It's too big, we need to make a new gmetric for the additional info
    701701                                        #
    702                                         gval_lists.append( val_list )
    703 
    704                                         val_list                = { }
    705 
    706                                 val_list[ val_name ]    = val_value
    707 
    708                 if len( val_list ) > 0:
    709 
    710                         gval_lists.append( val_list )
    711 
    712                 str_list        = [ ]
    713 
    714                 # Now append the value names and values together, i.e.: stop_timestamp=value, etc
     702                    gval_lists.append( val_list )
     703
     704                    val_list        = { }
     705
     706                val_list[ val_name ]    = val_value
     707
     708        if len( val_list ) > 0:
     709
     710            gval_lists.append( val_list )
     711
     712        str_list    = [ ]
     713
     714        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
    715715                #
    716                 for val_list in gval_lists:
    717 
    718                         my_val_str      = None
    719 
    720                         for val_name, val_value in val_list.items():
    721 
    722                                 if type(val_value) == list:
    723 
    724                                         val_value       = val_value.join( ',' )
    725 
    726                                 if my_val_str:
    727 
    728                                         try:
    729                                                 # fixme: It's getting
    730                                                 # ('nodes', None) items
    731                                                 my_val_str = my_val_str + ' ' + val_name + '=' + val_value
    732                                         except:
    733                                                 pass
    734 
    735                                 else:
    736                                         my_val_str = val_name + '=' + val_value
    737 
    738                         str_list.append( my_val_str )
    739 
    740                 return str_list
     716        for val_list in gval_lists:
     717
     718            my_val_str  = None
     719
     720            for val_name, val_value in val_list.items():
     721
     722                if type(val_value) == list:
     723
     724                    val_value   = val_value.join( ',' )
     725
     726                if my_val_str:
     727
     728                    try:
     729                        # fixme: It's getting
     730                        # ('nodes', None) items
     731                        my_val_str = my_val_str + ' ' + val_name + '=' + val_value
     732                    except:
     733                        pass
     734
     735                else:
     736                    my_val_str = val_name + '=' + val_value
     737
     738            str_list.append( my_val_str )
     739
     740        return str_list
    741741
    742742        def daemon( self ):
     
    760760                        sys.exit(0)  # end parent
    761761
    762                 write_pidfile()
     762        write_pidfile()
    763763
    764764                # Go to the root directory and set the umask
     
    782782
    783783                while ( 1 ):
    784                
    785                         self.getJobData()
    786                         self.submitJobData()
    787                         time.sleep( BATCH_POLL_INTERVAL )       
     784       
     785            self.getJobData()
     786            self.submitJobData()
     787            time.sleep( BATCH_POLL_INTERVAL )   
    788788
    789789# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
     
    792792
    793793class NoJobs (Exception):
    794         """Exception raised by empty job list in qstat output."""
    795         pass
     794    """Exception raised by empty job list in qstat output."""
     795    pass
    796796
    797797class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
    798         """SAX handler for XML output from Sun Grid Engine's `qstat'."""
    799 
    800         def __init__(self):
    801                 self.value = ""
    802                 self.joblist = []
    803                 self.job = {}
    804                 self.queue = ""
    805                 self.in_joblist = False
    806                 self.lrequest = False
    807                 self.eltq = deque()
    808                 xml.sax.handler.ContentHandler.__init__(self)
    809 
    810         # The structure of the output is as follows (for SGE 6.0).  It's
    811         # similar for 6.1, but radically different for 6.2, and is
    812         # undocumented generally.  Unfortunately it's voluminous, and probably
    813         # doesn't scale to large clusters/queues.
    814 
    815         # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    816         #   <djob_info>
    817         #     <qmaster_response>  <!-- job -->
    818         #       ...
    819         #       <JB_ja_template> 
    820         #         <ulong_sublist>
    821         #         ...             <!-- start_time, state ... -->
    822         #         </ulong_sublist>
    823         #       </JB_ja_template> 
    824         #       <JB_ja_tasks>
    825         #         <ulong_sublist>
    826         #           ...           <!-- task info
    827         #         </ulong_sublist>
    828         #         ...
    829         #       </JB_ja_tasks>
    830         #       ...
    831         #     </qmaster_response>
    832         #   </djob_info>
    833         #   <messages>
    834         #   ...
    835 
    836         # NB.  We might treat each task as a separate job, like
    837         # straight qstat output, but the web interface expects jobs to
    838         # be identified by integers, not, say, <job number>.<task>.
    839 
    840         # So, I lied.  If the job list is empty, we get invalid XML
    841         # like this, which we need to defend against:
    842 
    843         # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    844         #   <>
    845         #     <ST_name>*</ST_name>
    846         #   </>
    847         # </unknown_jobs>
    848 
    849         def startElement(self, name, attrs):
    850                 self.value = ""
    851                 if name == "djob_info": # job list
    852                         self.in_joblist = True
    853                 # The job container is "qmaster_response" in SGE 6.0
    854                 # and 6.1, but "element" in 6.2.  This is only the very
    855                 # start of what's necessary for 6.2, though (sigh).
    856                 elif (name == "qmaster_response" or name == "element") \
    857                             and self.eltq[-1] == "djob_info": # job
    858                         self.job = {"job_state": "U", "slots": 0,
    859                                     "nodes": [], "queued_timestamp": "",
    860                                     "queued_timestamp": "", "queue": "",
    861                                     "ppn": "0", "RN_max": 0,
    862                                     # fixme in endElement
    863                                     "requested_memory": 0, "requested_time": 0
    864                                     }
    865                         self.joblist.append(self.job)
    866                 elif name == "qstat_l_requests": # resource request
    867                         self.lrequest = True
    868                 elif name == "unknown_jobs":
    869                         raise NoJobs
    870                 self.eltq.append (name)
    871 
    872         def characters(self, ch):
    873                 self.value += ch
    874 
    875         def endElement(self, name):
    876                 """Snarf job elements contents into job dictionary.
    877                    Translate keys if appropriate."""
    878 
    879                 name_trans = {
    880                   "JB_job_number": "number",
    881                   "JB_job_name": "name", "JB_owner": "owner",
    882                   "queue_name": "queue", "JAT_start_time": "start_timestamp",
    883                   "JB_submission_time": "queued_timestamp"
    884                   }
    885                 value = self.value
    886                 self.eltq.pop ()
    887 
    888                 if name == "djob_info":
    889                         self.in_joblist = False
    890                         self.job = {}
    891                 elif name == "JAT_master_queue":
    892                         self.job["queue"] = value.split("@")[0]
    893                 elif name == "JG_qhostname":
    894                         if not (value in self.job["nodes"]):
    895                                 self.job["nodes"].append(value)
    896                 elif name == "JG_slots": # slots in use
    897                         self.job["slots"] += int(value)
    898                 elif name == "RN_max": # requested slots (tasks or parallel)
    899                         self.job["RN_max"] = max (self.job["RN_max"],
    900                                                   int(value))
    901                 elif name == "JAT_state": # job state (bitwise or)
    902                         value = int (value)
    903                         # Status values from sge_jobL.h
    904                         #define JIDLE                   0x00000000
    905                         #define JHELD                   0x00000010
    906                         #define JMIGRATING              0x00000020
    907                         #define JQUEUED                 0x00000040
    908                         #define JRUNNING                0x00000080
    909                         #define JSUSPENDED              0x00000100
    910                         #define JTRANSFERING            0x00000200
    911                         #define JDELETED                0x00000400
    912                         #define JWAITING                0x00000800
    913                         #define JEXITING                0x00001000
    914                         #define JWRITTEN                0x00002000
    915                         #define JSUSPENDED_ON_THRESHOLD 0x00010000
    916                         #define JFINISHED               0x00010000
    917                         if value & 0x80:
    918                                 self.job["status"] = "R"
    919                         elif value & 0x40:
    920                                 self.job["status"] = "Q"
    921                         else:
    922                                 self.job["status"] = "O" # `other'
    923                 elif name == "CE_name" and self.lrequest and self.value in \
    924                             ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
    925                         # We're in a container for an interesting resource
    926                         # request; record which type.
    927                         self.lrequest = self.value
    928                 elif name == "CE_doubleval" and self.lrequest:
    929                         # if we're in a container for an interesting
    930                         # resource request, use the maxmimum of the hard
    931                         # and soft requests to record the requested CPU
    932                         # or core.  Fixme:  I'm not sure if this logic is
    933                         # right.
    934                         if self.lrequest in ("h_core", "s_core"):
    935                                 self.job["requested_memory"] = \
    936                                     max (float (value),
    937                                         self.job["requested_memory"])
    938                         # Fixme:  Check what cpu means, c.f [hs]_cpu.
    939                         elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
    940                                 self.job["requested_time"] = \
    941                                     max (float (value),
    942                                         self.job["requested_time"])
    943                 elif name == "qstat_l_requests":
    944                         self.lrequest = False
    945                 elif self.job and self.in_joblist:
    946                         if name in name_trans:
    947                                 name = name_trans[name]
    948                                 self.job[name] = value
     798    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
     799
     800    def __init__(self):
     801        self.value = ""
     802        self.joblist = []
     803        self.job = {}
     804        self.queue = ""
     805        self.in_joblist = False
     806        self.lrequest = False
     807        self.eltq = deque()
     808        xml.sax.handler.ContentHandler.__init__(self)
     809
     810    # The structure of the output is as follows (for SGE 6.0).  It's
     811    # similar for 6.1, but radically different for 6.2, and is
     812    # undocumented generally.  Unfortunately it's voluminous, and probably
     813    # doesn't scale to large clusters/queues.
     814
     815    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
     816    #   <djob_info>
     817    #     <qmaster_response>  <!-- job -->
     818    #       ...
     819    #       <JB_ja_template> 
     820    #         <ulong_sublist>
     821    #         ...             <!-- start_time, state ... -->
     822    #         </ulong_sublist>
     823    #       </JB_ja_template> 
     824    #       <JB_ja_tasks>
     825    #         <ulong_sublist>
     826    #           ...           <!-- task info
     827    #         </ulong_sublist>
     828    #         ...
     829    #       </JB_ja_tasks>
     830    #       ...
     831    #     </qmaster_response>
     832    #   </djob_info>
     833    #   <messages>
     834    #   ...
     835
     836    # NB.  We might treat each task as a separate job, like
     837    # straight qstat output, but the web interface expects jobs to
     838    # be identified by integers, not, say, <job number>.<task>.
     839
     840    # So, I lied.  If the job list is empty, we get invalid XML
     841    # like this, which we need to defend against:
     842
     843    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
     844    #   <>
     845    #     <ST_name>*</ST_name>
     846    #   </>
     847    # </unknown_jobs>
     848
     849    def startElement(self, name, attrs):
     850        self.value = ""
     851        if name == "djob_info": # job list
     852            self.in_joblist = True
     853        # The job container is "qmaster_response" in SGE 6.0
     854        # and 6.1, but "element" in 6.2.  This is only the very
     855        # start of what's necessary for 6.2, though (sigh).
     856        elif (name == "qmaster_response" or name == "element") \
     857                and self.eltq[-1] == "djob_info": # job
     858            self.job = {"job_state": "U", "slots": 0,
     859                    "nodes": [], "queued_timestamp": "",
     860                    "queued_timestamp": "", "queue": "",
     861                    "ppn": "0", "RN_max": 0,
     862                    # fixme in endElement
     863                    "requested_memory": 0, "requested_time": 0
     864                    }
     865            self.joblist.append(self.job)
     866        elif name == "qstat_l_requests": # resource request
     867            self.lrequest = True
     868        elif name == "unknown_jobs":
     869            raise NoJobs
     870        self.eltq.append (name)
     871
     872    def characters(self, ch):
     873        self.value += ch
     874
     875    def endElement(self, name):
     876        """Snarf job elements contents into job dictionary.
     877           Translate keys if appropriate."""
     878
     879        name_trans = {
     880          "JB_job_number": "number",
     881          "JB_job_name": "name", "JB_owner": "owner",
     882          "queue_name": "queue", "JAT_start_time": "start_timestamp",
     883          "JB_submission_time": "queued_timestamp"
     884          }
     885        value = self.value
     886        self.eltq.pop ()
     887
     888        if name == "djob_info":
     889            self.in_joblist = False
     890            self.job = {}
     891        elif name == "JAT_master_queue":
     892            self.job["queue"] = value.split("@")[0]
     893        elif name == "JG_qhostname":
     894            if not (value in self.job["nodes"]):
     895                self.job["nodes"].append(value)
     896        elif name == "JG_slots": # slots in use
     897            self.job["slots"] += int(value)
     898        elif name == "RN_max": # requested slots (tasks or parallel)
     899            self.job["RN_max"] = max (self.job["RN_max"],
     900                          int(value))
     901        elif name == "JAT_state": # job state (bitwise or)
     902            value = int (value)
     903            # Status values from sge_jobL.h
     904            #define JIDLE                   0x00000000
     905            #define JHELD                   0x00000010
     906            #define JMIGRATING              0x00000020
     907            #define JQUEUED                 0x00000040
     908            #define JRUNNING                0x00000080
     909            #define JSUSPENDED              0x00000100
     910            #define JTRANSFERING            0x00000200
     911            #define JDELETED                0x00000400
     912            #define JWAITING                0x00000800
     913            #define JEXITING                0x00001000
     914            #define JWRITTEN                0x00002000
     915            #define JSUSPENDED_ON_THRESHOLD 0x00010000
     916            #define JFINISHED               0x00010000
     917            if value & 0x80:
     918                self.job["status"] = "R"
     919            elif value & 0x40:
     920                self.job["status"] = "Q"
     921            else:
     922                self.job["status"] = "O" # `other'
     923        elif name == "CE_name" and self.lrequest and self.value in \
     924                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
     925            # We're in a container for an interesting resource
     926            # request; record which type.
     927            self.lrequest = self.value
     928        elif name == "CE_doubleval" and self.lrequest:
     929            # if we're in a container for an interesting
     930            # resource request, use the maxmimum of the hard
     931            # and soft requests to record the requested CPU
     932            # or core.  Fixme:  I'm not sure if this logic is
     933            # right.
     934            if self.lrequest in ("h_core", "s_core"):
     935                self.job["requested_memory"] = \
     936                    max (float (value),
     937                    self.job["requested_memory"])
     938            # Fixme:  Check what cpu means, c.f [hs]_cpu.
     939            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
     940                self.job["requested_time"] = \
     941                    max (float (value),
     942                    self.job["requested_time"])
     943        elif name == "qstat_l_requests":
     944            self.lrequest = False
     945        elif self.job and self.in_joblist:
     946            if name in name_trans:
     947                name = name_trans[name]
     948                self.job[name] = value
    949949
    950950# Abstracted from PBS original.
     
    953953def do_nodelist( nodes ):
    954954
    955         """Translate node list as appropriate."""
    956 
    957         nodeslist               = [ ]
    958         my_domain               = fqdn_parts( socket.getfqdn() )[1]
    959 
    960         for node in nodes:
    961 
    962                 host            = node.split( '/' )[0] # not relevant for SGE
    963                 h, host_domain  = fqdn_parts(host)
    964 
    965                 if host_domain == my_domain:
    966 
    967                         host    = h
    968 
    969                 if nodeslist.count( host ) == 0:
    970 
    971                         for translate_pattern in BATCH_HOST_TRANSLATE:
    972 
    973                                 if translate_pattern.find( '/' ) != -1:
    974 
    975                                         translate_orig  = \
    976                                             translate_pattern.split( '/' )[1]
    977                                         translate_new   = \
    978                                             translate_pattern.split( '/' )[2]
    979                                         host = re.sub( translate_orig,
    980                                                        translate_new, host )
    981                         if not host in nodeslist:
    982                                 nodeslist.append( host )
    983         return nodeslist
     955    """Translate node list as appropriate."""
     956
     957    nodeslist       = [ ]
     958    my_domain       = fqdn_parts( socket.getfqdn() )[1]
     959
     960    for node in nodes:
     961
     962        host        = node.split( '/' )[0] # not relevant for SGE
     963        h, host_domain  = fqdn_parts(host)
     964
     965        if host_domain == my_domain:
     966
     967            host    = h
     968
     969        if nodeslist.count( host ) == 0:
     970
     971            for translate_pattern in BATCH_HOST_TRANSLATE:
     972
     973                if translate_pattern.find( '/' ) != -1:
     974
     975                    translate_orig  = \
     976                        translate_pattern.split( '/' )[1]
     977                    translate_new   = \
     978                        translate_pattern.split( '/' )[2]
     979                    host = re.sub( translate_orig,
     980                               translate_new, host )
     981            if not host in nodeslist:
     982                nodeslist.append( host )
     983    return nodeslist
    984984
    985985class SgeDataGatherer(DataGatherer):
    986986
    987         jobs = {}
    988 
    989         def __init__( self ):
    990                 self.jobs = {}
    991                 self.timeoffset = 0
    992                 self.dp = DataProcessor()
    993 
    994         def getJobData( self ):
    995                 """Gather all data on current jobs in SGE"""
    996 
    997                 import popen2
    998 
    999                 self.cur_time = 0
    1000                 queues = ""
    1001                 if QUEUE:       # only for specific queues
    1002                         # Fixme:  assumes queue names don't contain single
    1003                         # quote or comma.  Don't know what the SGE rules are.
    1004                         queues = " -q '" + string.join (QUEUE, ",") + "'"
    1005                 # Note the comment in SgeQstatXMLParser about scaling with
    1006                 # this method of getting data.  I haven't found better one.
    1007                 # Output with args `-xml -ext -f -r' is easier to parse
    1008                 # in some ways, harder in others, but it doesn't provide
    1009                 # the submission time (at least SGE 6.0).  The pipeline
    1010                 # into sed corrects bogus XML observed with a configuration
    1011                 # of SGE 6.0u8, which otherwise causes the parsing to hang.
    1012                 piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
     987    jobs = {}
     988
     989    def __init__( self ):
     990        self.jobs = {}
     991        self.timeoffset = 0
     992        self.dp = DataProcessor()
     993
     994    def getJobData( self ):
     995        """Gather all data on current jobs in SGE"""
     996
     997        import popen2
     998
     999        self.cur_time = 0
     1000        queues = ""
     1001        if QUEUE:   # only for specific queues
     1002            # Fixme:  assumes queue names don't contain single
     1003            # quote or comma.  Don't know what the SGE rules are.
     1004            queues = " -q '" + string.join (QUEUE, ",") + "'"
     1005        # Note the comment in SgeQstatXMLParser about scaling with
     1006        # this method of getting data.  I haven't found better one.
     1007        # Output with args `-xml -ext -f -r' is easier to parse
     1008        # in some ways, harder in others, but it doesn't provide
     1009        # the submission time (at least SGE 6.0).  The pipeline
     1010        # into sed corrects bogus XML observed with a configuration
     1011        # of SGE 6.0u8, which otherwise causes the parsing to hang.
     1012        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
    10131013sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
    1014                                                + queues, True)
    1015                 qstatparser = SgeQstatXMLParser()
    1016                 parse_err = 0
    1017                 try:
    1018                         xml.sax.parse(piping.fromchild, qstatparser)
    1019                 except NoJobs:
    1020                         pass
    1021                 except:
    1022                         parse_err = 1
    1023                 if piping.wait():
    1024                         debug_msg(10,
    1025                                   "qstat error, skipping until next polling interval: "
    1026                                   + piping.childerr.readline())
    1027                         return None
    1028                 elif parse_err:
    1029                         debug_msg(10, "Bad XML output from qstat"())
    1030                         exit (1)
    1031                 for f in piping.fromchild, piping.tochild, piping.childerr:
    1032                         f.close()
    1033                 self.cur_time = time.time()
    1034                 jobs_processed = []
    1035                 for job in qstatparser.joblist:
    1036                         job_id = job["number"]
    1037                         if job["status"] in [ 'Q', 'R' ]:
    1038                                 jobs_processed.append(job_id)
    1039                         if job["status"] == "R":
    1040                                 job["nodes"] = do_nodelist (job["nodes"])
    1041                                 # Fixme: why is job["nodes"] sometimes null?
    1042                                 try:
    1043                                         # Fixme: Is this sensible?  The
    1044                                         # PBS-type PPN isn't something you use
    1045                                         # with SGE.
    1046                                         job["ppn"] = float(job["slots"]) / \
    1047                                             len(job["nodes"])
    1048                                 except:
    1049                                         job["ppn"] = 0
    1050                                 if DETECT_TIME_DIFFS:
    1051                                         # If a job start is later than our
    1052                                         # current date, that must mean
    1053                                         # the SGE server's time is later
    1054                                         # than our local time.
    1055                                         start_timestamp = \
    1056                                             int (job["start_timestamp"])
    1057                                         if start_timestamp > \
    1058                                                     int(self.cur_time) + \
    1059                                                     int(self.timeoffset):
    1060 
    1061                                                 self.timeoffset = \
    1062                                                     start_timestamp - \
    1063                                                     int(self.cur_time)
    1064                         else:
    1065                                 # fixme: Note sure what this should be:
    1066                                 job["ppn"] = job["RN_max"]
    1067                                 job["nodes"] = "1"
    1068 
    1069                         myAttrs = {}
    1070                         for attr in ["name", "queue", "owner",
    1071                                      "requested_time", "status",
    1072                                      "requested_memory", "ppn",
    1073                                      "start_timestamp", "queued_timestamp"]:
    1074                                 myAttrs[attr] = str(job[attr])
    1075                         myAttrs["nodes"] = job["nodes"]
    1076                         myAttrs["reported"] = str(int(self.cur_time) + \
    1077                                                   int(self.timeoffset))
    1078                         myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
    1079                         myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
    1080 
    1081                         if self.jobDataChanged(self.jobs, job_id, myAttrs) \
    1082                                     and myAttrs["status"] in ["R", "Q"]:
    1083                                 self.jobs[job_id] = myAttrs
    1084                 for id, attrs in self.jobs.items():
    1085                         if id not in jobs_processed:
    1086                                 del self.jobs[id]
     1014                           + queues, True)
     1015        qstatparser = SgeQstatXMLParser()
     1016        parse_err = 0
     1017        try:
     1018            xml.sax.parse(piping.fromchild, qstatparser)
     1019        except NoJobs:
     1020            pass
     1021        except:
     1022            parse_err = 1
     1023            if piping.wait():
     1024            debug_msg(10,
     1025                  "qstat error, skipping until next polling interval: "
     1026                  + piping.childerr.readline())
     1027            return None
     1028        elif parse_err:
     1029            debug_msg(10, "Bad XML output from qstat"())
     1030            exit (1)
     1031        for f in piping.fromchild, piping.tochild, piping.childerr:
     1032            f.close()
     1033        self.cur_time = time.time()
     1034        jobs_processed = []
     1035        for job in qstatparser.joblist:
     1036            job_id = job["number"]
     1037            if job["status"] in [ 'Q', 'R' ]:
     1038                jobs_processed.append(job_id)
     1039            if job["status"] == "R":
     1040                job["nodes"] = do_nodelist (job["nodes"])
     1041                # Fixme: why is job["nodes"] sometimes null?
     1042                try:
     1043                    # Fixme: Is this sensible?  The
     1044                    # PBS-type PPN isn't something you use
     1045                    # with SGE.
     1046                    job["ppn"] = float(job["slots"]) / \
     1047                        len(job["nodes"])
     1048                except:
     1049                    job["ppn"] = 0
     1050                if DETECT_TIME_DIFFS:
     1051                    # If a job start is later than our
     1052                    # current date, that must mean
     1053                    # the SGE server's time is later
     1054                    # than our local time.
     1055                    start_timestamp = \
     1056                        int (job["start_timestamp"])
     1057                    if start_timestamp > \
     1058                            int(self.cur_time) + \
     1059                            int(self.timeoffset):
     1060
     1061                        self.timeoffset = \
     1062                            start_timestamp - \
     1063                            int(self.cur_time)
     1064            else:
     1065                # fixme: Note sure what this should be:
     1066                job["ppn"] = job["RN_max"]
     1067                job["nodes"] = "1"
     1068
     1069            myAttrs = {}
     1070            for attr in ["name", "queue", "owner",
     1071                     "requested_time", "status",
     1072                     "requested_memory", "ppn",
     1073                     "start_timestamp", "queued_timestamp"]:
     1074                myAttrs[attr] = str(job[attr])
     1075            myAttrs["nodes"] = job["nodes"]
     1076            myAttrs["reported"] = str(int(self.cur_time) + \
     1077                          int(self.timeoffset))
     1078            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
     1079            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
     1080
     1081            if self.jobDataChanged(self.jobs, job_id, myAttrs) \
     1082                    and myAttrs["status"] in ["R", "Q"]:
     1083                self.jobs[job_id] = myAttrs
     1084        for id, attrs in self.jobs.items():
     1085            if id not in jobs_processed:
     1086                del self.jobs[id]
    10871087
    10881088# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
     
    11041104        def _countDuplicatesInList( self, dupedList ):
    11051105
    1106                 countDupes      = { }
    1107 
    1108                 for item in dupedList:
    1109 
    1110                         if not countDupes.has_key( item ):
    1111 
    1112                                 countDupes[ item ]      = 1
    1113                         else:
    1114                                 countDupes[ item ]      = countDupes[ item ] + 1
    1115 
    1116                 dupeCountList   = [ ]
    1117 
    1118                 for item, count in countDupes.items():
    1119 
    1120                         dupeCountList.append( ( item, count ) )
     1106        countDupes  = { }
     1107
     1108        for item in dupedList:
     1109
     1110            if not countDupes.has_key( item ):
     1111
     1112                countDupes[ item ]  = 1
     1113            else:
     1114                countDupes[ item ]  = countDupes[ item ] + 1
     1115
     1116        dupeCountList   = [ ]
     1117
     1118        for item, count in countDupes.items():
     1119
     1120            dupeCountList.append( ( item, count ) )
    11211121
    11221122                return dupeCountList
     
    11801180                                requested_cpus = 1
    11811181
    1182                         if QUEUE:
    1183                                 for q in QUEUE:
    1184                                         if q == queue:
    1185                                                 display_queue = 1
    1186                                                 break
    1187                                         else:
    1188                                                 display_queue = 0
    1189                                                 continue
    1190                         if display_queue == 0:
    1191                                 continue
     1182            if QUEUE:
     1183                for q in QUEUE:
     1184                    if q == queue:
     1185                        display_queue = 1
     1186                        break
     1187                    else:
     1188                        display_queue = 0
     1189                        continue
     1190            if display_queue == 0:
     1191                continue
    11921192
    11931193                        runState = self.getAttr( attrs, 'status' )
     
    12251225                                myAttrs['name'] = name
    12261226
    1227                         myAttrs[ 'owner' ]              = owner
    1228                         myAttrs[ 'requested_time' ]     = str(requested_time)
    1229                         myAttrs[ 'requested_memory' ]   = str(requested_memory)
    1230                         myAttrs[ 'requested_cpus' ]     = str(requested_cpus)
    1231                         myAttrs[ 'ppn' ]                = str( ppn )
    1232                         myAttrs[ 'status' ]             = status
    1233                         myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
    1234                         myAttrs[ 'queue' ]              = str(queue)
    1235                         myAttrs[ 'queued_timestamp' ]   = str(queued_timestamp)
    1236                         myAttrs[ 'reported' ]           = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
    1237                         myAttrs[ 'nodes' ]              = do_nodelist( nodelist )
    1238                         myAttrs[ 'domain' ]             = fqdn_parts( socket.getfqdn() )[1]
    1239                         myAttrs[ 'poll_interval' ]      = str(BATCH_POLL_INTERVAL)
     1227                        myAttrs[ 'owner' ]      = owner
     1228                        myAttrs[ 'requested_time' ] = str(requested_time)
     1229                        myAttrs[ 'requested_memory' ]   = str(requested_memory)
     1230                        myAttrs[ 'requested_cpus' ] = str(requested_cpus)
     1231                        myAttrs[ 'ppn' ]        = str( ppn )
     1232                        myAttrs[ 'status' ]     = status
     1233                        myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
     1234                        myAttrs[ 'queue' ]      = str(queue)
     1235                        myAttrs[ 'queued_timestamp' ]   = str(queued_timestamp)
     1236                        myAttrs[ 'reported' ]       = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
     1237                        myAttrs[ 'nodes' ]      = do_nodelist( nodelist )
     1238            myAttrs[ 'domain' ]     = fqdn_parts( socket.getfqdn() )[1]
     1239                        myAttrs[ 'poll_interval' ]  = str(BATCH_POLL_INTERVAL)
    12401240
    12411241                        if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
     
    12541254class PbsDataGatherer( DataGatherer ):
    12551255
    1256         """This is the DataGatherer for PBS and Torque"""
    1257 
    1258         global PBSQuery, PBSError
    1259 
    1260         def __init__( self ):
    1261 
    1262                 """Setup appropriate variables"""
    1263 
    1264                 self.jobs       = { }
    1265                 self.timeoffset = 0
    1266                 self.dp         = DataProcessor()
    1267 
    1268                 self.initPbsQuery()
    1269 
    1270         def initPbsQuery( self ):
    1271 
    1272                 self.pq         = None
    1273 
    1274                 if( BATCH_SERVER ):
    1275 
    1276                         self.pq         = PBSQuery( BATCH_SERVER )
    1277                 else:
    1278                         self.pq         = PBSQuery()
    1279 
    1280         def getJobData( self ):
    1281 
    1282                 """Gather all data on current jobs in Torque"""
    1283 
    1284                 joblist         = {}
    1285                 self.cur_time   = 0
    1286 
    1287                 try:
    1288                         joblist         = self.pq.getjobs()
    1289                         self.cur_time   = time.time()
    1290 
    1291                 except PBSError, detail:
    1292 
    1293                         debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
    1294                         return None
    1295 
    1296                 jobs_processed  = [ ]
    1297 
    1298                 for name, attrs in joblist.items():
    1299                         display_queue           = 1
    1300                         job_id                  = name.split( '.' )[0]
    1301 
    1302                         name                    = self.getAttr( attrs, 'Job_Name' )
    1303                         queue                   = self.getAttr( attrs, 'queue' )
    1304 
    1305                         if QUEUE:
    1306                                 for q in QUEUE:
    1307                                         if q == queue:
    1308                                                 display_queue = 1
    1309                                                 break
    1310                                         else:
    1311                                                 display_queue = 0
    1312                                                 continue
    1313                         if display_queue == 0:
    1314                                 continue
    1315 
    1316 
    1317                         owner                   = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
    1318                         requested_time          = self.getAttr( attrs, 'Resource_List.walltime' )
    1319                         requested_memory        = self.getAttr( attrs, 'Resource_List.mem' )
    1320 
    1321                         mynoderequest           = self.getAttr( attrs, 'Resource_List.nodes' )
    1322 
    1323                         ppn                     = ''
    1324 
    1325                         if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
    1326 
    1327                                 mynoderequest_fields    = mynoderequest.split( ':' )
    1328 
    1329                                 for mynoderequest_field in mynoderequest_fields:
    1330 
    1331                                         if mynoderequest_field.find( 'ppn' ) != -1:
    1332 
    1333                                                 ppn     = mynoderequest_field.split( 'ppn=' )[1]
    1334 
    1335                         status                  = self.getAttr( attrs, 'job_state' )
    1336 
    1337                         if status in [ 'Q', 'R' ]:
    1338 
    1339                                 jobs_processed.append( job_id )
    1340 
    1341                         queued_timestamp        = self.getAttr( attrs, 'ctime' )
    1342 
    1343                         if status == 'R':
    1344 
    1345                                 start_timestamp         = self.getAttr( attrs, 'mtime' )
    1346                                 nodes                   = self.getAttr( attrs, 'exec_host' ).split( '+' )
    1347 
    1348                                 nodeslist               = do_nodelist( nodes )
    1349 
    1350                                 if DETECT_TIME_DIFFS:
    1351 
    1352                                         # If a job start if later than our current date,
    1353                                         # that must mean the Torque server's time is later
    1354                                         # than our local time.
    1355                                
    1356                                         if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
    1357 
    1358                                                 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
    1359 
    1360                         elif status == 'Q':
    1361 
    1362                                 # 'mynodequest' can be a string in the following syntax according to the
    1363                                 # Torque Administator's manual:
    1364                                 #
    1365                                 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
    1366                                 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
    1367                                 # etc
    1368                                 #
    1369 
    1370                                 #
    1371                                 # For now we only count the amount of nodes request and ignore properties
    1372                                 #
    1373 
    1374                                 start_timestamp         = ''
    1375                                 count_mynodes           = 0
    1376 
    1377                                 for node in mynoderequest.split( '+' ):
    1378 
    1379                                         # Just grab the {node_count|hostname} part and ignore properties
    1380                                         #
    1381                                         nodepart        = node.split( ':' )[0]
    1382 
    1383                                         # Let's assume a node_count value
    1384                                         #
    1385                                         numeric_node    = 1
    1386 
    1387                                         # Chop the value up into characters
    1388                                         #
    1389                                         for letter in nodepart:
    1390 
    1391                                                 # If this char is not a digit (0-9), this must be a hostname
    1392                                                 #
    1393                                                 if letter not in string.digits:
    1394 
    1395                                                         numeric_node    = 0
    1396 
    1397                                         # If this is a hostname, just count this as one (1) node
    1398                                         #
    1399                                         if not numeric_node:
    1400 
    1401                                                 count_mynodes   = count_mynodes + 1
    1402                                         else:
    1403 
    1404                                                 # If this a number, it must be the node_count
    1405                                                 # and increase our count with it's value
    1406                                                 #
    1407                                                 try:
    1408                                                         count_mynodes   = count_mynodes + int( nodepart )
    1409 
    1410                                                 except ValueError, detail:
    1411 
    1412                                                         # When we arrive here I must be bugged or very confused
    1413                                                         # THIS SHOULD NOT HAPPEN!
    1414                                                         #
    1415                                                         debug_msg( 10, str( detail ) )
    1416                                                         debug_msg( 10, "Encountered weird node in Resources_List?!" )
    1417                                                         debug_msg( 10, 'nodepart = ' + str( nodepart ) )
    1418                                                         debug_msg( 10, 'job = ' + str( name ) )
    1419                                                         debug_msg( 10, 'attrs = ' + str( attrs ) )
    1420                                                
    1421                                 nodeslist       = str( count_mynodes )
    1422                         else:
    1423                                 start_timestamp = ''
    1424                                 nodeslist       = ''
    1425 
    1426                         myAttrs                         = { }
    1427 
    1428                         myAttrs[ 'name' ]               = str( name )
    1429                         myAttrs[ 'queue' ]              = str( queue )
    1430                         myAttrs[ 'owner' ]              = str( owner )
    1431                         myAttrs[ 'requested_time' ]     = str( requested_time )
    1432                         myAttrs[ 'requested_memory' ]   = str( requested_memory )
    1433                         myAttrs[ 'ppn' ]                = str( ppn )
    1434                         myAttrs[ 'status' ]             = str( status )
    1435                         myAttrs[ 'start_timestamp' ]    = str( start_timestamp )
    1436                         myAttrs[ 'queued_timestamp' ]   = str( queued_timestamp )
    1437                         myAttrs[ 'reported' ]           = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
    1438                         myAttrs[ 'nodes' ]              = nodeslist
    1439                         myAttrs[ 'domain' ]             = fqdn_parts( socket.getfqdn() )[1]
    1440                         myAttrs[ 'poll_interval' ]      = str( BATCH_POLL_INTERVAL )
    1441 
    1442                         if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
    1443 
    1444                                 self.jobs[ job_id ]     = myAttrs
    1445 
    1446                 for id, attrs in self.jobs.items():
    1447 
    1448                         if id not in jobs_processed:
    1449 
    1450                                 # This one isn't there anymore; toedeledoki!
    1451                                 #
    1452                                 del self.jobs[ id ]
     1256    """This is the DataGatherer for PBS and Torque"""
     1257
     1258    global PBSQuery, PBSError
     1259
     1260    def __init__( self ):
     1261
     1262        """Setup appropriate variables"""
     1263
     1264        self.jobs   = { }
     1265        self.timeoffset = 0
     1266        self.dp     = DataProcessor()
     1267
     1268        self.initPbsQuery()
     1269
     1270    def initPbsQuery( self ):
     1271
     1272        self.pq     = None
     1273
     1274        if( BATCH_SERVER ):
     1275
     1276            self.pq     = PBSQuery( BATCH_SERVER )
     1277        else:
     1278            self.pq     = PBSQuery()
     1279
     1280    def getJobData( self ):
     1281
     1282        """Gather all data on current jobs in Torque"""
     1283
     1284        joblist     = {}
     1285        self.cur_time   = 0
     1286
     1287        try:
     1288            joblist     = self.pq.getjobs()
     1289            self.cur_time   = time.time()
     1290
     1291        except PBSError, detail:
     1292
     1293            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
     1294            return None
     1295
     1296        jobs_processed  = [ ]
     1297
     1298        for name, attrs in joblist.items():
     1299            display_queue       = 1
     1300            job_id          = name.split( '.' )[0]
     1301
     1302            name            = self.getAttr( attrs, 'Job_Name' )
     1303            queue           = self.getAttr( attrs, 'queue' )
     1304
     1305            if QUEUE:
     1306                for q in QUEUE:
     1307                    if q == queue:
     1308                        display_queue = 1
     1309                        break
     1310                    else:
     1311                        display_queue = 0
     1312                        continue
     1313            if display_queue == 0:
     1314                continue
     1315
     1316
     1317            owner           = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
     1318            requested_time      = self.getAttr( attrs, 'Resource_List.walltime' )
     1319            requested_memory    = self.getAttr( attrs, 'Resource_List.mem' )
     1320
     1321            mynoderequest       = self.getAttr( attrs, 'Resource_List.nodes' )
     1322
     1323            ppn         = ''
     1324
     1325            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
     1326
     1327                mynoderequest_fields    = mynoderequest.split( ':' )
     1328
     1329                for mynoderequest_field in mynoderequest_fields:
     1330
     1331                    if mynoderequest_field.find( 'ppn' ) != -1:
     1332
     1333                        ppn = mynoderequest_field.split( 'ppn=' )[1]
     1334
     1335            status          = self.getAttr( attrs, 'job_state' )
     1336
     1337            if status in [ 'Q', 'R' ]:
     1338
     1339                jobs_processed.append( job_id )
     1340
     1341            queued_timestamp    = self.getAttr( attrs, 'ctime' )
     1342
     1343            if status == 'R':
     1344
     1345                start_timestamp     = self.getAttr( attrs, 'mtime' )
     1346                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
     1347
     1348                nodeslist       = do_nodelist( nodes )
     1349
     1350                if DETECT_TIME_DIFFS:
     1351
     1352                    # If a job start if later than our current date,
     1353                    # that must mean the Torque server's time is later
     1354                    # than our local time.
     1355               
     1356                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
     1357
     1358                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
     1359
     1360            elif status == 'Q':
     1361
     1362                # 'mynodequest' can be a string in the following syntax according to the
     1363                # Torque Administator's manual:
     1364                #
     1365                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
     1366                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
     1367                # etc
     1368                #
     1369
     1370                #
     1371                # For now we only count the amount of nodes request and ignore properties
     1372                #
     1373
     1374                start_timestamp     = ''
     1375                count_mynodes       = 0
     1376
     1377                for node in mynoderequest.split( '+' ):
     1378
     1379                    # Just grab the {node_count|hostname} part and ignore properties
     1380                    #
     1381                    nodepart    = node.split( ':' )[0]
     1382
     1383                    # Let's assume a node_count value
     1384                    #
     1385                    numeric_node    = 1
     1386
     1387                    # Chop the value up into characters
     1388                    #
     1389                    for letter in nodepart:
     1390
     1391                        # If this char is not a digit (0-9), this must be a hostname
     1392                        #
     1393                        if letter not in string.digits:
     1394
     1395                            numeric_node    = 0
     1396
     1397                    # If this is a hostname, just count this as one (1) node
     1398                    #
     1399                    if not numeric_node:
     1400
     1401                        count_mynodes   = count_mynodes + 1
     1402                    else:
     1403
     1404                        # If this a number, it must be the node_count
     1405                        # and increase our count with it's value
     1406                        #
     1407                        try:
     1408                            count_mynodes   = count_mynodes + int( nodepart )
     1409
     1410                        except ValueError, detail:
     1411
     1412                            # When we arrive here I must be bugged or very confused
     1413                            # THIS SHOULD NOT HAPPEN!
     1414                            #
     1415                            debug_msg( 10, str( detail ) )
     1416                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
     1417                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
     1418                            debug_msg( 10, 'job = ' + str( name ) )
     1419                            debug_msg( 10, 'attrs = ' + str( attrs ) )
     1420                       
     1421                nodeslist   = str( count_mynodes )
     1422            else:
     1423                start_timestamp = ''
     1424                nodeslist   = ''
     1425
     1426            myAttrs             = { }
     1427
     1428            myAttrs[ 'name' ]       = str( name )
     1429            myAttrs[ 'queue' ]      = str( queue )
     1430            myAttrs[ 'owner' ]      = str( owner )
     1431            myAttrs[ 'requested_time' ] = str( requested_time )
     1432            myAttrs[ 'requested_memory' ]   = str( requested_memory )
     1433            myAttrs[ 'ppn' ]        = str( ppn )
     1434            myAttrs[ 'status' ]     = str( status )
     1435            myAttrs[ 'start_timestamp' ]    = str( start_timestamp )
     1436            myAttrs[ 'queued_timestamp' ]   = str( queued_timestamp )
     1437            myAttrs[ 'reported' ]       = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
     1438            myAttrs[ 'nodes' ]      = nodeslist
     1439            myAttrs[ 'domain' ]     = fqdn_parts( socket.getfqdn() )[1]
     1440            myAttrs[ 'poll_interval' ]  = str( BATCH_POLL_INTERVAL )
     1441
     1442            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
     1443
     1444                self.jobs[ job_id ] = myAttrs
     1445
     1446        for id, attrs in self.jobs.items():
     1447
     1448            if id not in jobs_processed:
     1449
     1450                # This one isn't there anymore; toedeledoki!
     1451                #
     1452                del self.jobs[ id ]
    14531453
    14541454#
     
    14691469GMETRIC_DEFAULT_HOST    = '127.0.0.1'
    14701470GMETRIC_DEFAULT_PORT    = '8649'
    1471 GMETRIC_DEFAULT_UNITS   = ''
     1471GMETRIC_DEFAULT_UNITS   = ''
    14721472
    14731473class Gmetric:
    14741474
    1475         global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
    1476 
    1477         slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
    1478         type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
    1479         protocol        = ( 'udp', 'multicast' )
    1480 
    1481         def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
     1475    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
     1476
     1477    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
     1478    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
     1479    protocol        = ( 'udp', 'multicast' )
     1480
     1481    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
    14821482               
    1483                 global GMETRIC_DEFAULT_TYPE
    1484 
    1485                 self.prot       = self.checkHostProtocol( host )
    1486                 self.msg        = xdrlib.Packer()
    1487                 self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
    1488 
    1489                 if self.prot not in self.protocol:
    1490 
    1491                         raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
    1492 
    1493                 if self.prot == 'multicast':
    1494 
    1495                         # Set multicast options
    1496                         #
    1497                         self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
    1498 
    1499                 self.hostport   = ( host, int( port ) )
    1500                 self.slopestr   = 'both'
    1501                 self.tmax       = 60
    1502 
    1503         def checkHostProtocol( self, ip ):
    1504 
    1505                 """Detect if a ip adress is a multicast address"""
    1506 
    1507                 MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
    1508                 MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
    1509 
    1510                 ip_fields               = ip.split( '.' )
    1511 
    1512                 if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
    1513 
    1514                         return 'multicast'
    1515                 else:
    1516                         return 'udp'
    1517 
    1518         def send( self, name, value, dmax, typestr = '', units = '' ):
    1519 
    1520                 if len( units ) == 0:
    1521                         units           = GMETRIC_DEFAULT_UNITS
    1522 
    1523                 if len( typestr ) == 0:
    1524                         typestr         = GMETRIC_DEFAULT_TYPE
    1525 
    1526                 msg             = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
    1527 
    1528                 return self.socket.sendto( msg, self.hostport )
    1529 
    1530         def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax ):
    1531 
    1532                 if slopestr not in self.slope:
    1533 
    1534                         raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
    1535 
    1536                 if typestr not in self.type:
    1537 
    1538                         raise ValueError( "Type must be one of: " + str( self.type ) )
    1539 
    1540                 if len( name ) == 0:
    1541 
    1542                         raise ValueError( "Name must be non-empty" )
    1543 
    1544                 self.msg.reset()
    1545                 self.msg.pack_int( 0 )
    1546                 self.msg.pack_string( typestr )
    1547                 self.msg.pack_string( name )
    1548                 self.msg.pack_string( str( value ) )
    1549                 self.msg.pack_string( unitstr )
    1550                 self.msg.pack_int( self.slope[ slopestr ] )
    1551                 self.msg.pack_uint( int( tmax ) )
    1552                 self.msg.pack_uint( int( dmax ) )
    1553 
    1554                 return self.msg.get_buffer()
     1483        global GMETRIC_DEFAULT_TYPE
     1484
     1485        self.prot       = self.checkHostProtocol( host )
     1486        self.msg        = xdrlib.Packer()
     1487        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
     1488
     1489        if self.prot not in self.protocol:
     1490
     1491            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
     1492
     1493        if self.prot == 'multicast':
     1494
     1495            # Set multicast options
     1496            #
     1497            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
     1498
     1499        self.hostport   = ( host, int( port ) )
     1500        self.slopestr   = 'both'
     1501        self.tmax       = 60
     1502
     1503    def checkHostProtocol( self, ip ):
     1504
     1505        """Detect if a ip adress is a multicast address"""
     1506
     1507        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
     1508        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
     1509
     1510        ip_fields               = ip.split( '.' )
     1511
     1512        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
     1513
     1514            return 'multicast'
     1515        else:
     1516            return 'udp'
     1517
     1518    def send( self, name, value, dmax, typestr = '', units = '' ):
     1519
     1520        if len( units ) == 0:
     1521            units       = GMETRIC_DEFAULT_UNITS
     1522
     1523        if len( typestr ) == 0:
     1524            typestr     = GMETRIC_DEFAULT_TYPE
     1525
     1526        msg             = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
     1527
     1528        return self.socket.sendto( msg, self.hostport )
     1529
     1530    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax ):
     1531
     1532        if slopestr not in self.slope:
     1533
     1534            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
     1535
     1536        if typestr not in self.type:
     1537
     1538            raise ValueError( "Type must be one of: " + str( self.type ) )
     1539
     1540        if len( name ) == 0:
     1541
     1542            raise ValueError( "Name must be non-empty" )
     1543
     1544        self.msg.reset()
     1545        self.msg.pack_int( 0 )
     1546        self.msg.pack_string( typestr )
     1547        self.msg.pack_string( name )
     1548        self.msg.pack_string( str( value ) )
     1549        self.msg.pack_string( unitstr )
     1550        self.msg.pack_int( self.slope[ slopestr ] )
     1551        self.msg.pack_uint( int( tmax ) )
     1552        self.msg.pack_uint( int( dmax ) )
     1553
     1554        return self.msg.get_buffer()
    15551555
    15561556def printTime( ):
    15571557
    1558         """Print current time/date in human readable format for log/debug"""
    1559 
    1560         return time.strftime("%a, %d %b %Y %H:%M:%S")
     1558    """Print current time/date in human readable format for log/debug"""
     1559
     1560    return time.strftime("%a, %d %b %Y %H:%M:%S")
    15611561
    15621562def debug_msg( level, msg ):
    15631563
    1564         """Print msg if at or above current debug level"""
    1565 
    1566         global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
     1564    """Print msg if at or above current debug level"""
     1565
     1566    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
    15671567
    15681568        if (not DAEMONIZE and DEBUG_LEVEL >= level):
    1569                 sys.stderr.write( msg + '\n' )
    1570 
    1571         if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
    1572                 syslog.syslog( msg )
     1569        sys.stderr.write( msg + '\n' )
     1570
     1571    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
     1572        syslog.syslog( msg )
    15731573
    15741574def write_pidfile():
    15751575
    1576         # Write pidfile if PIDFILE is set
    1577         #
    1578         if PIDFILE:
    1579 
    1580                 pid     = os.getpid()
    1581 
    1582                 pidfile = open( PIDFILE, 'w' )
    1583 
    1584                 pidfile.write( str( pid ) )
    1585                 pidfile.close()
     1576    # Write pidfile if PIDFILE is set
     1577    #
     1578    if PIDFILE:
     1579
     1580        pid = os.getpid()
     1581
     1582        pidfile = open( PIDFILE, 'w' )
     1583
     1584        pidfile.write( str( pid ) )
     1585        pidfile.close()
    15861586
    15871587def main():
    15881588
    1589         """Application start"""
    1590 
    1591         global PBSQuery, PBSError, lsfObject
    1592         global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
    1593 
    1594         if not processArgs( sys.argv[1:] ):
    1595 
    1596                 sys.exit( 1 )
    1597 
    1598         # Load appropriate DataGatherer depending on which BATCH_API is set
    1599         # and any required modules for the Gatherer
    1600         #
    1601         if BATCH_API == 'pbs':
    1602 
    1603                 try:
    1604                         from PBSQuery import PBSQuery, PBSError
    1605 
    1606                 except ImportError:
    1607 
    1608                         debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
    1609                         sys.exit( 1 )
    1610 
    1611                 gather = PbsDataGatherer()
    1612 
    1613         elif BATCH_API == 'sge':
    1614 
    1615                 # Tested with SGE 6.0u11.
    1616                 #
    1617                 gather = SgeDataGatherer()
    1618 
    1619         elif BATCH_API == 'lsf':
    1620 
    1621                 try:
    1622                         from lsfObject import lsfObject
    1623                 except:
    1624                         debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
    1625                         sys.exit( 1)
    1626 
    1627                 gather = LsfDataGatherer()
    1628 
    1629         else:
    1630                 debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
    1631 
    1632                 sys.exit( 1 )
    1633 
    1634         if( DAEMONIZE and USE_SYSLOG ):
    1635 
    1636                 syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
    1637 
    1638         if DAEMONIZE:
    1639 
    1640                 gather.daemon()
    1641         else:
    1642                 gather.run()
     1589    """Application start"""
     1590
     1591    global PBSQuery, PBSError, lsfObject
     1592    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
     1593
     1594    if not processArgs( sys.argv[1:] ):
     1595
     1596        sys.exit( 1 )
     1597
     1598    # Load appropriate DataGatherer depending on which BATCH_API is set
     1599    # and any required modules for the Gatherer
     1600    #
     1601    if BATCH_API == 'pbs':
     1602
     1603        try:
     1604            from PBSQuery import PBSQuery, PBSError
     1605
     1606        except ImportError:
     1607
     1608            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
     1609            sys.exit( 1 )
     1610
     1611        gather = PbsDataGatherer()
     1612
     1613    elif BATCH_API == 'sge':
     1614
     1615        # Tested with SGE 6.0u11.
     1616        #
     1617        gather = SgeDataGatherer()
     1618
     1619    elif BATCH_API == 'lsf':
     1620
     1621        try:
     1622            from lsfObject import lsfObject
     1623        except:
     1624            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
     1625            sys.exit( 1)
     1626
     1627        gather = LsfDataGatherer()
     1628
     1629    else:
     1630        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
     1631
     1632        sys.exit( 1 )
     1633
     1634    if( DAEMONIZE and USE_SYSLOG ):
     1635
     1636        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
     1637
     1638    if DAEMONIZE:
     1639
     1640        gather.daemon()
     1641    else:
     1642        gather.run()
    16431643
    16441644# wh00t? someone started me! :)
    16451645#
    16461646if __name__ == '__main__':
    1647         main()
     1647    main()
Note: See TracChangeset for help on using the changeset viewer.