#!/usr/bin/env python
#
# This file is part of Jobmonarch
#
# Copyright (C) 2006-2013  Ramon Bastiaans
# Copyright (C) 2007, 2009  Dave Love  (SGE code)
#
# Jobmonarch is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Jobmonarch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# SVN $Id: jobmond.py 784 2013-04-04 14:01:03Z ramonb $
#
# vi :set ts=4

import sys, getopt, ConfigParser, time, os, socket, string, re
import xdrlib, syslog, xml, xml.sax, shlex, os.path

from xml.sax.handler import feature_namespaces
from collections import deque
from glob import glob

VERSION='0.4+SVN'

def usage( ver ):

    print 'jobmond %s' %VERSION

    if ver:
        return 0

    print
    print 'Purpose:'
    print '  The Job Monitoring Daemon (jobmond) reports batch job information and statistics'
    print '  to Ganglia, which can be viewed with the Job Monarch web frontend'
    print
    print 'Usage:   jobmond [OPTIONS]'
    print
    print '  -c, --config=FILE   The configuration file to use (default: /etc/jobmond.conf)'
    print '  -p, --pidfile=FILE  Use pid file to store the process id'
    print '  -h, --help          Print help and exit'
    print '  -v, --version       Print version and exit'
    print

def processArgs( args ):

    SHORT_L = 'p:hvc:'
    LONG_L  = [ 'help', 'config=', 'pidfile=', 'version' ]

    global PIDFILE, JOBMOND_CONF

    PIDFILE      = None
    JOBMOND_CONF = '/etc/jobmond.conf'

    try:
        opts, args = getopt.getopt( args, SHORT_L, LONG_L )

    except getopt.GetoptError, detail:

        print detail
        usage( False )
        sys.exit( 1 )

    for opt, value in opts:

        if opt in [ '--config', '-c' ]:

            JOBMOND_CONF = value

        if opt in [ '--pidfile', '-p' ]:

            PIDFILE = value

        if opt in [ '--help', '-h' ]:

            usage( False )
            sys.exit( 0 )

        if opt in [ '--version', '-v' ]:

            usage( True )
            sys.exit( 0 )

    return loadConfig( JOBMOND_CONF )
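# A minimal, hypothetical sketch of how processArgs() is meant to be driven
# from a main program: it consumes the command line (minus the program name)
# and finishes by loading the configuration file it selected.
#
#   if __name__ == '__main__':
#       processArgs( sys.argv[1:] )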
class GangliaConfigParser:

    def __init__( self, filename ):

        self.conf_lijst   = [ ]
        self.conf_dict    = { }
        self.filename     = filename
        self.file_pointer = file( filename, 'r' )

        self.lexx = shlex.shlex( self.file_pointer )
        self.lexx.whitespace_split = True

        self.parse()

    def __del__( self ):

        """ Cleanup: close file descriptor """

        self.file_pointer.close()
        del self.lexx
        del self.conf_lijst

    def removeQuotes( self, value ):

        clean_value = value
        clean_value = clean_value.replace( "'", "" )
        clean_value = clean_value.replace( '"', '' )
        clean_value = clean_value.strip()

        return clean_value

    def removeBraces( self, value ):

        clean_value = value
        clean_value = clean_value.replace( "(", "" )
        clean_value = clean_value.replace( ")", "" )
        clean_value = clean_value.strip()

        return clean_value

    def parse( self ):

        """
        Parse self.filename using shlex scanning.

        - Removes /* comments */
        - Traverses (recursively) through all include() statements
        - Stores complete valid config tokens in self.conf_lijst

        i.e.:
            [ 'globals', '{',
              'daemonize', '=', 'yes',
              'setuid', '=', 'yes',
              'user', '=', 'ganglia',
              'debug_level', '=', '0', ]
        """

        t = 'bogus'
        c = False
        i = False

        while t != self.lexx.eof:

            t = self.lexx.get_token()

            if len( t ) >= 2:

                if len( t ) >= 4:

                    if t[:2] == '/*' and t[-2:] == '*/':
                        # Whole token is one comment: skip it
                        continue

                if t == '/*' or t[:2] == '/*':
                    # Start of a comment block
                    c = True
                    continue

                if t == '*/' or t[-2:] == '*/':
                    # End of a comment block
                    c = False
                    continue

            if c:
                # Inside a comment block: skip token
                continue

            if t == 'include':
                # The next token is the include pattern
                i = True
                continue

            if i:
                # Strip quotes and braces from the include pattern, then
                # recursively parse every file the glob matches
                t2 = self.removeQuotes( t )
                t2 = self.removeBraces( t2 )

                for in_file in glob( t2 ):

                    parse_infile = GangliaConfigParser( in_file )

                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()

                    del parse_infile

                i = False
                continue

            self.conf_lijst.append( self.removeQuotes( t ) )

    def getConfLijst( self ):

        return self.conf_lijst

    def confListToDict( self, parent_list=None ):

        """
        Recursively traverses a conf_list and creates a dictionary from it
        """

        new_dict = { }
        count    = 0
        skip     = 0

        if not parent_list:
            parent_list = self.conf_lijst

        for n, c in enumerate( parent_list ):

            count = count + 1

            if skip > 0:
                skip = skip - 1
                continue

            if (n+1) <= (len( parent_list )-1):

                if parent_list[(n+1)] == '{':

                    if not new_dict.has_key( c ):
                        new_dict[ c ] = [ ]

                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
                    new_dict[ c ].append( temp_new_dict )

                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):

                    if not new_dict.has_key( c ):
                        new_dict[ c ] = [ ]

                    new_dict[ c ].append( parent_list[ (n+2) ] )

                    skip = 2

            if parent_list[n] == '}':

                return (new_dict, count)

    def makeConfDict( self ):

        """
        Walks through self.conf_lijst and creates a dictionary based
        upon config values

        i.e.:
            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
                                                         'ip': ['"127.0.0.1"'],
                                                         'mask': ['32']}]}],
                                    'port': ['8649']}],
            'udp_recv_channel': [{'port': ['8649']}],
            'udp_send_channel': [{'host': ['145.101.32.3'],
                                  'port': ['8649']},
                                 {'host': ['145.101.32.207'],
                                  'port': ['8649']}]}
        """

        new_dict = { }
        skip     = 0

        for n, c in enumerate( self.conf_lijst ):

            if skip > 0:
                skip = skip - 1
                continue

            if (n+1) <= (len( self.conf_lijst )-1):

                if self.conf_lijst[(n+1)] == '{':

                    if not new_dict.has_key( c ):
                        new_dict[ c ] = [ ]

                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
                    new_dict[ c ].append( temp_new_dict )

                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):

                    if not new_dict.has_key( c ):
                        new_dict[ c ] = [ ]

                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )

                    skip = 2

        self.conf_dict = new_dict

    def checkConfDict( self ):

        if len( self.conf_lijst ) == 0:
            raise Exception( "Something went wrong generating conf list for %s" %self.filename )

        if len( self.conf_dict ) == 0:
            self.makeConfDict()

    def getConfDict( self ):

        self.checkConfDict()
        return self.conf_dict

    def getUdpSendChannels( self ):

        self.checkConfDict()

        udp_send_channels = [ ]    # list of (IP, PORT) tuples

        if not self.conf_dict.has_key( 'udp_send_channel' ):
            return None

        for u in self.conf_dict[ 'udp_send_channel' ]:

            if u.has_key( 'mcast_join' ):
                ip = u[ 'mcast_join' ][0]
            elif u.has_key( 'host' ):
                ip = u[ 'host' ][0]
            else:
                # Neither mcast_join nor host set: skip this channel
                continue

            port = u[ 'port' ][0]

            udp_send_channels.append( ( ip, port ) )

        if len( udp_send_channels ) == 0:
            return None

        return udp_send_channels

    def getSectionLastOption( self, section, option ):

        """
        Get the last value set for an option in a config section. An option
        could be set multiple times across multiple (include) files.

        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
        """

        self.checkConfDict()
        value = None

        if not self.conf_dict.has_key( section ):
            return None

        # Could be set multiple times in multiple (include) files: get the last one set
        for c in self.conf_dict[ section ]:

            if c.has_key( option ):
                value = c[ option ][0]

        return value

    def getClusterName( self ):

        return self.getSectionLastOption( 'cluster', 'name' )

    def getVal( self, section, option ):

        return self.getSectionLastOption( section, option )

    def getInt( self, section, valname ):

        value = self.getVal( section, valname )

        if not value:
            return None

        return int( value )

    def getStr( self, section, valname ):

        value = self.getVal( section, valname )

        if not value:
            return None

        return str( value )
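# A minimal, hypothetical usage sketch of GangliaConfigParser (assumes a
# readable gmond.conf at the given path; values echo the examples in the
# makeConfDict docstring above, not real defaults):
#
#   gcfg = GangliaConfigParser( '/etc/ganglia/gmond.conf' )
#
#   print gcfg.getClusterName()
#   print gcfg.getUdpSendChannels()
#       # e.g. [ ('145.101.32.3', '8649'), ('145.101.32.207', '8649') ]
#   print gcfg.getInt( 'globals', 'max_udp_msg_len' )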
def findGmetric():

    for dir in os.path.expandvars( '$PATH' ).split( ':' ):

        guess = '%s/%s' %( dir, 'gmetric' )

        if os.path.exists( guess ):
            return guess

    return False

def loadConfig( filename ):

    def getlist( cfg_string ):

        my_list = [ ]

        for item_txt in cfg_string.split( ',' ):

            sep_char = None
            item_txt = item_txt.strip()

            for s_char in [ "'", '"' ]:

                if item_txt.find( s_char ) != -1:

                    if item_txt.count( s_char ) != 2:

                        print 'Missing quote: %s' %item_txt
                        sys.exit( 1 )

                    else:

                        sep_char = s_char
                        break

            if sep_char:
                item_txt = item_txt.split( sep_char )[1]

            my_list.append( item_txt )

        return my_list

    if not os.path.isfile( JOBMOND_CONF ):

        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
        sys.exit( 1 )

    try:
        f = open( JOBMOND_CONF, 'r' )

    except IOError, detail:

        print "Cannot read config file: '%s'" %JOBMOND_CONF
        sys.exit( 1 )

    else:
        f.close()

    cfg = ConfigParser.ConfigParser()

    cfg.read( filename )

    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS

    # Only set when a deprecated TORQUE_* option is encountered below;
    # initialize it so the BATCH_API fallback check cannot raise a NameError
    api_guess = None

    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )

    DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )

    SYSLOG_LEVEL    = -1
    SYSLOG_FACILITY = None

    try:
        USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )

    except ConfigParser.NoOptionError:

        USE_SYSLOG = True

        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )

    if USE_SYSLOG:

        try:
            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )

        except ConfigParser.NoOptionError:

            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
            SYSLOG_LEVEL = 0

        try:
            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )

        except ConfigParser.NoOptionError:

            SYSLOG_FACILITY = syslog.LOG_DAEMON

            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )

    try:
        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )

    except ConfigParser.NoOptionError:

        # Backwards compatibility for old configs
        #
        BATCH_SERVER = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
        api_guess    = 'pbs'

    try:
        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )

    except ConfigParser.NoOptionError:

        # Backwards compatibility for old configs
        #
        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
        api_guess           = 'pbs'

    try:
        GMOND_CONF = cfg.get( 'DEFAULT', 'GMOND_CONF' )

    except ConfigParser.NoOptionError:

        # Not specified: assume /etc/ganglia/gmond.conf
        #
        GMOND_CONF = '/etc/ganglia/gmond.conf'

    ganglia_cfg = GangliaConfigParser( GMOND_CONF )

    GMETRIC_TARGET = None

    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()

    if not GMOND_UDP_SEND_CHANNELS:

        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )

        # Couldn't figure it out: let's see if it's in our jobmond.conf
        #
        try:
            GMETRIC_TARGET = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )

        # Guess not: now just give up
        #
        except ConfigParser.NoOptionError:

            GMETRIC_TARGET = None

            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Falling back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )

            gmetric_bin = findGmetric()

            if gmetric_bin:

                GMETRIC_BINARY = gmetric_bin
            else:
                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )

                try:
                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )

                except ConfigParser.NoOptionError:

                    debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
                    sys.exit( 1 )

    #TODO: is this really still needed or should it be automatic
    DETECT_TIME_DIFFS = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )

    BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )

    try:
        BATCH_API = cfg.get( 'DEFAULT', 'BATCH_API' )

    except ConfigParser.NoOptionError, detail:

        if BATCH_SERVER and api_guess:

            BATCH_API = api_guess
        else:
            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make a guess" )
            sys.exit( 1 )

    try:
        QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )

    except ConfigParser.NoOptionError, detail:

        QUEUE = None

    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )

    return True

def fqdn_parts( fqdn ):

    """Return pair of host and domain for fully-qualified domain name arg."""

    parts = fqdn.split( "." )

    return ( parts[0], string.join( parts[1:], "." ) )
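# A hypothetical jobmond.conf illustrating the options loadConfig() reads
# above. The option names come from the code; every value here is an example,
# not a project default. QUEUE and BATCH_HOST_TRANSLATE are comma-separated
# lists parsed by getlist():
#
#   [DEFAULT]
#   DEBUG_LEVEL          : 0
#   DAEMONIZE            : yes
#   USE_SYSLOG           : yes
#   SYSLOG_LEVEL         : 0
#   SYSLOG_FACILITY      : DAEMON
#   BATCH_SERVER         : batch.example.com
#   BATCH_POLL_INTERVAL  : 30
#   BATCH_API            : pbs
#   GMOND_CONF           : /etc/ganglia/gmond.conf
#   DETECT_TIME_DIFFS    : yes
#   BATCH_HOST_TRANSLATE : ''
#   QUEUE                : 'short', 'long'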
class DataProcessor:

    """Class for processing of data"""

    binary = None

    def __init__( self, binary=None ):

        """Remember alternate binary location if supplied"""

        global GMETRIC_BINARY, GMOND_CONF

        if binary:
            self.binary = binary

        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
            self.binary = GMETRIC_BINARY

        # Timeout for XML
        #
        # From ganglia's documentation:
        #
        #     'A metric will be deleted DMAX seconds after it is received, and
        #     DMAX=0 means eternal life.'

        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )

        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:

            incompatible = self.checkGmetricVersion()

            if incompatible:

                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.4.0' )
                sys.exit( 1 )

    def checkGmetricVersion( self ):

        """
        Check that the version of gmetric is at least 3.4.0
        for the syntax we use
        """

        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET

        incompatible = 0

        gfp   = os.popen( self.binary + ' --version' )
        lines = gfp.readlines()

        gfp.close()

        for line in lines:

            line = line.split( ' ' )

            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:

                gmetric_version = line[1].split( '\n' )[0]

                version_major = int( gmetric_version.split( '.' )[0] )
                version_minor = int( gmetric_version.split( '.' )[1] )
                version_patch = int( gmetric_version.split( '.' )[2] )

                incompatible = 0

                if version_major < 3:

                    incompatible = 1

                elif version_major == 3:

                    if version_minor < 4:

                        incompatible = 1

        return incompatible

    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):

        """Call gmetric binary and multicast"""

        cmd = self.binary

        if GMOND_UDP_SEND_CHANNELS:

            for c_ip, c_port in GMOND_UDP_SEND_CHANNELS:

                metric_debug = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )

                debug_msg( 10, printTime() + ' ' + metric_debug )

                gm = Gmetric( c_ip, c_port )

                gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )

        elif GMETRIC_TARGET:

            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]

            metric_debug = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )

            debug_msg( 10, printTime() + ' ' + metric_debug )

            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )

            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )

        else:
            try:
                cmd = cmd + ' -c' + GMOND_CONF

            except NameError:

                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )

            cmd = cmd + ' -n' + str( metricname ) + ' -v"' + str( metricval ) + '" -t' + str( valtype ) + ' -d' + str( self.dmax )

            if len( units ) > 0:

                cmd = cmd + ' -u"' + units + '"'

            debug_msg( 10, printTime() + ' ' + cmd )

            os.system( cmd )
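# A minimal, hypothetical usage sketch of DataProcessor (assumes loadConfig()
# has populated the globals it reads; Gmetric, debug_msg and printTime are
# defined elsewhere in the source):
#
#   dp = DataProcessor()
#
#   # Sent via the gmond udp_send_channels when available, otherwise via
#   # GMETRIC_TARGET or, as a last resort, the gmetric binary:
#   dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( time.time() ) ) )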
class DataGatherer:

    """Skeleton class for batch system DataGatherer"""

    def printJobs( self, jobs ):

        """Print a jobinfo overview"""

        for name, attrs in self.jobs.items():

            print 'job %s' %(name)

            for name, val in attrs.items():

                print '\t%s = %s' %( name, val )

    def printJob( self, jobs, job_id ):

        """Print job with job_id from jobs"""

        print 'job %s' %(job_id)

        for name, val in jobs[ job_id ].items():

            print '\t%s = %s' %( name, val )

    def getAttr( self, attrs, name ):

        """Return certain attribute from dictionary, if it exists"""

        if attrs.has_key( name ):

            return attrs[ name ]
        else:
            return ''

    def jobDataChanged( self, jobs, job_id, attrs ):

        """Check if job with attrs and job_id in jobs has changed"""

        if jobs.has_key( job_id ):

            oldData = jobs[ job_id ]
        else:
            return 1

        for name, val in attrs.items():

            if oldData.has_key( name ):

                if oldData[ name ] != attrs[ name ]:

                    return 1

            else:
                return 1

        return 0

    def submitJobData( self ):

        """Submit job info list"""

        global BATCH_API

        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )

        running_jobs = 0
        queued_jobs  = 0

        # Count how many running/queued jobs we found
        #
        for jobid, jobattrs in self.jobs.items():

            if jobattrs[ 'status' ] == 'Q':

                queued_jobs += 1

            elif jobattrs[ 'status' ] == 'R':

                running_jobs += 1

        # Report running/queued jobs as separate metrics for a nice RRD graph
        #
        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )

        # Report down/offline nodes in batch (PBS only ATM)
        #
        if BATCH_API == 'pbs':

            domain = fqdn_parts( socket.getfqdn() )[1]

            downed_nodes  = list()
            offline_nodes = list()

            l = [ 'state' ]

            for name, node in self.pq.getnodes().items():

                if ( node[ 'state' ].find( "down" ) != -1 ):

                    downed_nodes.append( name )

                if ( node[ 'state' ].find( "offline" ) != -1 ):

                    offline_nodes.append( name )

            downnodeslist    = do_nodelist( downed_nodes )
            offlinenodeslist = do_nodelist( offline_nodes )

            down_str = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
            offl_str = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )

            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )

        # Now let's spread the knowledge
        #
        for jobid, jobattrs in self.jobs.items():

            # Make gmetric values for each job: respect max gmetric value length
            #
            gmetric_val      = self.compileGmetricVal( jobid, jobattrs )
            metric_increment = 0

            # If we have more job info than the max gmetric value length allows,
            # split it up amongst multiple metrics
            #
            for val in gmetric_val:

                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment), str( jobid ) )

                self.dp.multicastGmetric( metric_name, val )

                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
                #
                metric_increment = metric_increment + 1
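    # compileGmetricVal() below packs 'name=value' pairs into as few metric
    # value strings as METRIC_MAX_VAL_LEN allows.  A hypothetical worked
    # example, assuming METRIC_MAX_VAL_LEN were small enough to force a split:
    #
    #   { 'status': 'R', 'queue': 'short', 'nodes': [ 'n1', 'n2' ] }
    #
    # could come back as [ 'status=R queue=short', 'nodes=n1;n2' ], i.e. two
    # strings and hence two zplugin_monarch_job_<n>_<jobid> metrics for one job.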
    def compileGmetricVal( self, jobid, jobattrs ):

        """Create a val string for gmetric of jobinfo"""

        gval_lists = [ ]
        val_list   = { }

        for val_name, val_value in jobattrs.items():

            # These are our own metric names, i.e.: status, start_timestamp, etc
            #
            val_list_names_len = len( string.join( val_list.keys() ) ) + len( val_list.keys() )

            # These are their corresponding values
            #
            val_list_vals_len = len( string.join( val_list.values() ) ) + len( val_list.values() )

            if val_name == 'nodes' and jobattrs[ 'status' ] == 'R':

                node_str = None

                for node in val_value:

                    if node_str:

                        node_str = node_str + ';' + node
                    else:
                        node_str = node

                    # Make sure that if we add this new info, the total metric's
                    # value length does not exceed METRIC_MAX_VAL_LEN
                    #
                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:

                        # It's too big, we need to make a new gmetric for the additional info
                        #
                        val_list[ val_name ] = node_str

                        gval_lists.append( val_list )

                        val_list = { }
                        node_str = None

                val_list[ val_name ] = node_str

                gval_lists.append( val_list )

                val_list = { }

            elif val_value != '':

                # Make sure that if we add this new info, the total metric's
                # value length does not exceed METRIC_MAX_VAL_LEN
                #
                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:

                    # It's too big, we need to make a new gmetric for the additional info
                    #
                    gval_lists.append( val_list )

                    val_list = { }

                val_list[ val_name ] = val_value

        if len( val_list ) > 0:

            gval_lists.append( val_list )

        str_list = [ ]

        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
        #
        for val_list in gval_lists:

            my_val_str = None

            for val_name, val_value in val_list.items():

                if type(val_value) == list:

                    val_value = string.join( val_value, ',' )

                if my_val_str:

                    try:
                        # fixme: It's getting ('nodes', None) items
                        my_val_str = my_val_str + ' ' + val_name + '=' + val_value
                    except:
                        pass

                else:
                    my_val_str = val_name + '=' + val_value

            str_list.append( my_val_str )

        return str_list

    def daemon( self ):

        """Run as daemon forever"""

        # Fork the first child
        #
        pid = os.fork()

        if pid > 0:

            sys.exit(0)  # end parent

        # creates a session and sets the process group ID
        #
        os.setsid()

        # Fork the second child
        #
        pid = os.fork()

        if pid > 0:

            sys.exit(0)  # end parent

        write_pidfile()

        # Go to the root directory and set the umask
        #
        os.chdir('/')
        os.umask(0)

        sys.stdin.close()
        sys.stdout.close()
        sys.stderr.close()

        os.open('/dev/null', os.O_RDWR)
        os.dup2(0, 1)
        os.dup2(0, 2)

        self.run()

    def run( self ):

        """Main thread"""

        while ( 1 ):

            self.getJobData()
            self.submitJobData()
            time.sleep( BATCH_POLL_INTERVAL )
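# The double fork() in DataGatherer.daemon() above is the classic Unix
# daemonization recipe: the first fork lets the parent exit so the child is
# adopted by init, setsid() detaches it from the controlling terminal, and
# the second fork ensures the process is no longer a session leader and so
# can never reacquire a terminal.  A minimal standalone sketch of the same
# technique (hypothetical helper, not part of jobmond):
#
#   def daemonize():
#       if os.fork() > 0: sys.exit( 0 )   # parent exits
#       os.setsid()                       # new session, no terminal
#       if os.fork() > 0: sys.exit( 0 )   # session leader exits
#       os.chdir( '/' )
#       os.umask( 0 )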
# SGE code by Dave Love.  Tested with SGE 6.0u8 and 6.0u11.  May
# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
# with 6.2.  See also the fixmes.

class NoJobs (Exception):
    """Exception raised by empty job list in qstat output."""
    pass

class SgeQstatXMLParser(xml.sax.handler.ContentHandler):

    """SAX handler for XML output from Sun Grid Engine's `qstat'."""

    def __init__(self):
        self.value      = ""
        self.joblist    = []
        self.job        = {}
        self.queue      = ""
        self.in_joblist = False
        self.lrequest   = False
        self.eltq       = deque()
        xml.sax.handler.ContentHandler.__init__(self)

    # The structure of the output is as follows (for SGE 6.0).  It's
    # similar for 6.1, but radically different for 6.2, and is
    # undocumented generally.  Unfortunately it's voluminous, and probably
    # doesn't scale to large clusters/queues.
    #
    # <detailed_job_info xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    #   <djob_info>
    #     <qmaster_response>        <!-- one per job -->
    #       ...
    #     </qmaster_response>
    #     ...
    #   </djob_info>
    #   ...