Changeset 691


Ignore:
Timestamp:
03/20/13 11:04:07 (11 years ago)
Author:
ramonb
Message:
  • cleanup
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/0.4/jobmond/jobmond.py

    r658 r691  
    33# This file is part of Jobmonarch
    44#
    5 # Copyright (C) 2006-2007  Ramon Bastiaans
     5# Copyright (C) 2006-2013  Ramon Bastiaans
    66# Copyright (C) 2007, 2009  Dave Love  (SGE code)
    77#
     
    2828from collections import deque
    2929
    30 VERSION='0.3.1+SVN'
     30VERSION='0.4+SVN'
    3131
    3232def usage( ver ):
    3333
    34         print 'jobmond %s' %VERSION
    35 
    36         if ver:
    37                 return 0
    38 
    39         print
    40         print 'Purpose:'
    41         print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
    42         print '  to Ganglia, which can be viewed with Job Monarch web frontend'
    43         print
    44         print 'Usage:   jobmond [OPTIONS]'
    45         print
    46         print '  -c, --config=FILE      The configuration file to use (default: /etc/jobmond.conf)'
    47         print '  -p, --pidfile=FILE     Use pid file to store the process id'
    48         print '  -h, --help             Print help and exit'
    49         print '  -v, --version          Print version and exit'
    50         print
     34    print 'jobmond %s' %VERSION
     35
     36    if ver:
     37        return 0
     38
     39    print
     40    print 'Purpose:'
     41    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
     42    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
     43    print
     44    print 'Usage:    jobmond [OPTIONS]'
     45    print
     46    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
     47    print '  -p, --pidfile=FILE    Use pid file to store the process id'
     48    print '  -h, --help        Print help and exit'
     49    print '  -v, --version      Print version and exit'
     50    print
    5151
    5252def processArgs( args ):
    5353
    54         SHORT_L         = 'p:hvc:'
    55         LONG_L          = [ 'help', 'config=', 'pidfile=', 'version' ]
    56 
    57         global PIDFILE
    58         PIDFILE         = None
    59 
    60         config_filename = '/etc/jobmond.conf'
    61 
    62         try:
    63 
    64                 opts, args      = getopt.getopt( args, SHORT_L, LONG_L )
    65 
    66         except getopt.GetoptError, detail:
    67 
    68                 print detail
    69                 usage()
    70                 sys.exit( 1 )
    71 
    72         for opt, value in opts:
    73 
    74                 if opt in [ '--config', '-c' ]:
    75                
    76                         config_filename = value
    77 
    78                 if opt in [ '--pidfile', '-p' ]:
    79 
    80                         PIDFILE         = value
    81                
    82                 if opt in [ '--help', '-h' ]:
     54    SHORT_L        = 'p:hvc:'
     55    LONG_L        = [ 'help', 'config=', 'pidfile=', 'version' ]
     56
     57    global PIDFILE
     58    PIDFILE        = None
     59
     60    config_filename    = '/etc/jobmond.conf'
     61
     62    try:
     63
     64        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
     65
     66    except getopt.GetoptError, detail:
     67
     68        print detail
     69        usage()
     70        sys.exit( 1 )
     71
     72    for opt, value in opts:
     73
     74        if opt in [ '--config', '-c' ]:
     75       
     76            config_filename    = value
     77
     78        if opt in [ '--pidfile', '-p' ]:
     79
     80            PIDFILE        = value
     81       
     82        if opt in [ '--help', '-h' ]:
    8383 
    84                         usage( False )
    85                         sys.exit( 0 )
    86 
    87                 if opt in [ '--version', '-v' ]:
    88 
    89                         usage( True )
    90                         sys.exit( 0 )
    91 
    92         return loadConfig( config_filename )
     84            usage( False )
     85            sys.exit( 0 )
     86
     87        if opt in [ '--version', '-v' ]:
     88
     89            usage( True )
     90            sys.exit( 0 )
     91
     92    return loadConfig( config_filename )
    9393
    9494# Fixme:  This doesn't DTRT with commented-out bits of the file.  E.g.
     
    9797class GangliaConfigParser:
    9898
    99         def __init__( self, config_file ):
    100 
    101                 self.config_file        = config_file
    102 
    103                 if not os.path.exists( self.config_file ):
    104 
    105                         debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
    106                         sys.exit( 1 )
    107 
    108         def removeQuotes( self, value ):
    109 
    110                 clean_value     = value
    111                 clean_value     = clean_value.replace( "'", "" )
    112                 clean_value     = clean_value.replace( '"', '' )
    113                 clean_value     = clean_value.strip()
    114 
    115                 return clean_value
    116 
    117         def getVal( self, section, valname ):
    118 
    119                 cfg_fp          = open( self.config_file )
    120                 section_start   = False
    121                 section_found   = False
    122                 value           = None
    123 
    124                 for line in cfg_fp.readlines():
    125 
    126                         if line.find( section ) != -1:
    127 
    128                                 section_found   = True
    129 
    130                         if line.find( '{' ) != -1 and section_found:
    131 
    132                                 section_start   = True
    133 
    134                         if line.find( '}' ) != -1 and section_found:
    135 
    136                                 section_start   = False
    137                                 section_found   = False
    138 
    139                         if line.find( valname ) != -1 and section_start:
    140 
    141                                 value           = string.join( line.split( '=' )[1:], '' ).strip()
    142 
    143                 cfg_fp.close()
    144 
    145                 return value
    146 
    147         def getInt( self, section, valname ):
    148 
    149                 value   = self.getVal( section, valname )
    150 
    151                 if not value:
    152                         return False
    153 
    154                 value   = self.removeQuotes( value )
    155 
    156                 return int( value )
    157 
    158         def getStr( self, section, valname ):
    159 
    160                 value   = self.getVal( section, valname )
    161 
    162                 if not value:
    163                         return False
    164 
    165                 value   = self.removeQuotes( value )
    166 
    167                 return str( value )
     99    def __init__( self, config_file ):
     100
     101        self.config_file    = config_file
     102
     103        if not os.path.exists( self.config_file ):
     104
     105            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
     106            sys.exit( 1 )
     107
     108    def removeQuotes( self, value ):
     109
     110        clean_value    = value
     111        clean_value    = clean_value.replace( "'", "" )
     112        clean_value    = clean_value.replace( '"', '' )
     113        clean_value    = clean_value.strip()
     114
     115        return clean_value
     116
     117    def getVal( self, section, valname ):
     118
     119        cfg_fp        = open( self.config_file )
     120        section_start    = False
     121        section_found    = False
     122        value        = None
     123
     124        for line in cfg_fp.readlines():
     125
     126            if line.find( section ) != -1:
     127
     128                section_found    = True
     129
     130            if line.find( '{' ) != -1 and section_found:
     131
     132                section_start    = True
     133
     134            if line.find( '}' ) != -1 and section_found:
     135
     136                section_start    = False
     137                section_found    = False
     138
     139            if line.find( valname ) != -1 and section_start:
     140
     141                value         = string.join( line.split( '=' )[1:], '' ).strip()
     142
     143        cfg_fp.close()
     144
     145        return value
     146
     147    def getInt( self, section, valname ):
     148
     149        value    = self.getVal( section, valname )
     150
     151        if not value:
     152            return False
     153
     154        value    = self.removeQuotes( value )
     155
     156        return int( value )
     157
     158    def getStr( self, section, valname ):
     159
     160        value    = self.getVal( section, valname )
     161
     162        if not value:
     163            return False
     164
     165        value    = self.removeQuotes( value )
     166
     167        return str( value )
    168168
    169169def findGmetric():
    170170
    171         for dir in os.path.expandvars( '$PATH' ).split( ':' ):
    172 
    173                 guess   = '%s/%s' %( dir, 'gmetric' )
    174 
    175                 if os.path.exists( guess ):
    176 
    177                         return guess
    178 
    179         return False
     171    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
     172
     173        guess    = '%s/%s' %( dir, 'gmetric' )
     174
     175        if os.path.exists( guess ):
     176
     177            return guess
     178
     179    return False
    180180
    181181def loadConfig( filename ):
    182182
    183         def getlist( cfg_string ):
    184 
    185                 my_list = [ ]
    186 
    187                 for item_txt in cfg_string.split( ',' ):
    188 
    189                         sep_char = None
    190 
    191                         item_txt = item_txt.strip()
    192 
    193                         for s_char in [ "'", '"' ]:
    194 
    195                                 if item_txt.find( s_char ) != -1:
    196 
    197                                         if item_txt.count( s_char ) != 2:
    198 
    199                                                 print 'Missing quote: %s' %item_txt
    200                                                 sys.exit( 1 )
    201 
    202                                         else:
    203 
    204                                                 sep_char = s_char
    205                                                 break
    206 
    207                         if sep_char:
    208 
    209                                 item_txt = item_txt.split( sep_char )[1]
    210 
    211                         my_list.append( item_txt )
    212 
    213                 return my_list
    214 
    215         cfg             = ConfigParser.ConfigParser()
    216 
    217         cfg.read( filename )
    218 
    219         global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
    220         global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
    221         global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
    222         global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
    223 
    224         DEBUG_LEVEL     = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
    225 
    226         DAEMONIZE       = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
    227 
    228         SYSLOG_LEVEL    = -1
    229         SYSLOG_FACILITY = None
    230 
    231         try:
    232                 USE_SYSLOG      = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
    233 
    234         except ConfigParser.NoOptionError:
    235 
    236                 USE_SYSLOG      = True
    237 
    238                 debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
    239 
    240         if USE_SYSLOG:
    241 
    242                 try:
    243                         SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
    244 
    245                 except ConfigParser.NoOptionError:
    246 
    247                         debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
    248                         SYSLOG_LEVEL    = 0
    249 
    250                 try:
    251 
    252                         SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
    253 
    254                 except ConfigParser.NoOptionError:
    255 
    256                         SYSLOG_FACILITY = syslog.LOG_DAEMON
    257 
    258                         debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
    259 
    260         try:
    261 
    262                 BATCH_SERVER            = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
    263 
    264         except ConfigParser.NoOptionError:
    265 
    266                 # Backwards compatibility for old configs
    267                 #
    268 
    269                 BATCH_SERVER            = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
    270                 api_guess               = 'pbs'
    271        
    272         try:
    273        
    274                 BATCH_POLL_INTERVAL     = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
    275 
    276         except ConfigParser.NoOptionError:
    277 
    278                 # Backwards compatibility for old configs
    279                 #
    280 
    281                 BATCH_POLL_INTERVAL     = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
    282                 api_guess               = 'pbs'
    283        
    284         try:
    285 
    286                 GMOND_CONF              = cfg.get( 'DEFAULT', 'GMOND_CONF' )
    287 
    288         except ConfigParser.NoOptionError:
    289 
    290                 # Not specified: assume /etc/gmond.conf
    291                 #
    292                 GMOND_CONF              = '/etc/gmond.conf'
    293 
    294         ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
    295 
    296         # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
    297         #
    298         gmetric_dest_ip         = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
    299 
    300         if not gmetric_dest_ip:
    301 
    302                 # Maybe unicast target then
    303                 #
    304                 gmetric_dest_ip         = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
    305 
    306         gmetric_dest_port       = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
    307 
    308         if gmetric_dest_ip and gmetric_dest_port:
    309 
    310                 GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
    311         else:
    312 
    313                 debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
    314 
    315                 # Couldn't figure it out: let's see if it's in our jobmond.conf
    316                 #
    317                 try:
    318 
    319                         GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
    320 
    321                 # Guess not: now just give up
    322                 #
    323                 except ConfigParser.NoOptionError:
    324 
    325                         GMETRIC_TARGET  = None
    326 
    327                         debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
    328 
    329         gmetric_bin     = findGmetric()
    330 
    331         if gmetric_bin:
    332 
    333                 GMETRIC_BINARY          = gmetric_bin
    334         else:
    335                 debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
    336 
    337                 try:
    338 
    339                         GMETRIC_BINARY          = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
    340 
    341                 except ConfigParser.NoOptionError:
    342 
    343                         debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
    344                         sys.exit( 1 )
    345 
    346         DETECT_TIME_DIFFS       = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
    347 
    348         BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
    349 
    350         try:
    351 
    352                 BATCH_API       = cfg.get( 'DEFAULT', 'BATCH_API' )
    353 
    354         except ConfigParser.NoOptionError, detail:
    355 
    356                 if BATCH_SERVER and api_guess:
    357 
    358                         BATCH_API       = api_guess
    359                 else:
    360                         debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
    361                         sys.exit( 1 )
    362 
    363         try:
    364 
    365                 QUEUE           = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
    366 
    367         except ConfigParser.NoOptionError, detail:
    368 
    369                 QUEUE           = None
    370 
    371         return True
     183    def getlist( cfg_string ):
     184
     185        my_list = [ ]
     186
     187        for item_txt in cfg_string.split( ',' ):
     188
     189            sep_char = None
     190
     191            item_txt = item_txt.strip()
     192
     193            for s_char in [ "'", '"' ]:
     194
     195                if item_txt.find( s_char ) != -1:
     196
     197                    if item_txt.count( s_char ) != 2:
     198
     199                        print 'Missing quote: %s' %item_txt
     200                        sys.exit( 1 )
     201
     202                    else:
     203
     204                        sep_char = s_char
     205                        break
     206
     207            if sep_char:
     208
     209                item_txt = item_txt.split( sep_char )[1]
     210
     211            my_list.append( item_txt )
     212
     213        return my_list
     214
     215    cfg        = ConfigParser.ConfigParser()
     216
     217    cfg.read( filename )
     218
     219    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
     220    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
     221    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
     222    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
     223
     224    DEBUG_LEVEL    = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
     225
     226    DAEMONIZE    = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
     227
     228    SYSLOG_LEVEL    = -1
     229    SYSLOG_FACILITY    = None
     230
     231    try:
     232        USE_SYSLOG    = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
     233
     234    except ConfigParser.NoOptionError:
     235
     236        USE_SYSLOG    = True
     237
     238        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
     239
     240    if USE_SYSLOG:
     241
     242        try:
     243            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
     244
     245        except ConfigParser.NoOptionError:
     246
     247            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
     248            SYSLOG_LEVEL    = 0
     249
     250        try:
     251
     252            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
     253
     254        except ConfigParser.NoOptionError:
     255
     256            SYSLOG_FACILITY = syslog.LOG_DAEMON
     257
     258            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
     259
     260    try:
     261
     262        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
     263
     264    except ConfigParser.NoOptionError:
     265
     266        # Backwards compatibility for old configs
     267        #
     268
     269        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
     270        api_guess        = 'pbs'
     271   
     272    try:
     273   
     274        BATCH_POLL_INTERVAL    = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
     275
     276    except ConfigParser.NoOptionError:
     277
     278        # Backwards compatibility for old configs
     279        #
     280
     281        BATCH_POLL_INTERVAL    = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
     282        api_guess        = 'pbs'
     283   
     284    try:
     285
     286        GMOND_CONF        = cfg.get( 'DEFAULT', 'GMOND_CONF' )
     287
     288    except ConfigParser.NoOptionError:
     289
     290        # Not specified: assume /etc/gmond.conf
     291        #
     292        GMOND_CONF        = '/etc/gmond.conf'
     293
     294    ganglia_cfg        = GangliaConfigParser( GMOND_CONF )
     295
     296    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
     297    #
     298    gmetric_dest_ip        = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
     299
     300    if not gmetric_dest_ip:
     301
     302        # Maybe unicast target then
     303        #
     304        gmetric_dest_ip        = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
     305
     306    gmetric_dest_port    = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
     307
     308    if gmetric_dest_ip and gmetric_dest_port:
     309
     310        GMETRIC_TARGET    = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
     311    else:
     312
     313        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
     314
     315        # Couldn't figure it out: let's see if it's in our jobmond.conf
     316        #
     317        try:
     318
     319            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
     320
     321        # Guess not: now just give up
     322        #
     323        except ConfigParser.NoOptionError:
     324
     325            GMETRIC_TARGET    = None
     326
     327            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
     328
     329    gmetric_bin    = findGmetric()
     330
     331    if gmetric_bin:
     332
     333        GMETRIC_BINARY        = gmetric_bin
     334    else:
     335        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
     336
     337        try:
     338
     339            GMETRIC_BINARY        = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
     340
     341        except ConfigParser.NoOptionError:
     342
     343            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
     344            sys.exit( 1 )
     345
     346    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
     347
     348    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
     349
     350    try:
     351
     352        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
     353
     354    except ConfigParser.NoOptionError, detail:
     355
     356        if BATCH_SERVER and api_guess:
     357
     358            BATCH_API    = api_guess
     359        else:
     360            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
     361            sys.exit( 1 )
     362
     363    try:
     364
     365        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
     366
     367    except ConfigParser.NoOptionError, detail:
     368
     369        QUEUE        = None
     370
     371    return True
    372372
    373373def fqdn_parts (fqdn):
    374374
    375         """Return pair of host and domain for fully-qualified domain name arg."""
    376 
    377         parts = fqdn.split (".")
    378 
    379         return (parts[0], string.join(parts[1:], "."))
     375    """Return pair of host and domain for fully-qualified domain name arg."""
     376
     377    parts = fqdn.split (".")
     378
     379    return (parts[0], string.join(parts[1:], "."))
    380380
    381381METRIC_MAX_VAL_LEN = 900
     
    383383class DataProcessor:
    384384
    385         """Class for processing of data"""
    386 
    387         binary = None
    388 
    389         def __init__( self, binary=None ):
    390 
    391                 """Remember alternate binary location if supplied"""
    392 
    393                 global GMETRIC_BINARY, GMOND_CONF
    394 
    395                 if binary:
    396                         self.binary = binary
    397 
    398                 if not self.binary:
    399                         self.binary = GMETRIC_BINARY
    400 
    401                 # Timeout for XML
    402                 #
    403                 # From ganglia's documentation:
    404                 #
    405                 # 'A metric will be deleted DMAX seconds after it is received, and
    406                 # DMAX=0 means eternal life.'
    407 
    408                 self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
    409 
    410                 if GMOND_CONF:
    411 
    412                         incompatible = self.checkGmetricVersion()
    413 
    414                         if incompatible:
    415 
    416                                 debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
    417                                 sys.exit( 1 )
    418 
    419         def checkGmetricVersion( self ):
    420 
    421                 """
    422                 Check version of gmetric is at least 3.0.1
    423                 for the syntax we use
    424                 """
    425 
    426                 global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
    427 
    428                 incompatible    = 0
    429 
    430                 gfp             = os.popen( self.binary + ' --version' )
    431                 lines           = gfp.readlines()
    432 
    433                 gfp.close()
    434 
    435                 for line in lines:
    436 
    437                         line = line.split( ' ' )
    438 
    439                         if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
    440                        
    441                                 gmetric_version = line[1].split( '\n' )[0]
    442 
    443                                 version_major   = int( gmetric_version.split( '.' )[0] )
    444                                 version_minor   = int( gmetric_version.split( '.' )[1] )
    445                                 version_patch   = int( gmetric_version.split( '.' )[2] )
    446 
    447                                 incompatible    = 0
    448 
    449                                 if version_major < 3:
    450 
    451                                         incompatible = 1
    452                                
    453                                 elif version_major == 3:
    454 
    455                                         if version_minor == 0:
    456 
    457                                                 if version_patch < 1:
    458                                                
    459                                                         incompatible = 1
    460 
    461                                                 # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
    462                                                 #
    463                                                 if version_patch < 3:
    464 
    465                                                         METRIC_MAX_VAL_LEN = 900
    466 
    467                                                 elif version_patch >= 3:
    468 
    469                                                         METRIC_MAX_VAL_LEN = 1400
    470 
    471                                         elif version_minor == 1:
    472 
    473                                                 debug_msg( 0, 'Gmetric 3.1 detected, internal gmetric handling disabled. Failing back to gmetric binary' )
    474 
    475                                                 METRIC_MAX_VAL_LEN = 500
    476 
    477                                                 # We don't speak 3.1 gmetric so use binary
    478                                                 #
    479                                                 GMETRIC_TARGET = None
    480 
    481                 return incompatible
    482 
    483         def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
    484 
    485                 """Call gmetric binary and multicast"""
    486 
    487                 cmd = self.binary
    488 
    489                 if GMETRIC_TARGET:
    490 
    491                         GMETRIC_TARGET_HOST     = GMETRIC_TARGET.split( ':' )[0]
    492                         GMETRIC_TARGET_PORT     = GMETRIC_TARGET.split( ':' )[1]
    493 
    494                         metric_debug            = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
    495 
    496                         debug_msg( 10, printTime() + ' ' + metric_debug)
    497 
    498                         gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
    499 
    500                         gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
    501 
    502                 else:
    503                         try:
    504                                 cmd = cmd + ' -c' + GMOND_CONF
    505 
    506                         except NameError:
    507 
    508                                 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
    509 
    510                         cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
    511 
    512                         if len( units ) > 0:
    513 
    514                                 cmd = cmd + ' -u"' + units + '"'
    515 
    516                         debug_msg( 10, printTime() + ' ' + cmd )
    517 
    518                         os.system( cmd )
     385    """Class for processing of data"""
     386
     387    binary = None
     388
     389    def __init__( self, binary=None ):
     390
     391        """Remember alternate binary location if supplied"""
     392
     393        global GMETRIC_BINARY, GMOND_CONF
     394
     395        if binary:
     396            self.binary = binary
     397
     398        if not self.binary:
     399            self.binary = GMETRIC_BINARY
     400
     401        # Timeout for XML
     402        #
     403        # From ganglia's documentation:
     404        #
     405        # 'A metric will be deleted DMAX seconds after it is received, and
     406        # DMAX=0 means eternal life.'
     407
     408        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
     409
     410        if GMOND_CONF:
     411
     412            incompatible = self.checkGmetricVersion()
     413
     414            if incompatible:
     415
     416                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
     417                sys.exit( 1 )
     418
     419    def checkGmetricVersion( self ):
     420
     421        """
     422        Check version of gmetric is at least 3.0.1
     423        for the syntax we use
     424        """
     425
     426        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
     427
     428        incompatible    = 0
     429
     430        gfp        = os.popen( self.binary + ' --version' )
     431        lines        = gfp.readlines()
     432
     433        gfp.close()
     434
     435        for line in lines:
     436
     437            line = line.split( ' ' )
     438
     439            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
     440           
     441                gmetric_version    = line[1].split( '\n' )[0]
     442
     443                version_major    = int( gmetric_version.split( '.' )[0] )
     444                version_minor    = int( gmetric_version.split( '.' )[1] )
     445                version_patch    = int( gmetric_version.split( '.' )[2] )
     446
     447                incompatible    = 0
     448
     449                if version_major < 3:
     450
     451                    incompatible = 1
     452               
     453                elif version_major == 3:
     454
     455                    if version_minor == 0:
     456
     457                        if version_patch < 1:
     458                       
     459                            incompatible = 1
     460
     461                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
     462                        #
     463                        if version_patch < 3:
     464
     465                            METRIC_MAX_VAL_LEN = 900
     466
     467                        elif version_patch >= 3:
     468
     469                            METRIC_MAX_VAL_LEN = 1400
     470
     471                    elif version_minor == 1:
     472
     473                        debug_msg( 0, 'Gmetric 3.1 detected, internal gmetric handling disabled. Failing back to gmetric binary' )
     474
     475                        METRIC_MAX_VAL_LEN = 500
     476
     477                        # We don't speak 3.1 gmetric so use binary
     478                        #
     479                        GMETRIC_TARGET = None
     480
     481        return incompatible
     482
     483    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
     484
     485        """Call gmetric binary and multicast"""
     486
     487        cmd = self.binary
     488
     489        if GMETRIC_TARGET:
     490
     491            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
     492            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
     493
     494            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
     495
     496            debug_msg( 10, printTime() + ' ' + metric_debug)
     497
     498            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
     499
     500            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
     501
     502        else:
     503            try:
     504                cmd = cmd + ' -c' + GMOND_CONF
     505
     506            except NameError:
     507
     508                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
     509
     510            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
     511
     512            if len( units ) > 0:
     513
     514                cmd = cmd + ' -u"' + units + '"'
     515
     516            debug_msg( 10, printTime() + ' ' + cmd )
     517
     518            os.system( cmd )
    519519
    520520class DataGatherer:
    521521
    522         """Skeleton class for batch system DataGatherer"""
    523 
    524         def printJobs( self, jobs ):
    525 
    526                 """Print a jobinfo overview"""
    527 
    528                 for name, attrs in self.jobs.items():
    529 
    530                         print 'job %s' %(name)
    531 
    532                         for name, val in attrs.items():
    533 
    534                                 print '\t%s = %s' %( name, val )
    535 
    536         def printJob( self, jobs, job_id ):
    537 
    538                 """Print job with job_id from jobs"""
    539 
    540                 print 'job %s' %(job_id)
    541 
    542                 for name, val in jobs[ job_id ].items():
    543 
    544                         print '\t%s = %s' %( name, val )
    545 
    546         def getAttr( self, attrs, name ):
    547 
    548                 """Return certain attribute from dictionary, if exists"""
    549 
    550                 if attrs.has_key( name ):
    551 
    552                         return attrs[ name ]
    553                 else:
    554                         return ''
    555 
    556         def jobDataChanged( self, jobs, job_id, attrs ):
    557 
    558                 """Check if job with attrs and job_id in jobs has changed"""
    559 
    560                 if jobs.has_key( job_id ):
    561 
    562                         oldData = jobs[ job_id ]       
    563                 else:
    564                         return 1
    565 
    566                 for name, val in attrs.items():
    567 
    568                         if oldData.has_key( name ):
    569 
    570                                 if oldData[ name ] != attrs[ name ]:
    571 
    572                                         return 1
    573 
    574                         else:
    575                                 return 1
    576 
    577                 return 0
    578 
    579         def submitJobData( self ):
    580 
    581                 """Submit job info list"""
    582 
    583                 global BATCH_API
    584 
    585                 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
    586 
    587                 running_jobs    = 0
    588                 queued_jobs     = 0
    589 
    590                 # Count how many running/queued jobs we found
     522    """Skeleton class for batch system DataGatherer"""
     523
     524    def printJobs( self, jobs ):
     525
     526        """Print a jobinfo overview"""
     527
     528        for name, attrs in self.jobs.items():
     529
     530            print 'job %s' %(name)
     531
     532            for name, val in attrs.items():
     533
     534                print '\t%s = %s' %( name, val )
     535
     536    def printJob( self, jobs, job_id ):
     537
     538        """Print job with job_id from jobs"""
     539
     540        print 'job %s' %(job_id)
     541
     542        for name, val in jobs[ job_id ].items():
     543
     544            print '\t%s = %s' %( name, val )
     545
     546    def getAttr( self, attrs, name ):
     547
     548        """Return certain attribute from dictionary, if exists"""
     549
     550        if attrs.has_key( name ):
     551
     552            return attrs[ name ]
     553        else:
     554            return ''
     555
     556    def jobDataChanged( self, jobs, job_id, attrs ):
     557
     558        """Check if job with attrs and job_id in jobs has changed"""
     559
     560        if jobs.has_key( job_id ):
     561
     562            oldData = jobs[ job_id ]   
     563        else:
     564            return 1
     565
     566        for name, val in attrs.items():
     567
     568            if oldData.has_key( name ):
     569
     570                if oldData[ name ] != attrs[ name ]:
     571
     572                    return 1
     573
     574            else:
     575                return 1
     576
     577        return 0
     578
     579    def submitJobData( self ):
     580
     581        """Submit job info list"""
     582
     583        global BATCH_API
     584
     585        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
     586
     587        running_jobs    = 0
     588        queued_jobs    = 0
     589
     590        # Count how many running/queued jobs we found
     591        #
     592        for jobid, jobattrs in self.jobs.items():
     593
     594            if jobattrs[ 'status' ] == 'Q':
     595
     596                queued_jobs += 1
     597
     598            elif jobattrs[ 'status' ] == 'R':
     599
     600                running_jobs += 1
     601
     602        # Report running/queued jobs as seperate metric for a nice RRD graph
     603        #
     604        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
     605        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
     606
     607        # Report down/offline nodes in batch (PBS only ATM)
     608        #
     609        if BATCH_API == 'pbs':
     610
     611            domain        = fqdn_parts( socket.getfqdn() )[1]
     612
     613            downed_nodes    = list()
     614            offline_nodes    = list()
     615       
     616            l        = ['state']
     617       
     618            for name, node in self.pq.getnodes().items():
     619
     620                if ( node[ 'state' ].find( "down" ) != -1 ):
     621
     622                    downed_nodes.append( name )
     623
     624                if ( node[ 'state' ].find( "offline" ) != -1 ):
     625
     626                    offline_nodes.append( name )
     627
     628            downnodeslist        = do_nodelist( downed_nodes )
     629            offlinenodeslist    = do_nodelist( offline_nodes )
     630
     631            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
     632            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
     633            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
     634            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
     635
     636        # Now let's spread the knowledge
     637        #
     638        for jobid, jobattrs in self.jobs.items():
     639
     640            # Make gmetric values for each job: respect max gmetric value length
     641            #
     642            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
     643            metric_increment    = 0
     644
     645            # If we have more job info than max gmetric value length allows, split it up
     646            # amongst multiple metrics
     647            #
     648            for val in gmetric_val:
     649
     650                self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )
     651
     652                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
    591653                #
    592                 for jobid, jobattrs in self.jobs.items():
    593 
    594                         if jobattrs[ 'status' ] == 'Q':
    595 
    596                                 queued_jobs += 1
    597 
    598                         elif jobattrs[ 'status' ] == 'R':
    599 
    600                                 running_jobs += 1
    601 
    602                 # Report running/queued jobs as seperate metric for a nice RRD graph
     654                metric_increment    = metric_increment + 1
     655
     656    def compileGmetricVal( self, jobid, jobattrs ):
     657
     658        """Create a val string for gmetric of jobinfo"""
     659
     660        gval_lists    = [ ]
     661        val_list    = { }
     662
     663        for val_name, val_value in jobattrs.items():
     664
     665            # These are our own metric names, i.e.: status, start_timestamp, etc
     666            #
     667            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
     668
     669            # These are their corresponding values
     670            #
     671            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
     672
     673            if val_name == 'nodes' and jobattrs['status'] == 'R':
     674
     675                node_str = None
     676
     677                for node in val_value:
     678
     679                    if node_str:
     680
     681                        node_str = node_str + ';' + node
     682                    else:
     683                        node_str = node
     684
     685                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
     686                    #
     687                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
     688
     689                        # It's too big, we need to make a new gmetric for the additional info
     690                        #
     691                        val_list[ val_name ]    = node_str
     692
     693                        gval_lists.append( val_list )
     694
     695                        val_list        = { }
     696                        node_str        = None
     697
     698                val_list[ val_name ]    = node_str
     699
     700                gval_lists.append( val_list )
     701
     702                val_list        = { }
     703
     704            elif val_value != '':
     705
     706                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
    603707                #
    604                 self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
    605                 self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
    606 
    607                 # Report down/offline nodes in batch (PBS only ATM)
    608                 #
    609                 if BATCH_API == 'pbs':
    610 
    611                         domain          = fqdn_parts( socket.getfqdn() )[1]
    612 
    613                         downed_nodes    = list()
    614                         offline_nodes   = list()
    615                
    616                         l               = ['state']
    617                
    618                         for name, node in self.pq.getnodes().items():
    619 
    620                                 if ( node[ 'state' ].find( "down" ) != -1 ):
    621 
    622                                         downed_nodes.append( name )
    623 
    624                                 if ( node[ 'state' ].find( "offline" ) != -1 ):
    625 
    626                                         offline_nodes.append( name )
    627 
    628                         downnodeslist           = do_nodelist( downed_nodes )
    629                         offlinenodeslist        = do_nodelist( offline_nodes )
    630 
    631                         down_str        = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
    632                         offl_str        = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
    633                         self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
    634                         self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
    635 
    636                 # Now let's spread the knowledge
    637                 #
    638                 for jobid, jobattrs in self.jobs.items():
    639 
    640                         # Make gmetric values for each job: respect max gmetric value length
    641                         #
    642                         gmetric_val             = self.compileGmetricVal( jobid, jobattrs )
    643                         metric_increment        = 0
    644 
    645                         # If we have more job info than max gmetric value length allows, split it up
    646                         # amongst multiple metrics
    647                         #
    648                         for val in gmetric_val:
    649 
    650                                 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )
    651 
    652                                 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
    653                                 #
    654                                 metric_increment        = metric_increment + 1
    655 
    656         def compileGmetricVal( self, jobid, jobattrs ):
    657 
    658                 """Create a val string for gmetric of jobinfo"""
    659 
    660                 gval_lists      = [ ]
    661                 val_list        = { }
    662 
    663                 for val_name, val_value in jobattrs.items():
    664 
    665                         # These are our own metric names, i.e.: status, start_timestamp, etc
    666                         #
    667                         val_list_names_len      = len( string.join( val_list.keys() ) ) + len(val_list.keys())
    668 
    669                         # These are their corresponding values
    670                         #
    671                         val_list_vals_len       = len( string.join( val_list.values() ) ) + len(val_list.values())
    672 
    673                         if val_name == 'nodes' and jobattrs['status'] == 'R':
    674 
    675                                 node_str = None
    676 
    677                                 for node in val_value:
    678 
    679                                         if node_str:
    680 
    681                                                 node_str = node_str + ';' + node
    682                                         else:
    683                                                 node_str = node
    684 
    685                                         # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
    686                                         #
    687                                         if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
    688 
    689                                                 # It's too big, we need to make a new gmetric for the additional info
    690                                                 #
    691                                                 val_list[ val_name ]    = node_str
    692 
    693                                                 gval_lists.append( val_list )
    694 
    695                                                 val_list                = { }
    696                                                 node_str                = None
    697 
    698                                 val_list[ val_name ]    = node_str
    699 
    700                                 gval_lists.append( val_list )
    701 
    702                                 val_list                = { }
    703 
    704                         elif val_value != '':
    705 
    706                                 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
    707                                 #
    708                                 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
    709 
    710                                         # It's too big, we need to make a new gmetric for the additional info
    711                                         #
    712                                         gval_lists.append( val_list )
    713 
    714                                         val_list                = { }
    715 
    716                                 val_list[ val_name ]    = val_value
    717 
    718                 if len( val_list ) > 0:
    719 
    720                         gval_lists.append( val_list )
    721 
    722                 str_list        = [ ]
    723 
    724                 # Now append the value names and values together, i.e.: stop_timestamp=value, etc
    725                 #
    726                 for val_list in gval_lists:
    727 
    728                         my_val_str      = None
    729 
    730                         for val_name, val_value in val_list.items():
    731 
    732                                 if type(val_value) == list:
    733 
    734                                         val_value       = val_value.join( ',' )
    735 
    736                                 if my_val_str:
    737 
    738                                         try:
    739                                                 # fixme: It's getting
    740                                                 # ('nodes', None) items
    741                                                 my_val_str = my_val_str + ' ' + val_name + '=' + val_value
    742                                         except:
    743                                                 pass
    744 
    745                                 else:
    746                                         my_val_str = val_name + '=' + val_value
    747 
    748                         str_list.append( my_val_str )
    749 
    750                 return str_list
    751 
    752         def daemon( self ):
    753 
    754                 """Run as daemon forever"""
    755 
    756                 # Fork the first child
    757                 #
    758                 pid = os.fork()
    759                 if pid > 0:
    760                         sys.exit(0)  # end parent
    761 
    762                 # creates a session and sets the process group ID
    763                 #
    764                 os.setsid()
    765 
    766                 # Fork the second child
    767                 #
    768                 pid = os.fork()
    769                 if pid > 0:
    770                         sys.exit(0)  # end parent
    771 
    772                 write_pidfile()
    773 
    774                 # Go to the root directory and set the umask
    775                 #
    776                 os.chdir('/')
    777                 os.umask(0)
    778 
    779                 sys.stdin.close()
    780                 sys.stdout.close()
    781                 sys.stderr.close()
    782 
    783                 os.open('/dev/null', os.O_RDWR)
    784                 os.dup2(0, 1)
    785                 os.dup2(0, 2)
    786 
    787                 self.run()
    788 
    789         def run( self ):
    790 
    791                 """Main thread"""
    792 
    793                 while ( 1 ):
    794                
    795                         self.getJobData()
    796                         self.submitJobData()
    797                         time.sleep( BATCH_POLL_INTERVAL )       
     708                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
     709
     710                    # It's too big, we need to make a new gmetric for the additional info
     711                    #
     712                    gval_lists.append( val_list )
     713
     714                    val_list        = { }
     715
     716                val_list[ val_name ]    = val_value
     717
     718        if len( val_list ) > 0:
     719
     720            gval_lists.append( val_list )
     721
     722        str_list    = [ ]
     723
     724        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
     725        #
     726        for val_list in gval_lists:
     727
     728            my_val_str    = None
     729
     730            for val_name, val_value in val_list.items():
     731
     732                if type(val_value) == list:
     733
     734                    val_value    = val_value.join( ',' )
     735
     736                if my_val_str:
     737
     738                    try:
     739                        # fixme: It's getting
     740                        # ('nodes', None) items
     741                        my_val_str = my_val_str + ' ' + val_name + '=' + val_value
     742                    except:
     743                        pass
     744
     745                else:
     746                    my_val_str = val_name + '=' + val_value
     747
     748            str_list.append( my_val_str )
     749
     750        return str_list
     751
     752    def daemon( self ):
     753
     754        """Run as daemon forever"""
     755
     756        # Fork the first child
     757        #
     758        pid = os.fork()
     759        if pid > 0:
     760            sys.exit(0)  # end parent
     761
     762        # creates a session and sets the process group ID
     763        #
     764        os.setsid()
     765
     766        # Fork the second child
     767        #
     768        pid = os.fork()
     769        if pid > 0:
     770            sys.exit(0)  # end parent
     771
     772        write_pidfile()
     773
     774        # Go to the root directory and set the umask
     775        #
     776        os.chdir('/')
     777        os.umask(0)
     778
     779        sys.stdin.close()
     780        sys.stdout.close()
     781        sys.stderr.close()
     782
     783        os.open('/dev/null', os.O_RDWR)
     784        os.dup2(0, 1)
     785        os.dup2(0, 2)
     786
     787        self.run()
     788
     789    def run( self ):
     790
     791        """Main thread"""
     792
     793        while ( 1 ):
     794       
     795            self.getJobData()
     796            self.submitJobData()
     797            time.sleep( BATCH_POLL_INTERVAL )   
    798798
    799799# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
     
    802802
    803803class NoJobs (Exception):
    804         """Exception raised by empty job list in qstat output."""
    805         pass
     804    """Exception raised by empty job list in qstat output."""
     805    pass
    806806
    807807class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
    808         """SAX handler for XML output from Sun Grid Engine's `qstat'."""
    809 
    810         def __init__(self):
    811                 self.value = ""
    812                 self.joblist = []
    813                 self.job = {}
    814                 self.queue = ""
    815                 self.in_joblist = False
    816                 self.lrequest = False
    817                 self.eltq = deque()
    818                 xml.sax.handler.ContentHandler.__init__(self)
    819 
    820         # The structure of the output is as follows (for SGE 6.0).  It's
    821         # similar for 6.1, but radically different for 6.2, and is
    822         # undocumented generally.  Unfortunately it's voluminous, and probably
    823         # doesn't scale to large clusters/queues.
    824 
    825         # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    826         #   <djob_info>
    827         #     <qmaster_response>  <!-- job -->
    828         #       ...
    829         #       <JB_ja_template> 
    830         #         <ulong_sublist>
    831         #         ...             <!-- start_time, state ... -->
    832         #         </ulong_sublist>
    833         #       </JB_ja_template> 
    834         #       <JB_ja_tasks>
    835         #         <ulong_sublist>
    836         #           ...           <!-- task info
    837         #         </ulong_sublist>
    838         #         ...
    839         #       </JB_ja_tasks>
    840         #       ...
    841         #     </qmaster_response>
    842         #   </djob_info>
    843         #   <messages>
    844         #   ...
    845 
    846         # NB.  We might treat each task as a separate job, like
    847         # straight qstat output, but the web interface expects jobs to
    848         # be identified by integers, not, say, <job number>.<task>.
    849 
    850         # So, I lied.  If the job list is empty, we get invalid XML
    851         # like this, which we need to defend against:
    852 
    853         # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    854         #   <>
    855         #     <ST_name>*</ST_name>
    856         #   </>
    857         # </unknown_jobs>
    858 
    859         def startElement(self, name, attrs):
    860                 self.value = ""
    861                 if name == "djob_info": # job list
    862                         self.in_joblist = True
    863                 # The job container is "qmaster_response" in SGE 6.0
    864                 # and 6.1, but "element" in 6.2.  This is only the very
    865                 # start of what's necessary for 6.2, though (sigh).
    866                 elif (name == "qmaster_response" or name == "element") \
    867                             and self.eltq[-1] == "djob_info": # job
    868                         self.job = {"job_state": "U", "slots": 0,
    869                                     "nodes": [], "queued_timestamp": "",
    870                                     "queued_timestamp": "", "queue": "",
    871                                     "ppn": "0", "RN_max": 0,
    872                                     # fixme in endElement
    873                                     "requested_memory": 0, "requested_time": 0
    874                                     }
    875                         self.joblist.append(self.job)
    876                 elif name == "qstat_l_requests": # resource request
    877                         self.lrequest = True
    878                 elif name == "unknown_jobs":
    879                         raise NoJobs
    880                 self.eltq.append (name)
    881 
    882         def characters(self, ch):
    883                 self.value += ch
    884 
    885         def endElement(self, name):
    886                 """Snarf job elements contents into job dictionary.
    887                    Translate keys if appropriate."""
    888 
    889                 name_trans = {
    890                   "JB_job_number": "number",
    891                   "JB_job_name": "name", "JB_owner": "owner",
    892                   "queue_name": "queue", "JAT_start_time": "start_timestamp",
    893                   "JB_submission_time": "queued_timestamp"
    894                   }
    895                 value = self.value
    896                 self.eltq.pop ()
    897 
    898                 if name == "djob_info":
    899                         self.in_joblist = False
    900                         self.job = {}
    901                 elif name == "JAT_master_queue":
    902                         self.job["queue"] = value.split("@")[0]
    903                 elif name == "JG_qhostname":
    904                         if not (value in self.job["nodes"]):
    905                                 self.job["nodes"].append(value)
    906                 elif name == "JG_slots": # slots in use
    907                         self.job["slots"] += int(value)
    908                 elif name == "RN_max": # requested slots (tasks or parallel)
    909                         self.job["RN_max"] = max (self.job["RN_max"],
    910                                                   int(value))
    911                 elif name == "JAT_state": # job state (bitwise or)
    912                         value = int (value)
    913                         # Status values from sge_jobL.h
    914                         #define JIDLE                   0x00000000
    915                         #define JHELD                   0x00000010
    916                         #define JMIGRATING              0x00000020
    917                         #define JQUEUED                 0x00000040
    918                         #define JRUNNING                0x00000080
    919                         #define JSUSPENDED              0x00000100
    920                         #define JTRANSFERING            0x00000200
    921                         #define JDELETED                0x00000400
    922                         #define JWAITING                0x00000800
    923                         #define JEXITING                0x00001000
    924                         #define JWRITTEN                0x00002000
    925                         #define JSUSPENDED_ON_THRESHOLD 0x00010000
    926                         #define JFINISHED               0x00010000
    927                         if value & 0x80:
    928                                 self.job["status"] = "R"
    929                         elif value & 0x40:
    930                                 self.job["status"] = "Q"
    931                         else:
    932                                 self.job["status"] = "O" # `other'
    933                 elif name == "CE_name" and self.lrequest and self.value in \
    934                             ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
    935                         # We're in a container for an interesting resource
    936                         # request; record which type.
    937                         self.lrequest = self.value
    938                 elif name == "CE_doubleval" and self.lrequest:
    939                         # if we're in a container for an interesting
    940                         # resource request, use the maxmimum of the hard
    941                         # and soft requests to record the requested CPU
    942                         # or core.  Fixme:  I'm not sure if this logic is
    943                         # right.
    944                         if self.lrequest in ("h_core", "s_core"):
    945                                 self.job["requested_memory"] = \
    946                                     max (float (value),
    947                                         self.job["requested_memory"])
    948                         # Fixme:  Check what cpu means, c.f [hs]_cpu.
    949                         elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
    950                                 self.job["requested_time"] = \
    951                                     max (float (value),
    952                                         self.job["requested_time"])
    953                 elif name == "qstat_l_requests":
    954                         self.lrequest = False
    955                 elif self.job and self.in_joblist:
    956                         if name in name_trans:
    957                                 name = name_trans[name]
    958                                 self.job[name] = value
     808    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
     809
     810    def __init__(self):
     811        self.value = ""
     812        self.joblist = []
     813        self.job = {}
     814        self.queue = ""
     815        self.in_joblist = False
     816        self.lrequest = False
     817        self.eltq = deque()
     818        xml.sax.handler.ContentHandler.__init__(self)
     819
     820    # The structure of the output is as follows (for SGE 6.0).  It's
     821    # similar for 6.1, but radically different for 6.2, and is
     822    # undocumented generally.  Unfortunately it's voluminous, and probably
     823    # doesn't scale to large clusters/queues.
     824
     825    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
     826    #   <djob_info>
     827    #     <qmaster_response>  <!-- job -->
     828    #       ...
     829    #       <JB_ja_template> 
     830    #     <ulong_sublist>
     831    #     ...         <!-- start_time, state ... -->
     832    #     </ulong_sublist>
     833    #       </JB_ja_template> 
     834    #       <JB_ja_tasks>
     835    #     <ulong_sublist>
     836    #       ...       <!-- task info
     837    #     </ulong_sublist>
     838    #     ...
     839    #       </JB_ja_tasks>
     840    #       ...
     841    #     </qmaster_response>
     842    #   </djob_info>
     843    #   <messages>
     844    #   ...
     845
     846    # NB.  We might treat each task as a separate job, like
     847    # straight qstat output, but the web interface expects jobs to
     848    # be identified by integers, not, say, <job number>.<task>.
     849
     850    # So, I lied.  If the job list is empty, we get invalid XML
     851    # like this, which we need to defend against:
     852
     853    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
     854    #   <>
     855    #     <ST_name>*</ST_name>
     856    #   </>
     857    # </unknown_jobs>
     858
     859    def startElement(self, name, attrs):
     860        self.value = ""
     861        if name == "djob_info":    # job list
     862            self.in_joblist = True
     863        # The job container is "qmaster_response" in SGE 6.0
     864        # and 6.1, but "element" in 6.2.  This is only the very
     865        # start of what's necessary for 6.2, though (sigh).
     866        elif (name == "qmaster_response" or name == "element") \
     867                and self.eltq[-1] == "djob_info": # job
     868            self.job = {"job_state": "U", "slots": 0,
     869                    "nodes": [], "queued_timestamp": "",
     870                    "queued_timestamp": "", "queue": "",
     871                    "ppn": "0", "RN_max": 0,
     872                    # fixme in endElement
     873                    "requested_memory": 0, "requested_time": 0
     874                    }
     875            self.joblist.append(self.job)
     876        elif name == "qstat_l_requests": # resource request
     877            self.lrequest = True
     878        elif name == "unknown_jobs":
     879            raise NoJobs
     880        self.eltq.append (name)
     881
     882    def characters(self, ch):
     883        self.value += ch
     884
     885    def endElement(self, name):
     886        """Snarf job elements contents into job dictionary.
     887           Translate keys if appropriate."""
     888
     889        name_trans = {
     890          "JB_job_number": "number",
     891          "JB_job_name": "name", "JB_owner": "owner",
     892          "queue_name": "queue", "JAT_start_time": "start_timestamp",
     893          "JB_submission_time": "queued_timestamp"
     894          }
     895        value = self.value
     896        self.eltq.pop ()
     897
     898        if name == "djob_info":
     899            self.in_joblist = False
     900            self.job = {}
     901        elif name == "JAT_master_queue":
     902            self.job["queue"] = value.split("@")[0]
     903        elif name == "JG_qhostname":
     904            if not (value in self.job["nodes"]):
     905                self.job["nodes"].append(value)
     906        elif name == "JG_slots": # slots in use
     907            self.job["slots"] += int(value)
     908        elif name == "RN_max": # requested slots (tasks or parallel)
     909            self.job["RN_max"] = max (self.job["RN_max"],
     910                          int(value))
     911        elif name == "JAT_state": # job state (bitwise or)
     912            value = int (value)
     913            # Status values from sge_jobL.h
     914            #define JIDLE           0x00000000
     915            #define JHELD           0x00000010
     916            #define JMIGRATING          0x00000020
     917            #define JQUEUED         0x00000040
     918            #define JRUNNING        0x00000080
     919            #define JSUSPENDED          0x00000100
     920            #define JTRANSFERING        0x00000200
     921            #define JDELETED        0x00000400
     922            #define JWAITING        0x00000800
     923            #define JEXITING        0x00001000
     924            #define JWRITTEN        0x00002000
     925            #define JSUSPENDED_ON_THRESHOLD 0x00010000
     926            #define JFINISHED           0x00010000
     927            if value & 0x80:
     928                self.job["status"] = "R"
     929            elif value & 0x40:
     930                self.job["status"] = "Q"
     931            else:
     932                self.job["status"] = "O" # `other'
     933        elif name == "CE_name" and self.lrequest and self.value in \
     934                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
     935            # We're in a container for an interesting resource
     936            # request; record which type.
     937            self.lrequest = self.value
     938        elif name == "CE_doubleval" and self.lrequest:
     939            # if we're in a container for an interesting
     940            # resource request, use the maxmimum of the hard
     941            # and soft requests to record the requested CPU
     942            # or core.  Fixme:  I'm not sure if this logic is
     943            # right.
     944            if self.lrequest in ("h_core", "s_core"):
     945                self.job["requested_memory"] = \
     946                    max (float (value),
     947                    self.job["requested_memory"])
     948            # Fixme:  Check what cpu means, c.f [hs]_cpu.
     949            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
     950                self.job["requested_time"] = \
     951                    max (float (value),
     952                    self.job["requested_time"])
     953        elif name == "qstat_l_requests":
     954            self.lrequest = False
     955        elif self.job and self.in_joblist:
     956            if name in name_trans:
     957                name = name_trans[name]
     958                self.job[name] = value
    959959
    960960# Abstracted from PBS original.
     
    963963def do_nodelist( nodes ):
    964964
    965         """Translate node list as appropriate."""
    966 
    967         nodeslist               = [ ]
    968         my_domain               = fqdn_parts( socket.getfqdn() )[1]
    969 
    970         for node in nodes:
    971 
    972                 host            = node.split( '/' )[0] # not relevant for SGE
    973                 h, host_domain  = fqdn_parts(host)
    974 
    975                 if host_domain == my_domain:
    976 
    977                         host    = h
    978 
    979                 if nodeslist.count( host ) == 0:
    980 
    981                         for translate_pattern in BATCH_HOST_TRANSLATE:
    982 
    983                                 if translate_pattern.find( '/' ) != -1:
    984 
    985                                         translate_orig  = \
    986                                             translate_pattern.split( '/' )[1]
    987                                         translate_new   = \
    988                                             translate_pattern.split( '/' )[2]
    989                                         host = re.sub( translate_orig,
    990                                                        translate_new, host )
    991                         if not host in nodeslist:
    992                                 nodeslist.append( host )
    993         return nodeslist
     965    """Translate node list as appropriate."""
     966
     967    nodeslist        = [ ]
     968    my_domain        = fqdn_parts( socket.getfqdn() )[1]
     969
     970    for node in nodes:
     971
     972        host        = node.split( '/' )[0] # not relevant for SGE
     973        h, host_domain    = fqdn_parts(host)
     974
     975        if host_domain == my_domain:
     976
     977            host    = h
     978
     979        if nodeslist.count( host ) == 0:
     980
     981            for translate_pattern in BATCH_HOST_TRANSLATE:
     982
     983                if translate_pattern.find( '/' ) != -1:
     984
     985                    translate_orig    = \
     986                        translate_pattern.split( '/' )[1]
     987                    translate_new    = \
     988                        translate_pattern.split( '/' )[2]
     989                    host = re.sub( translate_orig,
     990                               translate_new, host )
     991            if not host in nodeslist:
     992                nodeslist.append( host )
     993    return nodeslist
    994994
    995995class SgeDataGatherer(DataGatherer):
    996996
    997         jobs = {}
    998 
    999         def __init__( self ):
    1000                 self.jobs = {}
    1001                 self.timeoffset = 0
    1002                 self.dp = DataProcessor()
    1003 
    1004         def getJobData( self ):
    1005                 """Gather all data on current jobs in SGE"""
    1006 
    1007                 import popen2
    1008 
    1009                 self.cur_time = 0
    1010                 queues = ""
    1011                 if QUEUE:       # only for specific queues
    1012                         # Fixme:  assumes queue names don't contain single
    1013                         # quote or comma.  Don't know what the SGE rules are.
    1014                         queues = " -q '" + string.join (QUEUE, ",") + "'"
    1015                 # Note the comment in SgeQstatXMLParser about scaling with
    1016                 # this method of getting data.  I haven't found better one.
    1017                 # Output with args `-xml -ext -f -r' is easier to parse
    1018                 # in some ways, harder in others, but it doesn't provide
    1019                 # the submission time (at least SGE 6.0).  The pipeline
    1020                 # into sed corrects bogus XML observed with a configuration
    1021                 # of SGE 6.0u8, which otherwise causes the parsing to hang.
    1022                 piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
     997    jobs = {}
     998
     999    def __init__( self ):
     1000        self.jobs = {}
     1001        self.timeoffset = 0
     1002        self.dp = DataProcessor()
     1003
     1004    def getJobData( self ):
     1005        """Gather all data on current jobs in SGE"""
     1006
     1007        import popen2
     1008
     1009        self.cur_time = 0
     1010        queues = ""
     1011        if QUEUE:    # only for specific queues
     1012            # Fixme:  assumes queue names don't contain single
     1013            # quote or comma.  Don't know what the SGE rules are.
     1014            queues = " -q '" + string.join (QUEUE, ",") + "'"
     1015        # Note the comment in SgeQstatXMLParser about scaling with
     1016        # this method of getting data.  I haven't found better one.
     1017        # Output with args `-xml -ext -f -r' is easier to parse
     1018        # in some ways, harder in others, but it doesn't provide
     1019        # the submission time (at least SGE 6.0).  The pipeline
     1020        # into sed corrects bogus XML observed with a configuration
     1021        # of SGE 6.0u8, which otherwise causes the parsing to hang.
     1022        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
    10231023sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
    1024                                                + queues, True)
    1025                 qstatparser = SgeQstatXMLParser()
    1026                 parse_err = 0
    1027                 try:
    1028                         xml.sax.parse(piping.fromchild, qstatparser)
    1029                 except NoJobs:
    1030                         pass
    1031                 except:
    1032                         parse_err = 1
    1033                 if piping.wait():
    1034                         debug_msg(10,
    1035                                   "qstat error, skipping until next polling interval: "
    1036                                   + piping.childerr.readline())
    1037                         return None
    1038                 elif parse_err:
    1039                         debug_msg(10, "Bad XML output from qstat"())
    1040                         exit (1)
    1041                 for f in piping.fromchild, piping.tochild, piping.childerr:
    1042                         f.close()
    1043                 self.cur_time = time.time()
    1044                 jobs_processed = []
    1045                 for job in qstatparser.joblist:
    1046                         job_id = job["number"]
    1047                         if job["status"] in [ 'Q', 'R' ]:
    1048                                 jobs_processed.append(job_id)
    1049                         if job["status"] == "R":
    1050                                 job["nodes"] = do_nodelist (job["nodes"])
    1051                                 # Fixme: why is job["nodes"] sometimes null?
    1052                                 try:
    1053                                         # Fixme: Is this sensible?  The
    1054                                         # PBS-type PPN isn't something you use
    1055                                         # with SGE.
    1056                                         job["ppn"] = float(job["slots"]) / \
    1057                                             len(job["nodes"])
    1058                                 except:
    1059                                         job["ppn"] = 0
    1060                                 if DETECT_TIME_DIFFS:
    1061                                         # If a job start is later than our
    1062                                         # current date, that must mean
    1063                                         # the SGE server's time is later
    1064                                         # than our local time.
    1065                                         start_timestamp = \
    1066                                             int (job["start_timestamp"])
    1067                                         if start_timestamp > \
    1068                                                     int(self.cur_time) + \
    1069                                                     int(self.timeoffset):
    1070 
    1071                                                 self.timeoffset = \
    1072                                                     start_timestamp - \
    1073                                                     int(self.cur_time)
    1074                         else:
    1075                                 # fixme: Note sure what this should be:
    1076                                 job["ppn"] = job["RN_max"]
    1077                                 job["nodes"] = "1"
    1078 
    1079                         myAttrs = {}
    1080                         for attr in ["name", "queue", "owner",
    1081                                      "requested_time", "status",
    1082                                      "requested_memory", "ppn",
    1083                                      "start_timestamp", "queued_timestamp"]:
    1084                                 myAttrs[attr] = str(job[attr])
    1085                         myAttrs["nodes"] = job["nodes"]
    1086                         myAttrs["reported"] = str(int(self.cur_time) + \
    1087                                                   int(self.timeoffset))
    1088                         myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
    1089                         myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
    1090 
    1091                         if self.jobDataChanged(self.jobs, job_id, myAttrs) \
    1092                                     and myAttrs["status"] in ["R", "Q"]:
    1093                                 self.jobs[job_id] = myAttrs
    1094                 for id, attrs in self.jobs.items():
    1095                         if id not in jobs_processed:
    1096                                 del self.jobs[id]
     1024                           + queues, True)
     1025        qstatparser = SgeQstatXMLParser()
     1026        parse_err = 0
     1027        try:
     1028            xml.sax.parse(piping.fromchild, qstatparser)
     1029        except NoJobs:
     1030            pass
     1031        except:
     1032            parse_err = 1
     1033               if piping.wait():
     1034            debug_msg(10,
     1035                  "qstat error, skipping until next polling interval: "
     1036                  + piping.childerr.readline())
     1037            return None
     1038        elif parse_err:
     1039            debug_msg(10, "Bad XML output from qstat"())
     1040            exit (1)
     1041        for f in piping.fromchild, piping.tochild, piping.childerr:
     1042            f.close()
     1043        self.cur_time = time.time()
     1044        jobs_processed = []
     1045        for job in qstatparser.joblist:
     1046            job_id = job["number"]
     1047            if job["status"] in [ 'Q', 'R' ]:
     1048                jobs_processed.append(job_id)
     1049            if job["status"] == "R":
     1050                job["nodes"] = do_nodelist (job["nodes"])
     1051                # Fixme: why is job["nodes"] sometimes null?
     1052                try:
     1053                    # Fixme: Is this sensible?  The
     1054                    # PBS-type PPN isn't something you use
     1055                    # with SGE.
     1056                    job["ppn"] = float(job["slots"]) / \
     1057                        len(job["nodes"])
     1058                except:
     1059                    job["ppn"] = 0
     1060                if DETECT_TIME_DIFFS:
     1061                    # If a job start is later than our
     1062                    # current date, that must mean
     1063                    # the SGE server's time is later
     1064                    # than our local time.
     1065                    start_timestamp = \
     1066                        int (job["start_timestamp"])
     1067                    if start_timestamp > \
     1068                            int(self.cur_time) + \
     1069                            int(self.timeoffset):
     1070
     1071                        self.timeoffset    = \
     1072                            start_timestamp - \
     1073                            int(self.cur_time)
     1074            else:
     1075                # fixme: Note sure what this should be:
     1076                job["ppn"] = job["RN_max"]
     1077                job["nodes"] = "1"
     1078
     1079            myAttrs = {}
     1080            for attr in ["name", "queue", "owner",
     1081                     "requested_time", "status",
     1082                     "requested_memory", "ppn",
     1083                     "start_timestamp", "queued_timestamp"]:
     1084                myAttrs[attr] = str(job[attr])
     1085            myAttrs["nodes"] = job["nodes"]
     1086            myAttrs["reported"] = str(int(self.cur_time) + \
     1087                          int(self.timeoffset))
     1088            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
     1089            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
     1090
     1091            if self.jobDataChanged(self.jobs, job_id, myAttrs) \
     1092                    and myAttrs["status"] in ["R", "Q"]:
     1093                self.jobs[job_id] = myAttrs
     1094        for id, attrs in self.jobs.items():
     1095            if id not in jobs_processed:
     1096                del self.jobs[id]
    10971097
    10981098# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
     
    11011101class LsfDataGatherer(DataGatherer):
    11021102
    1103         """This is the DataGatherer for LSf"""
    1104 
    1105         global lsfObject
    1106 
    1107         def __init__( self ):
    1108 
    1109                 self.jobs = { }
    1110                 self.timeoffset = 0
    1111                 self.dp = DataProcessor()
    1112                 self.initLsfQuery()
    1113 
    1114         def _countDuplicatesInList( self, dupedList ):
    1115 
    1116                 countDupes      = { }
    1117 
    1118                 for item in dupedList:
    1119 
    1120                         if not countDupes.has_key( item ):
    1121 
    1122                                 countDupes[ item ]      = 1
    1123                         else:
    1124                                 countDupes[ item ]      = countDupes[ item ] + 1
    1125 
    1126                 dupeCountList   = [ ]
    1127 
    1128                 for item, count in countDupes.items():
    1129 
    1130                         dupeCountList.append( ( item, count ) )
    1131 
    1132                 return dupeCountList
     1103    """This is the DataGatherer for LSf"""
     1104
     1105    global lsfObject
     1106
     1107    def __init__( self ):
     1108
     1109        self.jobs = { }
     1110        self.timeoffset = 0
     1111        self.dp = DataProcessor()
     1112        self.initLsfQuery()
     1113
     1114    def _countDuplicatesInList( self, dupedList ):
     1115
     1116        countDupes    = { }
     1117
     1118        for item in dupedList:
     1119
     1120            if not countDupes.has_key( item ):
     1121
     1122                countDupes[ item ]    = 1
     1123            else:
     1124                countDupes[ item ]    = countDupes[ item ] + 1
     1125
     1126        dupeCountList    = [ ]
     1127
     1128        for item, count in countDupes.items():
     1129
     1130            dupeCountList.append( ( item, count ) )
     1131
     1132        return dupeCountList
    11331133#
    11341134#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
     
    11371137########################
    11381138
    1139         def initLsfQuery( self ):
    1140                 self.pq = None
    1141                 self.pq = lsfObject.jobInfoEntObject()
    1142 
    1143         def getJobData( self, known_jobs="" ):
    1144                 """Gather all data on current jobs in LSF"""
    1145                 if len( known_jobs ) > 0:
    1146                         jobs = known_jobs
    1147                 else:
    1148                         jobs = { }
    1149                 joblist = {}
    1150                 joblist = self.pq.getJobInfo()
     1139    def initLsfQuery( self ):
     1140        self.pq = None
     1141        self.pq = lsfObject.jobInfoEntObject()
     1142
     1143    def getJobData( self, known_jobs="" ):
     1144        """Gather all data on current jobs in LSF"""
     1145        if len( known_jobs ) > 0:
     1146            jobs = known_jobs
     1147        else:
     1148            jobs = { }
     1149        joblist = {}
     1150        joblist = self.pq.getJobInfo()
     1151        nodelist = ''
     1152
     1153        self.cur_time = time.time()
     1154
     1155        jobs_processed = [ ]
     1156
     1157        for name, attrs in joblist.items():
     1158            job_id = str(name)
     1159            jobs_processed.append( job_id )
     1160            name = self.getAttr( attrs, 'jobName' )
     1161            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
     1162            owner = self.getAttr( attrs, 'user' )
     1163
     1164### THIS IS THE rLimit List index values
     1165#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
     1166#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
     1167#define LSF_RLIMIT_DATA     2        /* data size */
     1168#define LSF_RLIMIT_STACK    3        /* stack size */
     1169#define LSF_RLIMIT_CORE     4        /* core file size */
     1170#define LSF_RLIMIT_RSS      5        /* resident set size */
     1171#define LSF_RLIMIT_NOFILE   6        /* open files */
     1172#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
     1173#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
     1174#define LSF_RLIMIT_SWAP     8
     1175#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
     1176#define LSF_RLIMIT_PROCESS  10       /* process number limit */
     1177#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
     1178#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
     1179
     1180            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
     1181            if requested_time == -1:
     1182                requested_time = ""
     1183            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
     1184            if requested_memory == -1:
     1185                requested_memory = ""
     1186# This tries to get proc per node. We don't support this right now
     1187            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
     1188            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
     1189            if requested_cpus == None or requested_cpus == "":
     1190                requested_cpus = 1
     1191
     1192            if QUEUE:
     1193                for q in QUEUE:
     1194                    if q == queue:
     1195                        display_queue = 1
     1196                        break
     1197                    else:
     1198                        display_queue = 0
     1199                        continue
     1200            if display_queue == 0:
     1201                continue
     1202
     1203            runState = self.getAttr( attrs, 'status' )
     1204            if runState == 4:
     1205                status = 'R'
     1206            else:
     1207                status = 'Q'
     1208            queued_timestamp = self.getAttr( attrs, 'submitTime' )
     1209
     1210            if status == 'R':
     1211                start_timestamp = self.getAttr( attrs, 'startTime' )
     1212                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
     1213                nodelist = nodesCpu.keys()
     1214
     1215                if DETECT_TIME_DIFFS:
     1216
     1217                    # If a job start if later than our current date,
     1218                    # that must mean the Torque server's time is later
     1219                    # than our local time.
     1220
     1221                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
     1222
     1223                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
     1224
     1225            elif status == 'Q':
     1226                start_timestamp = ''
     1227                count_mynodes = 0
     1228                numeric_node = 1
    11511229                nodelist = ''
    11521230
    1153                 self.cur_time = time.time()
    1154 
    1155                 jobs_processed = [ ]
    1156 
    1157                 for name, attrs in joblist.items():
    1158                         job_id = str(name)
    1159                         jobs_processed.append( job_id )
    1160                         name = self.getAttr( attrs, 'jobName' )
    1161                         queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
    1162                         owner = self.getAttr( attrs, 'user' )
    1163 
    1164 ### THIS IS THE rLimit List index values
    1165 #define LSF_RLIMIT_CPU      0            /* cpu time in milliseconds */
    1166 #define LSF_RLIMIT_FSIZE    1            /* maximum file size */
    1167 #define LSF_RLIMIT_DATA     2            /* data size */
    1168 #define LSF_RLIMIT_STACK    3            /* stack size */
    1169 #define LSF_RLIMIT_CORE     4            /* core file size */
    1170 #define LSF_RLIMIT_RSS      5            /* resident set size */
    1171 #define LSF_RLIMIT_NOFILE   6            /* open files */
    1172 #define LSF_RLIMIT_OPEN_MAX 7            /* (from HP-UX) */
    1173 #define LSF_RLIMIT_VMEM     8            /* maximum swap mem */
    1174 #define LSF_RLIMIT_SWAP     8
    1175 #define LSF_RLIMIT_RUN      9            /* max wall-clock time limit */
    1176 #define LSF_RLIMIT_PROCESS  10           /* process number limit */
    1177 #define LSF_RLIMIT_THREAD   11           /* thread number limit (introduced in LSF6.0) */
    1178 #define LSF_RLIM_NLIMITS    12           /* number of resource limits */
    1179 
    1180                         requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
    1181                         if requested_time == -1:
    1182                                 requested_time = ""
    1183                         requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
    1184                         if requested_memory == -1:
    1185                                 requested_memory = ""
    1186 # This tries to get proc per node. We don't support this right now
    1187                         ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
    1188                         requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
    1189                         if requested_cpus == None or requested_cpus == "":
    1190                                 requested_cpus = 1
    1191 
    1192                         if QUEUE:
    1193                                 for q in QUEUE:
    1194                                         if q == queue:
    1195                                                 display_queue = 1
    1196                                                 break
    1197                                         else:
    1198                                                 display_queue = 0
    1199                                                 continue
    1200                         if display_queue == 0:
    1201                                 continue
    1202 
    1203                         runState = self.getAttr( attrs, 'status' )
    1204                         if runState == 4:
    1205                                 status = 'R'
    1206                         else:
    1207                                 status = 'Q'
    1208                         queued_timestamp = self.getAttr( attrs, 'submitTime' )
    1209 
    1210                         if status == 'R':
    1211                                 start_timestamp = self.getAttr( attrs, 'startTime' )
    1212                                 nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
    1213                                 nodelist = nodesCpu.keys()
    1214 
    1215                                 if DETECT_TIME_DIFFS:
    1216 
    1217                                         # If a job start if later than our current date,
    1218                                         # that must mean the Torque server's time is later
    1219                                         # than our local time.
    1220 
    1221                                         if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
    1222 
    1223                                                 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
    1224 
    1225                         elif status == 'Q':
    1226                                 start_timestamp = ''
    1227                                 count_mynodes = 0
    1228                                 numeric_node = 1
    1229                                 nodelist = ''
    1230 
    1231                         myAttrs = { }
    1232                         if name == "":
    1233                                 myAttrs['name'] = "none"
    1234                         else:
    1235                                 myAttrs['name'] = name
    1236 
    1237                         myAttrs[ 'owner' ]              = owner
    1238                         myAttrs[ 'requested_time' ]     = str(requested_time)
    1239                         myAttrs[ 'requested_memory' ]   = str(requested_memory)
    1240                         myAttrs[ 'requested_cpus' ]     = str(requested_cpus)
    1241                         myAttrs[ 'ppn' ]                = str( ppn )
    1242                         myAttrs[ 'status' ]             = status
    1243                         myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
    1244                         myAttrs[ 'queue' ]              = str(queue)
    1245                         myAttrs[ 'queued_timestamp' ]   = str(queued_timestamp)
    1246                         myAttrs[ 'reported' ]           = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
    1247                         myAttrs[ 'nodes' ]              = do_nodelist( nodelist )
    1248                         myAttrs[ 'domain' ]             = fqdn_parts( socket.getfqdn() )[1]
    1249                         myAttrs[ 'poll_interval' ]      = str(BATCH_POLL_INTERVAL)
    1250 
    1251                         if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
    1252                                 jobs[ job_id ] = myAttrs
    1253 
    1254                                 debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
    1255 
    1256                 for id, attrs in jobs.items():
    1257                         if id not in jobs_processed:
    1258                                 # This one isn't there anymore
    1259                                 #
    1260                                 del jobs[ id ]
    1261                 self.jobs=jobs
     1231            myAttrs = { }
     1232            if name == "":
     1233                myAttrs['name'] = "none"
     1234            else:
     1235                myAttrs['name'] = name
     1236
     1237            myAttrs[ 'owner' ]        = owner
     1238            myAttrs[ 'requested_time' ]    = str(requested_time)
     1239            myAttrs[ 'requested_memory' ]    = str(requested_memory)
     1240            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
     1241            myAttrs[ 'ppn' ]        = str( ppn )
     1242            myAttrs[ 'status' ]        = status
     1243            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
     1244            myAttrs[ 'queue' ]        = str(queue)
     1245            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
     1246            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
     1247            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
     1248            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
     1249            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
     1250
     1251            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
     1252                jobs[ job_id ] = myAttrs
     1253
     1254                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
     1255
     1256        for id, attrs in jobs.items():
     1257            if id not in jobs_processed:
     1258                # This one isn't there anymore
     1259                #
     1260                del jobs[ id ]
     1261        self.jobs=jobs
    12621262
    12631263
    12641264class PbsDataGatherer( DataGatherer ):
    12651265
    1266         """This is the DataGatherer for PBS and Torque"""
    1267 
    1268         global PBSQuery, PBSError
    1269 
    1270         def __init__( self ):
    1271 
    1272                 """Setup appropriate variables"""
    1273 
    1274                 self.jobs       = { }
    1275                 self.timeoffset = 0
    1276                 self.dp         = DataProcessor()
    1277 
    1278                 self.initPbsQuery()
    1279 
    1280         def initPbsQuery( self ):
    1281 
    1282                 self.pq         = None
    1283 
    1284                 if( BATCH_SERVER ):
    1285 
    1286                         self.pq         = PBSQuery( BATCH_SERVER )
    1287                 else:
    1288                         self.pq         = PBSQuery()
    1289 
    1290                 try:
    1291                         self.pq.old_data_structure()
    1292 
    1293                 except AttributeError:
    1294 
    1295                         # pbs_query is older
    1296                         #
    1297                         pass
    1298 
    1299         def getJobData( self ):
    1300 
    1301                 """Gather all data on current jobs in Torque"""
    1302 
    1303                 joblist         = {}
    1304                 self.cur_time   = 0
    1305 
    1306                 try:
    1307                         joblist         = self.pq.getjobs()
    1308                         self.cur_time   = time.time()
    1309 
    1310                 except PBSError, detail:
    1311 
    1312                         debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
    1313                         return None
    1314 
    1315                 jobs_processed  = [ ]
    1316 
    1317                 for name, attrs in joblist.items():
    1318                         display_queue           = 1
    1319                         job_id                  = name.split( '.' )[0]
    1320 
    1321                         name                    = self.getAttr( attrs, 'Job_Name' )
    1322                         queue                   = self.getAttr( attrs, 'queue' )
    1323 
    1324                         if QUEUE:
    1325                                 for q in QUEUE:
    1326                                         if q == queue:
    1327                                                 display_queue = 1
    1328                                                 break
    1329                                         else:
    1330                                                 display_queue = 0
    1331                                                 continue
    1332                         if display_queue == 0:
    1333                                 continue
    1334 
    1335 
    1336                         owner                   = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
    1337                         requested_time          = self.getAttr( attrs, 'Resource_List.walltime' )
    1338                         requested_memory        = self.getAttr( attrs, 'Resource_List.mem' )
    1339 
    1340                         mynoderequest           = self.getAttr( attrs, 'Resource_List.nodes' )
    1341 
    1342                         ppn                     = ''
    1343 
    1344                         if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
    1345 
    1346                                 mynoderequest_fields    = mynoderequest.split( ':' )
    1347 
    1348                                 for mynoderequest_field in mynoderequest_fields:
    1349 
    1350                                         if mynoderequest_field.find( 'ppn' ) != -1:
    1351 
    1352                                                 ppn     = mynoderequest_field.split( 'ppn=' )[1]
    1353 
    1354                         status                  = self.getAttr( attrs, 'job_state' )
    1355 
    1356                         if status in [ 'Q', 'R' ]:
    1357 
    1358                                 jobs_processed.append( job_id )
    1359 
    1360                         queued_timestamp        = self.getAttr( attrs, 'ctime' )
    1361 
    1362                         if status == 'R':
    1363 
    1364                                 start_timestamp         = self.getAttr( attrs, 'mtime' )
    1365                                 nodes                   = self.getAttr( attrs, 'exec_host' ).split( '+' )
    1366 
    1367                                 nodeslist               = do_nodelist( nodes )
    1368 
    1369                                 if DETECT_TIME_DIFFS:
    1370 
    1371                                         # If a job start if later than our current date,
    1372                                         # that must mean the Torque server's time is later
    1373                                         # than our local time.
    1374                                
    1375                                         if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
    1376 
    1377                                                 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
    1378 
    1379                         elif status == 'Q':
    1380 
    1381                                 # 'mynodequest' can be a string in the following syntax according to the
    1382                                 # Torque Administator's manual:
    1383                                 #
    1384                                 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
    1385                                 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
    1386                                 # etc
    1387                                 #
    1388 
    1389                                 #
    1390                                 # For now we only count the amount of nodes request and ignore properties
    1391                                 #
    1392 
    1393                                 start_timestamp         = ''
    1394                                 count_mynodes           = 0
    1395 
    1396                                 for node in mynoderequest.split( '+' ):
    1397 
    1398                                         # Just grab the {node_count|hostname} part and ignore properties
    1399                                         #
    1400                                         nodepart        = node.split( ':' )[0]
    1401 
    1402                                         # Let's assume a node_count value
    1403                                         #
    1404                                         numeric_node    = 1
    1405 
    1406                                         # Chop the value up into characters
    1407                                         #
    1408                                         for letter in nodepart:
    1409 
    1410                                                 # If this char is not a digit (0-9), this must be a hostname
    1411                                                 #
    1412                                                 if letter not in string.digits:
    1413 
    1414                                                         numeric_node    = 0
    1415 
    1416                                         # If this is a hostname, just count this as one (1) node
    1417                                         #
    1418                                         if not numeric_node:
    1419 
    1420                                                 count_mynodes   = count_mynodes + 1
    1421                                         else:
    1422 
    1423                                                 # If this a number, it must be the node_count
    1424                                                 # and increase our count with it's value
    1425                                                 #
    1426                                                 try:
    1427                                                         count_mynodes   = count_mynodes + int( nodepart )
    1428 
    1429                                                 except ValueError, detail:
    1430 
    1431                                                         # When we arrive here I must be bugged or very confused
    1432                                                         # THIS SHOULD NOT HAPPEN!
    1433                                                         #
    1434                                                         debug_msg( 10, str( detail ) )
    1435                                                         debug_msg( 10, "Encountered weird node in Resources_List?!" )
    1436                                                         debug_msg( 10, 'nodepart = ' + str( nodepart ) )
    1437                                                         debug_msg( 10, 'job = ' + str( name ) )
    1438                                                         debug_msg( 10, 'attrs = ' + str( attrs ) )
    1439                                                
    1440                                 nodeslist       = str( count_mynodes )
    1441                         else:
    1442                                 start_timestamp = ''
    1443                                 nodeslist       = ''
    1444 
    1445                         myAttrs                         = { }
    1446 
    1447                         myAttrs[ 'name' ]               = str( name )
    1448                         myAttrs[ 'queue' ]              = str( queue )
    1449                         myAttrs[ 'owner' ]              = str( owner )
    1450                         myAttrs[ 'requested_time' ]     = str( requested_time )
    1451                         myAttrs[ 'requested_memory' ]   = str( requested_memory )
    1452                         myAttrs[ 'ppn' ]                = str( ppn )
    1453                         myAttrs[ 'status' ]             = str( status )
    1454                         myAttrs[ 'start_timestamp' ]    = str( start_timestamp )
    1455                         myAttrs[ 'queued_timestamp' ]   = str( queued_timestamp )
    1456                         myAttrs[ 'reported' ]           = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
    1457                         myAttrs[ 'nodes' ]              = nodeslist
    1458                         myAttrs[ 'domain' ]             = fqdn_parts( socket.getfqdn() )[1]
    1459                         myAttrs[ 'poll_interval' ]      = str( BATCH_POLL_INTERVAL )
    1460 
    1461                         if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
    1462 
    1463                                 self.jobs[ job_id ]     = myAttrs
    1464 
    1465                 for id, attrs in self.jobs.items():
    1466 
    1467                         if id not in jobs_processed:
    1468 
    1469                                 # This one isn't there anymore; toedeledoki!
    1470                                 #
    1471                                 del self.jobs[ id ]
     1266    """This is the DataGatherer for PBS and Torque"""
     1267
     1268    global PBSQuery, PBSError
     1269
     1270    def __init__( self ):
     1271
     1272        """Setup appropriate variables"""
     1273
     1274        self.jobs    = { }
     1275        self.timeoffset    = 0
     1276        self.dp        = DataProcessor()
     1277
     1278        self.initPbsQuery()
     1279
     1280    def initPbsQuery( self ):
     1281
     1282        self.pq        = None
     1283
     1284        if( BATCH_SERVER ):
     1285
     1286            self.pq        = PBSQuery( BATCH_SERVER )
     1287        else:
     1288            self.pq        = PBSQuery()
     1289
     1290        try:
     1291            self.pq.old_data_structure()
     1292
     1293        except AttributeError:
     1294
     1295            # pbs_query is older
     1296            #
     1297            pass
     1298
     1299    def getJobData( self ):
     1300
     1301        """Gather all data on current jobs in Torque"""
     1302
     1303        joblist        = {}
     1304        self.cur_time    = 0
     1305
     1306        try:
     1307            joblist        = self.pq.getjobs()
     1308            self.cur_time    = time.time()
     1309
     1310        except PBSError, detail:
     1311
     1312            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
     1313            return None
     1314
     1315        jobs_processed    = [ ]
     1316
     1317        for name, attrs in joblist.items():
     1318            display_queue        = 1
     1319            job_id            = name.split( '.' )[0]
     1320
     1321            name            = self.getAttr( attrs, 'Job_Name' )
     1322            queue            = self.getAttr( attrs, 'queue' )
     1323
     1324            if QUEUE:
     1325                for q in QUEUE:
     1326                    if q == queue:
     1327                        display_queue = 1
     1328                        break
     1329                    else:
     1330                        display_queue = 0
     1331                        continue
     1332            if display_queue == 0:
     1333                continue
     1334
     1335
     1336            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
     1337            requested_time        = self.getAttr( attrs, 'Resource_List.walltime' )
     1338            requested_memory    = self.getAttr( attrs, 'Resource_List.mem' )
     1339
     1340            mynoderequest        = self.getAttr( attrs, 'Resource_List.nodes' )
     1341
     1342            ppn            = ''
     1343
     1344            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
     1345
     1346                mynoderequest_fields    = mynoderequest.split( ':' )
     1347
     1348                for mynoderequest_field in mynoderequest_fields:
     1349
     1350                    if mynoderequest_field.find( 'ppn' ) != -1:
     1351
     1352                        ppn    = mynoderequest_field.split( 'ppn=' )[1]
     1353
     1354            status            = self.getAttr( attrs, 'job_state' )
     1355
     1356            if status in [ 'Q', 'R' ]:
     1357
     1358                jobs_processed.append( job_id )
     1359
     1360            queued_timestamp    = self.getAttr( attrs, 'ctime' )
     1361
     1362            if status == 'R':
     1363
     1364                start_timestamp        = self.getAttr( attrs, 'mtime' )
     1365                nodes            = self.getAttr( attrs, 'exec_host' ).split( '+' )
     1366
     1367                nodeslist        = do_nodelist( nodes )
     1368
     1369                if DETECT_TIME_DIFFS:
     1370
     1371                    # If a job start if later than our current date,
     1372                    # that must mean the Torque server's time is later
     1373                    # than our local time.
     1374               
     1375                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
     1376
     1377                        self.timeoffset    = int( int(start_timestamp) - int(self.cur_time) )
     1378
     1379            elif status == 'Q':
     1380
     1381                # 'mynodequest' can be a string in the following syntax according to the
     1382                # Torque Administator's manual:
     1383                #
     1384                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
     1385                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
     1386                # etc
     1387                #
     1388
     1389                #
     1390                # For now we only count the amount of nodes request and ignore properties
     1391                #
     1392
     1393                start_timestamp        = ''
     1394                count_mynodes        = 0
     1395
     1396                for node in mynoderequest.split( '+' ):
     1397
     1398                    # Just grab the {node_count|hostname} part and ignore properties
     1399                    #
     1400                    nodepart    = node.split( ':' )[0]
     1401
     1402                    # Let's assume a node_count value
     1403                    #
     1404                    numeric_node    = 1
     1405
     1406                    # Chop the value up into characters
     1407                    #
     1408                    for letter in nodepart:
     1409
     1410                        # If this char is not a digit (0-9), this must be a hostname
     1411                        #
     1412                        if letter not in string.digits:
     1413
     1414                            numeric_node    = 0
     1415
     1416                    # If this is a hostname, just count this as one (1) node
     1417                    #
     1418                    if not numeric_node:
     1419
     1420                        count_mynodes    = count_mynodes + 1
     1421                    else:
     1422
     1423                        # If this a number, it must be the node_count
     1424                        # and increase our count with it's value
     1425                        #
     1426                        try:
     1427                            count_mynodes    = count_mynodes + int( nodepart )
     1428
     1429                        except ValueError, detail:
     1430
     1431                            # When we arrive here I must be bugged or very confused
     1432                            # THIS SHOULD NOT HAPPEN!
     1433                            #
     1434                            debug_msg( 10, str( detail ) )
     1435                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
     1436                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
     1437                            debug_msg( 10, 'job = ' + str( name ) )
     1438                            debug_msg( 10, 'attrs = ' + str( attrs ) )
     1439                       
     1440                nodeslist    = str( count_mynodes )
     1441            else:
     1442                start_timestamp    = ''
     1443                nodeslist    = ''
     1444
     1445            myAttrs                = { }
     1446
     1447            myAttrs[ 'name' ]        = str( name )
     1448            myAttrs[ 'queue' ]        = str( queue )
     1449            myAttrs[ 'owner' ]        = str( owner )
     1450            myAttrs[ 'requested_time' ]    = str( requested_time )
     1451            myAttrs[ 'requested_memory' ]    = str( requested_memory )
     1452            myAttrs[ 'ppn' ]        = str( ppn )
     1453            myAttrs[ 'status' ]        = str( status )
     1454            myAttrs[ 'start_timestamp' ]    = str( start_timestamp )
     1455            myAttrs[ 'queued_timestamp' ]    = str( queued_timestamp )
     1456            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
     1457            myAttrs[ 'nodes' ]        = nodeslist
     1458            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
     1459            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
     1460
     1461            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
     1462
     1463                self.jobs[ job_id ]    = myAttrs
     1464
     1465        for id, attrs in self.jobs.items():
     1466
     1467            if id not in jobs_processed:
     1468
     1469                # This one isn't there anymore; toedeledoki!
     1470                #
     1471                del self.jobs[ id ]
    14721472
    14731473#
     
    14881488GMETRIC_DEFAULT_HOST    = '127.0.0.1'
    14891489GMETRIC_DEFAULT_PORT    = '8649'
    1490 GMETRIC_DEFAULT_UNITS   = ''
     1490GMETRIC_DEFAULT_UNITS    = ''
    14911491
    14921492class Gmetric:
    14931493
    1494         global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
    1495 
    1496         slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
    1497         type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
    1498         protocol        = ( 'udp', 'multicast' )
    1499 
    1500         def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
    1501                
    1502                 global GMETRIC_DEFAULT_TYPE
    1503 
    1504                 self.prot       = self.checkHostProtocol( host )
    1505                 self.msg        = xdrlib.Packer()
    1506                 self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
    1507 
    1508                 if self.prot not in self.protocol:
    1509 
    1510                         raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
    1511 
    1512                 if self.prot == 'multicast':
    1513 
    1514                         # Set multicast options
    1515                         #
    1516                         self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
    1517 
    1518                 self.hostport   = ( host, int( port ) )
    1519                 self.slopestr   = 'both'
    1520                 self.tmax       = 60
    1521 
    1522         def checkHostProtocol( self, ip ):
    1523 
    1524                 """Detect if a ip adress is a multicast address"""
    1525 
    1526                 MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
    1527                 MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
    1528 
    1529                 ip_fields               = ip.split( '.' )
    1530 
    1531                 if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
    1532 
    1533                         return 'multicast'
    1534                 else:
    1535                         return 'udp'
    1536 
    1537         def send( self, name, value, dmax, typestr = '', units = '' ):
    1538 
    1539                 if len( units ) == 0:
    1540                         units           = GMETRIC_DEFAULT_UNITS
    1541 
    1542                 if len( typestr ) == 0:
    1543                         typestr         = GMETRIC_DEFAULT_TYPE
    1544 
    1545                 msg             = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
    1546 
    1547                 return self.socket.sendto( msg, self.hostport )
    1548 
    1549         def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax ):
    1550 
    1551                 if slopestr not in self.slope:
    1552 
    1553                         raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
    1554 
    1555                 if typestr not in self.type:
    1556 
    1557                         raise ValueError( "Type must be one of: " + str( self.type ) )
    1558 
    1559                 if len( name ) == 0:
    1560 
    1561                         raise ValueError( "Name must be non-empty" )
    1562 
    1563                 self.msg.reset()
    1564                 self.msg.pack_int( 0 )
    1565                 self.msg.pack_string( typestr )
    1566                 self.msg.pack_string( name )
    1567                 self.msg.pack_string( str( value ) )
    1568                 self.msg.pack_string( unitstr )
    1569                 self.msg.pack_int( self.slope[ slopestr ] )
    1570                 self.msg.pack_uint( int( tmax ) )
    1571                 self.msg.pack_uint( int( dmax ) )
    1572 
    1573                 return self.msg.get_buffer()
     1494    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
     1495
     1496    slope       = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
     1497    type        = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
     1498    protocol    = ( 'udp', 'multicast' )
     1499
     1500    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
     1501       
     1502        global GMETRIC_DEFAULT_TYPE
     1503
     1504        self.prot       = self.checkHostProtocol( host )
     1505        self.msg    = xdrlib.Packer()
     1506        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
     1507
     1508        if self.prot not in self.protocol:
     1509
     1510            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
     1511
     1512        if self.prot == 'multicast':
     1513
     1514            # Set multicast options
     1515            #
     1516            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
     1517
     1518        self.hostport   = ( host, int( port ) )
     1519        self.slopestr   = 'both'
     1520        self.tmax       = 60
     1521
     1522    def checkHostProtocol( self, ip ):
     1523
     1524        """Detect if a ip adress is a multicast address"""
     1525
     1526        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
     1527        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
     1528
     1529        ip_fields           = ip.split( '.' )
     1530
     1531        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
     1532
     1533            return 'multicast'
     1534        else:
     1535            return 'udp'
     1536
     1537    def send( self, name, value, dmax, typestr = '', units = '' ):
     1538
     1539        if len( units ) == 0:
     1540            units        = GMETRIC_DEFAULT_UNITS
     1541
     1542        if len( typestr ) == 0:
     1543            typestr        = GMETRIC_DEFAULT_TYPE
     1544
     1545        msg         = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
     1546
     1547        return self.socket.sendto( msg, self.hostport )
     1548
     1549    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax ):
     1550
     1551        if slopestr not in self.slope:
     1552
     1553            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
     1554
     1555        if typestr not in self.type:
     1556
     1557            raise ValueError( "Type must be one of: " + str( self.type ) )
     1558
     1559        if len( name ) == 0:
     1560
     1561            raise ValueError( "Name must be non-empty" )
     1562
     1563        self.msg.reset()
     1564        self.msg.pack_int( 0 )
     1565        self.msg.pack_string( typestr )
     1566        self.msg.pack_string( name )
     1567        self.msg.pack_string( str( value ) )
     1568        self.msg.pack_string( unitstr )
     1569        self.msg.pack_int( self.slope[ slopestr ] )
     1570        self.msg.pack_uint( int( tmax ) )
     1571        self.msg.pack_uint( int( dmax ) )
     1572
     1573        return self.msg.get_buffer()
    15741574
    15751575def printTime( ):
    15761576
    1577         """Print current time/date in human readable format for log/debug"""
    1578 
    1579         return time.strftime("%a, %d %b %Y %H:%M:%S")
     1577    """Print current time/date in human readable format for log/debug"""
     1578
     1579    return time.strftime("%a, %d %b %Y %H:%M:%S")
    15801580
    15811581def debug_msg( level, msg ):
    15821582
    1583         """Print msg if at or above current debug level"""
    1584 
    1585         global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
    1586 
    1587         if (not DAEMONIZE and DEBUG_LEVEL >= level):
    1588                 sys.stderr.write( msg + '\n' )
    1589 
    1590         if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
    1591                 syslog.syslog( msg )
     1583    """Print msg if at or above current debug level"""
     1584
     1585    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
     1586
     1587    if (not DAEMONIZE and DEBUG_LEVEL >= level):
     1588        sys.stderr.write( msg + '\n' )
     1589
     1590    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
     1591        syslog.syslog( msg )
    15921592
    15931593def write_pidfile():
    15941594
    1595         # Write pidfile if PIDFILE is set
    1596         #
    1597         if PIDFILE:
    1598 
    1599                 pid     = os.getpid()
    1600 
    1601                 pidfile = open( PIDFILE, 'w' )
    1602 
    1603                 pidfile.write( str( pid ) )
    1604                 pidfile.close()
     1595    # Write pidfile if PIDFILE is set
     1596    #
     1597    if PIDFILE:
     1598
     1599        pid    = os.getpid()
     1600
     1601        pidfile    = open( PIDFILE, 'w' )
     1602
     1603        pidfile.write( str( pid ) )
     1604        pidfile.close()
    16051605
    16061606def main():
    16071607
    1608         """Application start"""
    1609 
    1610         global PBSQuery, PBSError, lsfObject
    1611         global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
    1612 
    1613         if not processArgs( sys.argv[1:] ):
    1614 
    1615                 sys.exit( 1 )
    1616 
    1617         # Load appropriate DataGatherer depending on which BATCH_API is set
    1618         # and any required modules for the Gatherer
    1619         #
    1620         if BATCH_API == 'pbs':
    1621 
    1622                 try:
    1623                         from PBSQuery import PBSQuery, PBSError
    1624 
    1625                 except ImportError:
    1626 
    1627                         debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
    1628                         sys.exit( 1 )
    1629 
    1630                 gather = PbsDataGatherer()
    1631 
    1632         elif BATCH_API == 'sge':
    1633 
    1634                 # Tested with SGE 6.0u11.
    1635                 #
    1636                 gather = SgeDataGatherer()
    1637 
    1638         elif BATCH_API == 'lsf':
    1639 
    1640                 try:
    1641                         from lsfObject import lsfObject
    1642                 except:
    1643                         debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
    1644                         sys.exit( 1)
    1645 
    1646                 gather = LsfDataGatherer()
    1647 
    1648         else:
    1649                 debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
    1650 
    1651                 sys.exit( 1 )
    1652 
    1653         if( DAEMONIZE and USE_SYSLOG ):
    1654 
    1655                 syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
    1656 
    1657         if DAEMONIZE:
    1658 
    1659                 gather.daemon()
    1660         else:
    1661                 gather.run()
     1608    """Application start"""
     1609
     1610    global PBSQuery, PBSError, lsfObject
     1611    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
     1612
     1613    if not processArgs( sys.argv[1:] ):
     1614
     1615        sys.exit( 1 )
     1616
     1617    # Load appropriate DataGatherer depending on which BATCH_API is set
     1618    # and any required modules for the Gatherer
     1619    #
     1620    if BATCH_API == 'pbs':
     1621
     1622        try:
     1623            from PBSQuery import PBSQuery, PBSError
     1624
     1625        except ImportError:
     1626
     1627            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
     1628            sys.exit( 1 )
     1629
     1630        gather = PbsDataGatherer()
     1631
     1632    elif BATCH_API == 'sge':
     1633
     1634        # Tested with SGE 6.0u11.
     1635        #
     1636        gather = SgeDataGatherer()
     1637
     1638    elif BATCH_API == 'lsf':
     1639
     1640        try:
     1641            from lsfObject import lsfObject
     1642        except:
     1643            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
     1644            sys.exit( 1)
     1645
     1646        gather = LsfDataGatherer()
     1647
     1648    else:
     1649        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
     1650
     1651        sys.exit( 1 )
     1652
     1653    if( DAEMONIZE and USE_SYSLOG ):
     1654
     1655        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
     1656
     1657    if DAEMONIZE:
     1658
     1659        gather.daemon()
     1660    else:
     1661        gather.run()
    16621662
    16631663# wh00t? someone started me! :)
    16641664#
    16651665if __name__ == '__main__':
    1666         main()
     1666    main()
Note: See TracChangeset for help on using the changeset viewer.