#!/usr/bin/env python
#
# This file is part of Jobmonarch
#
# Copyright (C) 2006-2007 Ramon Bastiaans
# Copyright (C) 2007, 2009 Dave Love (SGE code)
#
# Jobmonarch is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Jobmonarch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# SVN $Id: jobmond.py 658 2011-02-22 14:32:10Z ramonb $
#

import sys, getopt, time, os, socket, string, re
import shlex, subprocess
import syslog, xml, xml.sax
from xml.sax.handler import feature_namespaces
from collections import deque

# The module is called ConfigParser on Python 2 and configparser on Python 3;
# bind it to the Python 2 name either way.
try:
    import ConfigParser
except ImportError:
    import configparser as ConfigParser

# xdrlib is not available on every interpreter (it was removed from the
# standard library in Python 3.13). Nothing in this part of the file uses
# it, so a missing module must not prevent the daemon from being imported.
try:
    import xdrlib
except ImportError:
    xdrlib = None

VERSION = '0.3.1+SVN'


def usage( ver ):
    """Print the version banner and, unless `ver` is true, the help text.

    Returns 0 when only the version was requested, otherwise None.
    """

    print( 'jobmond %s' % VERSION )

    if ver:
        return 0

    print( '' )
    print( 'Purpose:' )
    print( ' The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics' )
    print( ' to Ganglia, which can be viewed with Job Monarch web frontend' )
    print( '' )
    print( 'Usage: jobmond [OPTIONS]' )
    print( '' )
    print( ' -c, --config=FILE The configuration file to use (default: /etc/jobmond.conf)' )
    print( ' -p, --pidfile=FILE Use pid file to store the process id' )
    print( ' -h, --help Print help and exit' )
    print( ' -v, --version Print version and exit' )
    print( '' )


def processArgs( args ):
    """Parse the command line in `args` and load the configuration file.

    Sets the global PIDFILE (None unless -p/--pidfile was given). Exits with
    status 1 on an unknown option and with status 0 after --help/--version.
    Returns whatever loadConfig() returns for the selected config file.
    """

    SHORT_L = 'p:hvc:'
    LONG_L = [ 'help', 'config=', 'pidfile=', 'version' ]

    global PIDFILE
    PIDFILE = None

    config_filename = '/etc/jobmond.conf'

    try:
        opts, args = getopt.getopt( args, SHORT_L, LONG_L )

    except getopt.GetoptError:
        # sys.exc_info() instead of an 'except X, detail' clause keeps this
        # line valid on every Python version.
        detail = sys.exc_info()[1]

        print( detail )
        # usage() requires its argument; calling it bare raised a TypeError
        # here instead of showing the help text.
        usage( False )
        sys.exit( 1 )

    for opt, value in opts:

        if opt in [ '--config', '-c' ]:
            config_filename = value

        if opt in [ '--pidfile', '-p' ]:
            PIDFILE = value

        if opt in [ '--help', '-h' ]:
            usage( False )
            sys.exit( 0 )

        if opt in [ '--version', '-v' ]:
            usage( True )
            sys.exit( 0 )

    return loadConfig( config_filename )


# Note: only '#' comments are stripped when looking up values. Other comment
# styles that may occur in a gmond config are still not recognised.
class GangliaConfigParser:
    """Minimal line-based reader for 'name = value' settings in a gmond config."""

    def __init__( self, config_file ):
        """Remember `config_file`; log a fatal error and exit(1) if it does not exist."""

        self.config_file = config_file

        if not os.path.exists( self.config_file ):

            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
            sys.exit( 1 )

    def removeQuotes( self, value ):
        """Return `value` with every single and double quote removed, then stripped."""

        clean_value = value
        clean_value = clean_value.replace( "'", "" )
        clean_value = clean_value.replace( '"', '' )
        clean_value = clean_value.strip()

        return clean_value

    def _stripComment( self, line ):
        """Return `line` cut at the first '#' that is not inside quotes."""

        quote = None

        for pos in range( len( line ) ):

            char = line[ pos ]

            if quote:
                if char == quote:
                    quote = None
            elif char in ( '"', "'" ):
                quote = char
            elif char == '#':
                return line[ :pos ]

        return line

    def getVal( self, section, valname ):
        """Return the raw text after '=' for `valname` inside `section`.

        Matching is by substring on each line; the last matching line wins.
        Commented-out ('#') text is ignored. Returns None when nothing matched.
        """

        cfg_fp = open( self.config_file )

        # Close the file even if reading fails
        try:
            lines = cfg_fp.readlines()
        finally:
            cfg_fp.close()

        section_start = False
        section_found = False
        value = None

        for line in lines:

            # Commented-out settings must not be picked up
            line = self._stripComment( line )

            if line.find( section ) != -1:
                section_found = True

            if line.find( '{' ) != -1 and section_found:
                section_start = True

            if line.find( '}' ) != -1 and section_found:
                section_start = False
                section_found = False

            if line.find( valname ) != -1 and section_start:
                value = ''.join( line.split( '=' )[1:] ).strip()

        return value

    def getInt( self, section, valname ):
        """Return the setting as int, or False when it is missing or empty."""

        value = self.getVal( section, valname )

        if not value:
            return False

        value = self.removeQuotes( value )

        return int( value )

    def getStr( self, section, valname ):
        """Return the setting as unquoted str, or False when it is missing or empty."""

        value = self.getVal( section, valname )

        if not value:
            return False

        value = self.removeQuotes( value )

        return str( value )


def findGmetric():
    """Return the first '<dir>/gmetric' that exists along $PATH, or False."""

    for path_dir in os.path.expandvars( '$PATH' ).split( ':' ):

        guess = '%s/%s' % ( path_dir, 'gmetric' )

        if os.path.exists( guess ):
            return guess

    return False


def loadConfig( filename ):
    """Read the jobmond config `filename` into the module-level settings.

    Every value is read from the DEFAULT section. Missing optional settings
    fall back to defaults (with a log message); a missing gmetric binary or
    an undeterminable BATCH_API is fatal and exits with status 1.
    Returns True.
    """

    def getlist( cfg_string ):
        """Split a comma separated config value into a list, dropping quotes."""

        my_list = [ ]

        for item_txt in cfg_string.split( ',' ):

            sep_char = None

            item_txt = item_txt.strip()

            for s_char in [ "'", '"' ]:

                if item_txt.find( s_char ) != -1:

                    if item_txt.count( s_char ) != 2:

                        print( 'Missing quote: %s' % item_txt )
                        sys.exit( 1 )

                    else:
                        sep_char = s_char
                        break

            if sep_char:
                item_txt = item_txt.split( sep_char )[1]

            my_list.append( item_txt )

        return my_list

    cfg = ConfigParser.ConfigParser()

    cfg.read( filename )

    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY

    # Only set when an old-style TORQUE_* option is found. It has to exist
    # regardless, because it is read below whenever BATCH_API is not
    # configured.
    api_guess = None

    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )

    DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )

    SYSLOG_LEVEL = -1
    SYSLOG_FACILITY = None

    try:
        USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )

    except ConfigParser.NoOptionError:

        USE_SYSLOG = True

        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )

    if USE_SYSLOG:

        try:
            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )

        except ConfigParser.NoOptionError:

            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
            SYSLOG_LEVEL = 0

        try:
            # Plain attribute lookup: config text is never evaluated as code
            SYSLOG_FACILITY = getattr( syslog, 'LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )

        except ConfigParser.NoOptionError:

            SYSLOG_FACILITY = syslog.LOG_DAEMON

            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )

    try:
        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )

    except ConfigParser.NoOptionError:

        # Backwards compatibility for old configs
        #
        BATCH_SERVER = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
        api_guess = 'pbs'

    try:
        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )

    except ConfigParser.NoOptionError:

        # Backwards compatibility for old configs
        #
        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
        api_guess = 'pbs'

    try:
        GMOND_CONF = cfg.get( 'DEFAULT', 'GMOND_CONF' )

    except ConfigParser.NoOptionError:

        # Not specified: assume /etc/gmond.conf
        #
        GMOND_CONF = '/etc/gmond.conf'

    ganglia_cfg = GangliaConfigParser( GMOND_CONF )

    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
    #
    gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )

    if not gmetric_dest_ip:

        # Maybe unicast target then
        #
        gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'host' )

    gmetric_dest_port = ganglia_cfg.getStr( 'udp_send_channel', 'port' )

    if gmetric_dest_ip and gmetric_dest_port:

        GMETRIC_TARGET = '%s:%s' % ( gmetric_dest_ip, gmetric_dest_port )

    else:
        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" % GMOND_CONF )

        # Couldn't figure it out: let's see if it's in our jobmond.conf
        #
        try:
            GMETRIC_TARGET = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )

        # Guess not: now just give up
        #
        except ConfigParser.NoOptionError:

            GMETRIC_TARGET = None

            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )

    gmetric_bin = findGmetric()

    if gmetric_bin:

        GMETRIC_BINARY = gmetric_bin

    else:
        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )

        try:
            GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )

        except ConfigParser.NoOptionError:

            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
            sys.exit( 1 )

    DETECT_TIME_DIFFS = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )

    BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )

    try:
        BATCH_API = cfg.get( 'DEFAULT', 'BATCH_API' )

    except ConfigParser.NoOptionError:

        if BATCH_SERVER and api_guess:

            BATCH_API = api_guess

        else:
            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
            sys.exit( 1 )

    try:
        QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )

    except ConfigParser.NoOptionError:

        QUEUE = None

    return True


def fqdn_parts (fqdn):
    """Return pair of host and domain for fully-qualified domain name arg."""

    parts = fqdn.split (".")

    return (parts[0], ".".join(parts[1:]))


METRIC_MAX_VAL_LEN = 900


class DataProcessor:
    """Class for processing of data"""

    binary = None

    def __init__( self, binary=None ):
        """Remember alternate binary location if supplied

        Falls back to the global GMETRIC_BINARY, derives the metric lifetime
        (dmax) from BATCH_POLL_INTERVAL and, when GMOND_CONF is set, exits
        with status 1 if the gmetric version check reports incompatibility.
        """

        global GMETRIC_BINARY, GMOND_CONF

        if binary:
            self.binary = binary

        if not self.binary:
            self.binary = GMETRIC_BINARY

        # Timeout for XML
        #
        # From ganglia's documentation:
        #
        # 'A metric will be deleted DMAX seconds after it is received, and
        # DMAX=0 means eternal life.'
        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )

        if GMOND_CONF:

            incompatible = self.checkGmetricVersion()

            if incompatible:

                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
                sys.exit( 1 )

    def checkGmetricVersion( self ):
        """
        Check version of gmetric is at least 3.0.1
        for the syntax we use

        Runs '<binary> --version' and parses the 'gmetric X.Y.Z' line.
        Adjusts the global METRIC_MAX_VAL_LEN per version and clears
        GMETRIC_TARGET for gmetric 3.1. Returns 1 if incompatible, else 0.
        """

        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET

        incompatible = 0

        gfp = os.popen( self.binary + ' --version' )
        lines = gfp.readlines()

        gfp.close()

        for line in lines:

            line = line.split( ' ' )

            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:

                gmetric_version = line[1].split( '\n' )[0]

                version_fields = gmetric_version.split( '.' )

                version_major = int( version_fields[0] )
                version_minor = int( version_fields[1] )
                version_patch = int( version_fields[2] )

                incompatible = 0

                if version_major < 3:

                    incompatible = 1

                elif version_major == 3:

                    if version_minor == 0:

                        if version_patch < 1:

                            incompatible = 1

                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
                        #
                        if version_patch < 3:

                            METRIC_MAX_VAL_LEN = 900

                        elif version_patch >= 3:

                            METRIC_MAX_VAL_LEN = 1400

                    elif version_minor == 1:

                        debug_msg( 0, 'Gmetric 3.1 detected, internal gmetric handling disabled. Failing back to gmetric binary' )

                        METRIC_MAX_VAL_LEN = 500

                        # We don't speak 3.1 gmetric so use binary
                        #
                        GMETRIC_TARGET = None

        return incompatible

    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
        """Send one metric, natively if GMETRIC_TARGET is set, else via the gmetric binary

        metricname/metricval are converted with str(); valtype and units are
        passed through as given. The metric lifetime is self.dmax.
        """

        if GMETRIC_TARGET:

            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]

            metric_debug = "[gmetric] name: %s - val: %s - dmax: %s" % ( str( metricname ), str( metricval ), str( self.dmax ) )

            debug_msg( 10, printTime() + ' ' + metric_debug)

            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )

            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )

        else:
            # Build an argument vector instead of a shell command line:
            # metric values contain job attributes, and pasting them into a
            # shell string lets quotes or $(...) in them run as commands.
            argv = shlex.split( self.binary )

            try:
                argv = argv + [ '-c', GMOND_CONF ]

            except NameError:

                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )

            argv = argv + [ '-n', str( metricname ), '-v', str( metricval ), '-t', str( valtype ), '-d', str( self.dmax ) ]

            if len( units ) > 0:

                argv = argv + [ '-u', units ]

            debug_msg( 10, printTime() + ' ' + ' '.join( argv ) )

            try:
                subprocess.call( argv )

            except OSError:
                # os.system() never raised for a missing binary; keep the
                # daemon alive and report the failure instead.
                debug_msg( 0, "ERROR: could not execute gmetric binary '%s': %s" % ( self.binary, sys.exc_info()[1] ) )


class DataGatherer:
    """Skeleton class for batch system DataGatherer"""

    def printJobs( self, jobs ):
        """Print a jobinfo overview of `jobs` (self.jobs when `jobs` is None)"""

        # The argument used to be ignored in favour of self.jobs
        if jobs is None:
            jobs = self.jobs

        for job_name, attrs in jobs.items():

            print( 'job %s' % job_name )

            for attr_name, val in attrs.items():

                print( '\t%s = %s' % ( attr_name, val ) )

    def printJob( self, jobs, job_id ):
        """Print job with job_id from jobs"""

        print( 'job %s' % job_id )

        for name, val in jobs[ job_id ].items():

            print( '\t%s = %s' % ( name, val ) )

    def getAttr( self, attrs, name ):
        """Return certain attribute from dictionary, if exists (else '')"""

        if name in attrs:

            return attrs[ name ]
        else:
            return ''

    def jobDataChanged( self, jobs, job_id, attrs ):
        """Check if job with attrs and job_id in jobs has changed

        Returns 1 when the job is unknown or any attribute in `attrs` is new
        or differs from the stored one, otherwise 0.
        """

        if job_id in jobs:
            oldData = jobs[ job_id ]
        else:
            return 1

        for name, val in attrs.items():

            if name in oldData:

                if oldData[ name ] != val:

                    return 1

            else:
                return 1

        return 0

    def submitJobData( self ):
        """Submit job info list

        Sends a heartbeat, the running/queued job counts, (for PBS) the
        down/offline node lists and finally one or more metrics per job.
        """

        global BATCH_API

        # Same timestamp expression is used for the heartbeat and node reports
        reported = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )

        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', reported )

        running_jobs = 0
        queued_jobs = 0

        # Count how many running/queued jobs we found
        #
        for jobid, jobattrs in self.jobs.items():

            if jobattrs[ 'status' ] == 'Q':

                queued_jobs += 1

            elif jobattrs[ 'status' ] == 'R':

                running_jobs += 1

        # Report running/queued jobs as seperate metric for a nice RRD graph
        #
        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )

        # Report down/offline nodes in batch (PBS only ATM)
        #
        if BATCH_API == 'pbs':

            domain = fqdn_parts( socket.getfqdn() )[1]

            downed_nodes = list()
            offline_nodes = list()

            for name, node in self.pq.getnodes().items():

                if ( node[ 'state' ].find( "down" ) != -1 ):

                    downed_nodes.append( name )

                if ( node[ 'state' ].find( "offline" ) != -1 ):

                    offline_nodes.append( name )

            downnodeslist = do_nodelist( downed_nodes )
            offlinenodeslist = do_nodelist( offline_nodes )

            down_str = 'nodes=%s domain=%s reported=%s' % ( ';'.join( downnodeslist ), domain, reported )
            offl_str = 'nodes=%s domain=%s reported=%s' % ( ';'.join( offlinenodeslist ), domain, reported )
            self.dp.multicastGmetric( 'MONARCH-DOWN' , down_str )
            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )

        # Now let's spread the knowledge
        #
        for jobid, jobattrs in self.jobs.items():

            # Make gmetric values for each job: respect max gmetric value length
            #
            gmetric_val = self.compileGmetricVal( jobid, jobattrs )
            metric_increment = 0

            # If we have more job info than max gmetric value length allows, split it up
            # amongst multiple metrics
            #
            for val in gmetric_val:

                self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )

                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
                #
                metric_increment = metric_increment + 1

    def compileGmetricVal( self, jobid, jobattrs ):
        """Create a val string for gmetric of jobinfo

        Returns a list of 'name=value name=value ...' strings. A new string
        is started whenever adding an attribute would push the current one
        past METRIC_MAX_VAL_LEN. For a running job ('status' == 'R') the
        'nodes' list is joined with ';' and may be spread over several
        strings; any other list value is joined with ','. Empty and None
        values are left out.
        """

        gval_lists = [ ]
        val_list = { }

        for val_name, val_value in jobattrs.items():

            # These are our own metric names, i.e.: status, start_timestamp, etc
            #
            val_list_names_len = len( ' '.join( val_list.keys() ) ) + len( val_list.keys() )

            # These are their corresponding values
            #
            val_list_vals_len = len( ' '.join( val_list.values() ) ) + len( val_list.values() )

            if val_name == 'nodes' and jobattrs['status'] == 'R':

                node_str = None

                for node in val_value:

                    if node_str:

                        node_str = node_str + ';' + node
                    else:
                        node_str = node

                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
                    #
                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:

                        # It's too big, we need to make a new gmetric for the additional info
                        #
                        val_list[ val_name ] = node_str

                        gval_lists.append( val_list )

                        val_list = { }
                        node_str = None

                # node_str is None when there were no nodes or the last node
                # just went out with the previous chunk: never store that,
                # it used to surface as ('nodes', None) items further down.
                if node_str is not None:

                    val_list[ val_name ] = node_str

                if len( val_list ) > 0:

                    gval_lists.append( val_list )

                val_list = { }

            elif val_value != '' and val_value is not None:

                # Normalise to text up front so that the length bookkeeping
                # and the final string building see the same thing.
                if isinstance( val_value, list ):

                    val_value = ','.join( [ str( item ) for item in val_value ] )
                else:
                    val_value = str( val_value )

                if val_value == '':

                    continue

                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
                #
                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(val_value) ) > METRIC_MAX_VAL_LEN:

                    # It's too big, we need to make a new gmetric for the additional info
                    #
                    gval_lists.append( val_list )

                    val_list = { }

                val_list[ val_name ] = val_value

        if len( val_list ) > 0:

            gval_lists.append( val_list )

        str_list = [ ]

        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
        #
        for val_list in gval_lists:

            pairs = [ name + '=' + value for name, value in val_list.items() ]

            if pairs:

                str_list.append( ' '.join( pairs ) )

        return str_list

    def daemon( self ):
        """Run as daemon forever"""

        # Fork the first child
        #
        pid = os.fork()
        if pid > 0:
            sys.exit(0) # end parent

        # creates a session and sets the process group ID
        #
        os.setsid()

        # Fork the second child
        #
        pid = os.fork()
        if pid > 0:
            sys.exit(0) # end parent

        write_pidfile()

        # Go to the root directory and set the umask
        #
        os.chdir('/')
        os.umask(0)

        sys.stdin.close()
        sys.stdout.close()
        sys.stderr.close()

        # Point all three standard descriptors at /dev/null, using the
        # descriptor os.open() actually returned rather than assuming it is 0.
        null_fd = os.open('/dev/null', os.O_RDWR)
        os.dup2(null_fd, 0)
        os.dup2(null_fd, 1)
        os.dup2(null_fd, 2)

        if null_fd > 2:
            os.close(null_fd)

        self.run()

    def run( self ):
        """Main thread: gather and submit job data every BATCH_POLL_INTERVAL seconds"""

        while ( 1 ):

            self.getJobData()
            self.submitJobData()
            time.sleep( BATCH_POLL_INTERVAL )


# SGE code by Dave Love. Tested with SGE 6.0u8 and 6.0u11. May
# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
# with 6.2. See also the fixmes.

class NoJobs (Exception):
    """Exception raised by empty job list in qstat output."""
    pass


class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
    """SAX handler for XML output from Sun Grid Engine's `qstat'."""

    def __init__(self):
        self.value = ""
        self.joblist = []
        self.job = {}
        self.queue = ""
        self.in_joblist = False
        self.lrequest = False
        self.eltq = deque()
        xml.sax.handler.ContentHandler.__init__(self)

    # The structure of the output is as follows (for SGE 6.0). It's
    # similar for 6.1, but radically different for 6.2, and is
    # undocumented generally. Unfortunately it's voluminous, and probably
    # doesn't scale to large clusters/queues.
    #
    #
    #
    # ...
    #
    #
    # ...
    #
    #
    #
    #
    # ...