Changeset 691 for branches/0.4/jobmond/jobmond.py
- Timestamp: 03/20/13 11:04:07 (10 years ago)
- File: branches/0.4/jobmond/jobmond.py (1 edited)
branches/0.4/jobmond/jobmond.py
r658 → r691. The two substantive edits:

-# Copyright (C) 2006-2007 Ramon Bastiaans
+# Copyright (C) 2006-2013 Ramon Bastiaans

-VERSION='0.3.1+SVN'
+VERSION='0.4+SVN'

Every other hunk in this changeset is a whitespace/indentation cleanup, so the file is reproduced below as it reads after the change (r691); a lone '…' marks regions the changeset view elided.

# This file is part of Jobmonarch
#
# Copyright (C) 2006-2013 Ramon Bastiaans
# Copyright (C) 2007, 2009 Dave Love (SGE code)
#

…

from collections import deque

VERSION='0.4+SVN'

def usage( ver ):

    print 'jobmond %s' %VERSION

    if ver:
        return 0

    print
    print 'Purpose:'
    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
    print
    print 'Usage:   jobmond [ OPTIONS ]'
    print
    print '  -c, --config=FILE   The configuration file to use (default: /etc/jobmond.conf)'
    print '  -p, --pidfile=FILE  Use pid file to store the process id'
    print '  -h, --help          Print help and exit'
    print '  -v, --version       Print version and exit'
    print

def processArgs( args ):

    SHORT_L = 'p:hvc:'
    LONG_L  = [ 'help', 'config=', 'pidfile=', 'version' ]

    global PIDFILE
    PIDFILE = None

    config_filename = '/etc/jobmond.conf'

    try:
        opts, args = getopt.getopt( args, SHORT_L, LONG_L )

    except getopt.GetoptError, detail:

        print detail
        usage()
        sys.exit( 1 )

    for opt, value in opts:

        if opt in [ '--config', '-c' ]:
            config_filename = value

        if opt in [ '--pidfile', '-p' ]:
            PIDFILE = value

        if opt in [ '--help', '-h' ]:
            usage( False )
            sys.exit( 0 )

        if opt in [ '--version', '-v' ]:
            usage( True )
            sys.exit( 0 )

    return loadConfig( config_filename )

# Fixme: This doesn't DTRT with commented-out bits of the file.  E.g.
…

class GangliaConfigParser:

    def __init__( self, config_file ):

        self.config_file = config_file

        if not os.path.exists( self.config_file ):

            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
            sys.exit( 1 )

    def removeQuotes( self, value ):

        clean_value = value
        clean_value = clean_value.replace( "'", "" )
        clean_value = clean_value.replace( '"', '' )
        clean_value = clean_value.strip()

        return clean_value

    def getVal( self, section, valname ):

        cfg_fp        = open( self.config_file )
        section_start = False
        section_found = False
        value         = None

        for line in cfg_fp.readlines():

            if line.find( section ) != -1:
                section_found = True

            if line.find( '{' ) != -1 and section_found:
                section_start = True

            if line.find( '}' ) != -1 and section_found:
                section_start = False
                section_found = False

            if line.find( valname ) != -1 and section_start:
                value = string.join( line.split( '=' )[1:], '' ).strip()

        cfg_fp.close()

        return value

    def getInt( self, section, valname ):

        value = self.getVal( section, valname )

        if not value:
            return False

        value = self.removeQuotes( value )

        return int( value )

    def getStr( self, section, valname ):

        value = self.getVal( section, valname )

        if not value:
            return False

        value = self.removeQuotes( value )

        return str( value )

def findGmetric():

    for dir in os.path.expandvars( '$PATH' ).split( ':' ):

        guess = '%s/%s' %( dir, 'gmetric' )

        if os.path.exists( guess ):
            return guess

    return False

def loadConfig( filename ):

    def getlist( cfg_string ):

        my_list = [ ]

        for item_txt in cfg_string.split( ',' ):

            sep_char = None

            item_txt = item_txt.strip()

            for s_char in [ "'", '"' ]:

                if item_txt.find( s_char ) != -1:

                    if item_txt.count( s_char ) != 2:

                        print 'Missing quote: %s' %item_txt
                        sys.exit( 1 )

                    else:
                        sep_char = s_char
                        break

            if sep_char:
                item_txt = item_txt.split( sep_char )[1]

            my_list.append( item_txt )

        return my_list

    cfg = ConfigParser.ConfigParser()

    cfg.read( filename )

    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY

    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )

    DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )

    SYSLOG_LEVEL    = -1
    SYSLOG_FACILITY = None

    try:
        USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )

    except ConfigParser.NoOptionError:

        USE_SYSLOG = True

        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )

    if USE_SYSLOG:

        try:
            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )

        except ConfigParser.NoOptionError:

            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
            SYSLOG_LEVEL = 0

        try:
            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )

        except ConfigParser.NoOptionError:

            SYSLOG_FACILITY = syslog.LOG_DAEMON

            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )

    try:
        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )

    except ConfigParser.NoOptionError:

        # Backwards compatibility for old configs
        #
        BATCH_SERVER = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
        api_guess    = 'pbs'

    try:
        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )

    except ConfigParser.NoOptionError:

        # Backwards compatibility for old configs
        #
        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
        api_guess           = 'pbs'

    try:
        GMOND_CONF = cfg.get( 'DEFAULT', 'GMOND_CONF' )

    except ConfigParser.NoOptionError:

        # Not specified: assume /etc/gmond.conf
        #
        GMOND_CONF = '/etc/gmond.conf'

    ganglia_cfg = GangliaConfigParser( GMOND_CONF )

    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
    #
    gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )

    if not gmetric_dest_ip:

        # Maybe unicast target then
        #
        gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'host' )

    gmetric_dest_port = ganglia_cfg.getStr( 'udp_send_channel', 'port' )

    if gmetric_dest_ip and gmetric_dest_port:

        GMETRIC_TARGET = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
    else:

        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )

        # Couldn't figure it out: let's see if it's in our jobmond.conf
        #
        try:
            GMETRIC_TARGET = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )

        # Guess not: now just give up
        #
        except ConfigParser.NoOptionError:

            GMETRIC_TARGET = None

            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )

            gmetric_bin = findGmetric()

            if gmetric_bin:

                GMETRIC_BINARY = gmetric_bin
            else:
                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )

                try:
                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )

                except ConfigParser.NoOptionError:

                    debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
                    sys.exit( 1 )

    DETECT_TIME_DIFFS = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )

    BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )

    try:
        BATCH_API = cfg.get( 'DEFAULT', 'BATCH_API' )

    except ConfigParser.NoOptionError, detail:

        if BATCH_SERVER and api_guess:

            BATCH_API = api_guess
        else:
            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
            sys.exit( 1 )

    try:
        QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )

    except ConfigParser.NoOptionError, detail:

        QUEUE = None

    return True

def fqdn_parts (fqdn):

    """Return pair of host and domain for fully-qualified domain name arg."""

    parts = fqdn.split (".")

    return (parts[0], string.join(parts[1:], "."))
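As an aside, not part of this changeset: the quoted-list parsing that getlist() performs inside loadConfig() can be hard to picture from the code alone. A minimal standalone sketch of the same quote-stripping logic, with a made-up config value:

    # Hypothetical jobmond.conf value; each item may be single- or double-quoted
    cfg_string = "'node-*.internal/node-/', 'gateway'"

    my_list = []
    for item_txt in cfg_string.split( ',' ):
        item_txt = item_txt.strip()
        for s_char in [ "'", '"' ]:
            if item_txt.count( s_char ) == 2:
                # keep whatever sat between the pair of quotes
                item_txt = item_txt.split( s_char )[1]
                break
        my_list.append( item_txt )

    print my_list    # -> ['node-*.internal/node-/', 'gateway']

Unquoted items are passed through stripped; an item with an unbalanced quote makes the real getlist() abort with 'Missing quote'.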
METRIC_MAX_VAL_LEN = 900

class DataProcessor:

    """Class for processing of data"""

    binary = None

    def __init__( self, binary=None ):

        """Remember alternate binary location if supplied"""

        global GMETRIC_BINARY, GMOND_CONF

        if binary:
            self.binary = binary

        if not self.binary:
            self.binary = GMETRIC_BINARY

        # Timeout for XML
        #
        # From ganglia's documentation:
        #
        # 'A metric will be deleted DMAX seconds after it is received, and
        # DMAX=0 means eternal life.'

        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )

        if GMOND_CONF:

            incompatible = self.checkGmetricVersion()

            if incompatible:

                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
                sys.exit( 1 )

    def checkGmetricVersion( self ):

        """
        Check version of gmetric is at least 3.0.1
        for the syntax we use
        """

        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET

        incompatible = 0

        gfp   = os.popen( self.binary + ' --version' )
        lines = gfp.readlines()

        gfp.close()

        for line in lines:

            line = line.split( ' ' )

            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:

                gmetric_version = line[1].split( '\n' )[0]

                version_major = int( gmetric_version.split( '.' )[0] )
                version_minor = int( gmetric_version.split( '.' )[1] )
                version_patch = int( gmetric_version.split( '.' )[2] )

                incompatible = 0

                if version_major < 3:

                    incompatible = 1

                elif version_major == 3:

                    if version_minor == 0:

                        if version_patch < 1:

                            incompatible = 1

                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
                        #
                        if version_patch < 3:

                            METRIC_MAX_VAL_LEN = 900

                        elif version_patch >= 3:

                            METRIC_MAX_VAL_LEN = 1400

                    elif version_minor == 1:

                        debug_msg( 0, 'Gmetric 3.1 detected, internal gmetric handling disabled. Failing back to gmetric binary' )

                        METRIC_MAX_VAL_LEN = 500

                        # We don't speak 3.1 gmetric so use binary
                        #
                        GMETRIC_TARGET = None

        return incompatible

    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):

        """Call gmetric binary and multicast"""

        cmd = self.binary

        if GMETRIC_TARGET:

            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]

            metric_debug = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )

            debug_msg( 10, printTime() + ' ' + metric_debug )

            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )

            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )

        else:
            try:
                cmd = cmd + ' -c' + GMOND_CONF

            except NameError:

                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )

            cmd = cmd + ' -n' + str( metricname ) + ' -v"' + str( metricval ) + '" -t' + str( valtype ) + ' -d' + str( self.dmax )

            if len( units ) > 0:

                cmd = cmd + ' -u"' + units + '"'

            debug_msg( 10, printTime() + ' ' + cmd )

            os.system( cmd )

class DataGatherer:

    """Skeleton class for batch system DataGatherer"""

    def printJobs( self, jobs ):

        """Print a jobinfo overview"""

        for name, attrs in self.jobs.items():

            print 'job %s' %(name)

            for name, val in attrs.items():

                print '\t%s = %s' %( name, val )

    def printJob( self, jobs, job_id ):

        """Print job with job_id from jobs"""

        print 'job %s' %(job_id)

        for name, val in jobs[ job_id ].items():

            print '\t%s = %s' %( name, val )

    def getAttr( self, attrs, name ):

        """Return certain attribute from dictionary, if exists"""

        if attrs.has_key( name ):

            return attrs[ name ]
        else:
            return ''

    def jobDataChanged( self, jobs, job_id, attrs ):

        """Check if job with attrs and job_id in jobs has changed"""

        if jobs.has_key( job_id ):

            oldData = jobs[ job_id ]
        else:
            return 1

        for name, val in attrs.items():

            if oldData.has_key( name ):

                if oldData[ name ] != attrs[ name ]:

                    return 1

            else:
                return 1

        return 0

    def submitJobData( self ):

        """Submit job info list"""

        global BATCH_API

        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )

        running_jobs = 0
        queued_jobs  = 0

        # Count how many running/queued jobs we found
        #
        for jobid, jobattrs in self.jobs.items():

            if jobattrs[ 'status' ] == 'Q':

                queued_jobs += 1

            elif jobattrs[ 'status' ] == 'R':

                running_jobs += 1

        # Report running/queued jobs as separate metric for a nice RRD graph
        #
        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )

        # Report down/offline nodes in batch (PBS only ATM)
        #
        if BATCH_API == 'pbs':

            domain = fqdn_parts( socket.getfqdn() )[1]

            downed_nodes  = list()
            offline_nodes = list()

            l = ['state']

            for name, node in self.pq.getnodes().items():

                if ( node[ 'state' ].find( "down" ) != -1 ):

                    downed_nodes.append( name )

                if ( node[ 'state' ].find( "offline" ) != -1 ):

                    offline_nodes.append( name )

            downnodeslist    = do_nodelist( downed_nodes )
            offlinenodeslist = do_nodelist( offline_nodes )

            down_str = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
            offl_str = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )

        # Now let's spread the knowledge
        #
        for jobid, jobattrs in self.jobs.items():

            # Make gmetric values for each job: respect max gmetric value length
            #
            gmetric_val      = self.compileGmetricVal( jobid, jobattrs )
            metric_increment = 0

            # If we have more job info than max gmetric value length allows, split it up
            # amongst multiple metrics
            #
            for val in gmetric_val:

                self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )

                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
                #
                metric_increment = metric_increment + 1

    def compileGmetricVal( self, jobid, jobattrs ):

        """Create a val string for gmetric of jobinfo"""

        gval_lists = [ ]
        val_list   = { }

        for val_name, val_value in jobattrs.items():

            # These are our own metric names, i.e.: status, start_timestamp, etc
            #
            val_list_names_len = len( string.join( val_list.keys() ) ) + len(val_list.keys())

            # These are their corresponding values
            #
            val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values())

            if val_name == 'nodes' and jobattrs['status'] == 'R':

                node_str = None

                for node in val_value:

                    if node_str:

                        node_str = node_str + ';' + node
                    else:
                        node_str = node

                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
                    #
                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:

                        # It's too big, we need to make a new gmetric for the additional info
                        #
                        val_list[ val_name ] = node_str

                        gval_lists.append( val_list )

                        val_list = { }
                        node_str = None

                val_list[ val_name ] = node_str

                gval_lists.append( val_list )

                val_list = { }

            elif val_value != '':

                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
                #
                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:

                    # It's too big, we need to make a new gmetric for the additional info
                    #
                    gval_lists.append( val_list )

                    val_list = { }

                val_list[ val_name ] = val_value

        if len( val_list ) > 0:

            gval_lists.append( val_list )

        str_list = [ ]

        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
        #
        for val_list in gval_lists:

            my_val_str = None

            for val_name, val_value in val_list.items():

                if type(val_value) == list:

                    val_value = val_value.join( ',' )

                if my_val_str:

                    try:
                        # fixme: It's getting
                        # ('nodes', None) items
                        my_val_str = my_val_str + ' ' + val_name + '=' + val_value
                    except:
                        pass

                else:
                    my_val_str = val_name + '=' + val_value

            str_list.append( my_val_str )

        return str_list

    def daemon( self ):

        """Run as daemon forever"""

        # Fork the first child
        #
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # end parent

        # creates a session and sets the process group ID
        #
        os.setsid()

        # Fork the second child
        #
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # end parent

        write_pidfile()

        # Go to the root directory and set the umask
        #
        os.chdir('/')
        os.umask(0)

        sys.stdin.close()
        sys.stdout.close()
        sys.stderr.close()

        os.open('/dev/null', os.O_RDWR)
        os.dup2(0, 1)
        os.dup2(0, 2)

        self.run()

    def run( self ):

        """Main thread"""

        while ( 1 ):

            self.getJobData()
            self.submitJobData()
            time.sleep( BATCH_POLL_INTERVAL )
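As an editorial aside: compileGmetricVal() above packs 'name=value' pairs into one metric string and starts a new one once METRIC_MAX_VAL_LEN would be exceeded. A deliberately simplified sketch of that splitting idea (tiny limit and made-up attributes; the real method also special-cases the semicolon-joined node list):

    import string

    METRIC_MAX_VAL_LEN = 20      # tiny limit, just to force a split in this demo

    def pack_attrs( jobattrs ):
        """Greedily pack 'name=value' pairs into strings of bounded length."""
        chunks, current = [], []
        for name, value in jobattrs.items():
            pair = '%s=%s' %( name, value )
            if len( string.join( current + [ pair ] ) ) > METRIC_MAX_VAL_LEN and current:
                chunks.append( string.join( current ) )
                current = []
            current.append( pair )
        if current:
            chunks.append( string.join( current ) )
        return chunks

    # Hypothetical job attributes; prints two or three chunks depending on dict order
    print pack_attrs( { 'status': 'R', 'owner': 'alice', 'ppn': '8' } )

Each chunk would then become one MONARCH-JOB-<jobid>-<n> metric, with n as the "follow number".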
# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
…

class NoJobs (Exception):
    """Exception raised by empty job list in qstat output."""
    pass

class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
    """SAX handler for XML output from Sun Grid Engine's `qstat'."""

    def __init__(self):
        self.value = ""
        self.joblist = []
        self.job = {}
        self.queue = ""
        self.in_joblist = False
        self.lrequest = False
        self.eltq = deque()
        xml.sax.handler.ContentHandler.__init__(self)

    # The structure of the output is as follows (for SGE 6.0).  It's
    # similar for 6.1, but radically different for 6.2, and is
    # undocumented generally.  Unfortunately it's voluminous, and probably
    # doesn't scale to large clusters/queues.

    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    #   <djob_info>
    #     <qmaster_response>  <!-- job -->
    #       ...
    #       <JB_ja_template>
    #         <ulong_sublist>
    #           ...  <!-- start_time, state ... -->
    #         </ulong_sublist>
    #       </JB_ja_template>
    #       <JB_ja_tasks>
    #         <ulong_sublist>
    #           ...  <!-- task info
    #         </ulong_sublist>
    #         ...
    #       </JB_ja_tasks>
    #       ...
    #     </qmaster_response>
    #   </djob_info>
    #   <messages>
    #   ...

    # NB.  We might treat each task as a separate job, like
    # straight qstat output, but the web interface expects jobs to
    # be identified by integers, not, say, <job number>.<task>.

    # So, I lied.  If the job list is empty, we get invalid XML
    # like this, which we need to defend against:

    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
    #   <>
    #     <ST_name>*</ST_name>
    #   </>
    # </unknown_jobs>

    def startElement(self, name, attrs):
        self.value = ""
        if name == "djob_info":	# job list
            self.in_joblist = True
        # The job container is "qmaster_response" in SGE 6.0
        # and 6.1, but "element" in 6.2.  This is only the very
        # start of what's necessary for 6.2, though (sigh).
        elif (name == "qmaster_response" or name == "element") \
                and self.eltq[-1] == "djob_info":  # job
            self.job = {"job_state": "U", "slots": 0,
                        "nodes": [], "queued_timestamp": "",
                        "queued_timestamp": "", "queue": "",
                        "ppn": "0", "RN_max": 0,
                        # fixme in endElement
                        "requested_memory": 0, "requested_time": 0
                        }
            self.joblist.append(self.job)
        elif name == "qstat_l_requests":  # resource request
            self.lrequest = True
        elif name == "unknown_jobs":
            raise NoJobs
        self.eltq.append (name)

    def characters(self, ch):
        self.value += ch

    def endElement(self, name):
        """Snarf job elements contents into job dictionary.
           Translate keys if appropriate."""

        name_trans = {
            "JB_job_number": "number",
            "JB_job_name": "name", "JB_owner": "owner",
            "queue_name": "queue", "JAT_start_time": "start_timestamp",
            "JB_submission_time": "queued_timestamp"
            }
        value = self.value
        self.eltq.pop ()

        if name == "djob_info":
            self.in_joblist = False
            self.job = {}
        elif name == "JAT_master_queue":
            self.job["queue"] = value.split("@")[0]
        elif name == "JG_qhostname":
            if not (value in self.job["nodes"]):
                self.job["nodes"].append(value)
        elif name == "JG_slots":  # slots in use
            self.job["slots"] += int(value)
        elif name == "RN_max":  # requested slots (tasks or parallel)
            self.job["RN_max"] = max (self.job["RN_max"],
                                      int(value))
        elif name == "JAT_state":  # job state (bitwise or)
            value = int (value)
            # Status values from sge_jobL.h
            #define JIDLE                   0x00000000
            #define JHELD                   0x00000010
            #define JMIGRATING              0x00000020
            #define JQUEUED                 0x00000040
            #define JRUNNING                0x00000080
            #define JSUSPENDED              0x00000100
            #define JTRANSFERING            0x00000200
            #define JDELETED                0x00000400
            #define JWAITING                0x00000800
            #define JEXITING                0x00001000
            #define JWRITTEN                0x00002000
            #define JSUSPENDED_ON_THRESHOLD 0x00010000
            #define JFINISHED               0x00010000
            if value & 0x80:
                self.job["status"] = "R"
            elif value & 0x40:
                self.job["status"] = "Q"
            else:
                self.job["status"] = "O"  # `other'
        elif name == "CE_name" and self.lrequest and self.value in \
                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
            # We're in a container for an interesting resource
            # request; record which type.
            self.lrequest = self.value
        elif name == "CE_doubleval" and self.lrequest:
            # if we're in a container for an interesting
            # resource request, use the maximum of the hard
            # and soft requests to record the requested CPU
            # or core.  Fixme: I'm not sure if this logic is
            # right.
            if self.lrequest in ("h_core", "s_core"):
                self.job["requested_memory"] = \
                    max (float (value),
                         self.job["requested_memory"])
            # Fixme: Check what cpu means, c.f [hs]_cpu.
            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
                self.job["requested_time"] = \
                    max (float (value),
                         self.job["requested_time"])
        elif name == "qstat_l_requests":
            self.lrequest = False
        elif self.job and self.in_joblist:
            if name in name_trans:
                name = name_trans[name]
            self.job[name] = value
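Aside: the parser above leans entirely on the three SAX callbacks. A self-contained sketch of that flow with a made-up two-field document (not real qstat output):

    import xml.sax, xml.sax.handler
    from StringIO import StringIO

    class TinyHandler(xml.sax.handler.ContentHandler):
        def __init__(self):
            self.value = ""
            xml.sax.handler.ContentHandler.__init__(self)
        def startElement(self, name, attrs):
            self.value = ""          # reset the text accumulator per element
        def characters(self, ch):
            self.value += ch         # text may arrive in several chunks
        def endElement(self, name):
            print '%s = %s' % (name, self.value)

    xml.sax.parse(StringIO("<job><JB_owner>alice</JB_owner>"
                           "<JG_slots>4</JG_slots></job>"),
                  TinyHandler())

Note the closing </job> still reports '4': the accumulator is only reset on startElement. That is why the real handler also tracks context (the eltq element queue and in_joblist flag) before trusting a value.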
# Abstracted from PBS original.
…

def do_nodelist( nodes ):

    """Translate node list as appropriate."""

    nodeslist = [ ]
    my_domain = fqdn_parts( socket.getfqdn() )[1]

    for node in nodes:

        host = node.split( '/' )[0]  # not relevant for SGE
        h, host_domain = fqdn_parts(host)

        if host_domain == my_domain:

            host = h

        if nodeslist.count( host ) == 0:

            for translate_pattern in BATCH_HOST_TRANSLATE:

                if translate_pattern.find( '/' ) != -1:

                    translate_orig = \
                        translate_pattern.split( '/' )[1]
                    translate_new = \
                        translate_pattern.split( '/' )[2]
                    host = re.sub( translate_orig,
                                   translate_new, host )
            if not host in nodeslist:
                nodeslist.append( host )
    return nodeslist

class SgeDataGatherer(DataGatherer):

    jobs = {}

    def __init__( self ):
        self.jobs = {}
        self.timeoffset = 0
        self.dp = DataProcessor()

    def getJobData( self ):
        """Gather all data on current jobs in SGE"""

        import popen2

        self.cur_time = 0
        queues = ""
        if QUEUE:	# only for specific queues
            # Fixme:  assumes queue names don't contain single
            # quote or comma.  Don't know what the SGE rules are.
            queues = " -q '" + string.join (QUEUE, ",") + "'"
        # Note the comment in SgeQstatXMLParser about scaling with
        # this method of getting data.  I haven't found better one.
        # Output with args `-xml -ext -f -r' is easier to parse
        # in some ways, harder in others, but it doesn't provide
        # the submission time (at least SGE 6.0).  The pipeline
        # into sed corrects bogus XML observed with a configuration
        # of SGE 6.0u8, which otherwise causes the parsing to hang.
        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
                               + queues, True)
        qstatparser = SgeQstatXMLParser()
        parse_err = 0
        try:
            xml.sax.parse(piping.fromchild, qstatparser)
        except NoJobs:
            pass
        except:
            parse_err = 1
        if piping.wait():
            debug_msg(10,
                      "qstat error, skipping until next polling interval: "
                      + piping.childerr.readline())
            return None
        elif parse_err:
            debug_msg(10, "Bad XML output from qstat")
            exit (1)
        for f in piping.fromchild, piping.tochild, piping.childerr:
            f.close()
        self.cur_time = time.time()
        jobs_processed = []
        for job in qstatparser.joblist:
            job_id = job["number"]
            if job["status"] in [ 'Q', 'R' ]:
                jobs_processed.append(job_id)
            if job["status"] == "R":
                job["nodes"] = do_nodelist (job["nodes"])
                # Fixme: why is job["nodes"] sometimes null?
                try:
                    # Fixme: Is this sensible?  The
                    # PBS-type PPN isn't something you use
                    # with SGE.
                    job["ppn"] = float(job["slots"]) / \
                        len(job["nodes"])
                except:
                    job["ppn"] = 0
                if DETECT_TIME_DIFFS:
                    # If a job start is later than our
                    # current date, that must mean
                    # the SGE server's time is later
                    # than our local time.
                    start_timestamp = \
                        int (job["start_timestamp"])
                    if start_timestamp > \
                            int(self.cur_time) + \
                            int(self.timeoffset):

                        self.timeoffset = \
                            start_timestamp - \
                            int(self.cur_time)
            else:
                # fixme: Not sure what this should be:
                job["ppn"] = job["RN_max"]
                job["nodes"] = "1"

            myAttrs = {}
            for attr in ["name", "queue", "owner",
                         "requested_time", "status",
                         "requested_memory", "ppn",
                         "start_timestamp", "queued_timestamp"]:
                myAttrs[attr] = str(job[attr])
            myAttrs["nodes"] = job["nodes"]
            myAttrs["reported"] = str(int(self.cur_time) + \
                                      int(self.timeoffset))
            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)

            if self.jobDataChanged(self.jobs, job_id, myAttrs) \
                    and myAttrs["status"] in ["R", "Q"]:
                self.jobs[job_id] = myAttrs
        for id, attrs in self.jobs.items():
            if id not in jobs_processed:
                del self.jobs[id]
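Aside: the BATCH_HOST_TRANSLATE patterns that do_nodelist() applies use a sed-like '/regex/replacement/' form. A standalone sketch with a hypothetical pattern and host name:

    import re

    BATCH_HOST_TRANSLATE = [ '/^node-0*/n/' ]   # made-up pattern, same '/.../.../' shape

    host = 'node-007.cluster'
    for translate_pattern in BATCH_HOST_TRANSLATE:
        if translate_pattern.find( '/' ) != -1:
            translate_orig = translate_pattern.split( '/' )[1]
            translate_new  = translate_pattern.split( '/' )[2]
            host = re.sub( translate_orig, translate_new, host )

    print host    # -> 'n7.cluster'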
# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.net>
…

class LsfDataGatherer(DataGatherer):

    """This is the DataGatherer for LSF"""

    global lsfObject

    def __init__( self ):

        self.jobs       = { }
        self.timeoffset = 0
        self.dp         = DataProcessor()
        self.initLsfQuery()

    def _countDuplicatesInList( self, dupedList ):

        countDupes = { }

        for item in dupedList:

            if not countDupes.has_key( item ):

                countDupes[ item ] = 1
            else:
                countDupes[ item ] = countDupes[ item ] + 1

        dupeCountList = [ ]

        for item, count in countDupes.items():

            dupeCountList.append( ( item, count ) )

        return dupeCountList
    #
    #lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
    …
    ########################

    def initLsfQuery( self ):
        self.pq = None
        self.pq = lsfObject.jobInfoEntObject()

    def getJobData( self, known_jobs="" ):
        """Gather all data on current jobs in LSF"""
        if len( known_jobs ) > 0:
            jobs = known_jobs
        else:
            jobs = { }
        joblist = {}
        joblist = self.pq.getJobInfo()
        nodelist = ''

        self.cur_time = time.time()

        jobs_processed = [ ]

        for name, attrs in joblist.items():
            job_id = str(name)
            jobs_processed.append( job_id )
            name  = self.getAttr( attrs, 'jobName' )
            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
            owner = self.getAttr( attrs, 'user' )

            ### THIS IS THE rLimit List index values
            #define LSF_RLIMIT_CPU      0  /* cpu time in milliseconds */
            #define LSF_RLIMIT_FSIZE    1  /* maximum file size */
            #define LSF_RLIMIT_DATA     2  /* data size */
            #define LSF_RLIMIT_STACK    3  /* stack size */
            #define LSF_RLIMIT_CORE     4  /* core file size */
            #define LSF_RLIMIT_RSS      5  /* resident set size */
            #define LSF_RLIMIT_NOFILE   6  /* open files */
            #define LSF_RLIMIT_OPEN_MAX 7  /* (from HP-UX) */
            #define LSF_RLIMIT_VMEM     8  /* maximum swap mem */
            #define LSF_RLIMIT_SWAP     8
            #define LSF_RLIMIT_RUN      9  /* max wall-clock time limit */
            #define LSF_RLIMIT_PROCESS  10 /* process number limit */
            #define LSF_RLIMIT_THREAD   11 /* thread number limit (introduced in LSF6.0) */
            #define LSF_RLIM_NLIMITS    12 /* number of resource limits */

            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
            if requested_time == -1:
                requested_time = ""
            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
            if requested_memory == -1:
                requested_memory = ""
            # This tries to get proc per node. We don't support this right now
            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
            if requested_cpus == None or requested_cpus == "":
                requested_cpus = 1

            if QUEUE:
                for q in QUEUE:
                    if q == queue:
                        display_queue = 1
                        break
                    else:
                        display_queue = 0
                        continue
                if display_queue == 0:
                    continue

            runState = self.getAttr( attrs, 'status' )
            if runState == 4:
                status = 'R'
            else:
                status = 'Q'
            queued_timestamp = self.getAttr( attrs, 'submitTime' )

            if status == 'R':
                start_timestamp = self.getAttr( attrs, 'startTime' )
                nodesCpu = dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
                nodelist = nodesCpu.keys()

                if DETECT_TIME_DIFFS:

                    # If a job start is later than our current date,
                    # that must mean the LSF server's time is later
                    # than our local time.

                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):

                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )

            elif status == 'Q':
                start_timestamp = ''
                count_mynodes = 0
                numeric_node = 1
                nodelist = ''

            myAttrs = { }
            if name == "":
                myAttrs['name'] = "none"
            else:
                myAttrs['name'] = name

            myAttrs[ 'owner' ]            = owner
            myAttrs[ 'requested_time' ]   = str(requested_time)
            myAttrs[ 'requested_memory' ] = str(requested_memory)
            myAttrs[ 'requested_cpus' ]   = str(requested_cpus)
            myAttrs[ 'ppn' ]              = str( ppn )
            myAttrs[ 'status' ]           = status
            myAttrs[ 'start_timestamp' ]  = str(start_timestamp)
            myAttrs[ 'queue' ]            = str(queue)
            myAttrs[ 'queued_timestamp' ] = str(queued_timestamp)
            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
            myAttrs[ 'nodes' ]            = do_nodelist( nodelist )
            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)

            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
                jobs[ job_id ] = myAttrs

                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )

        for id, attrs in jobs.items():
            if id not in jobs_processed:
                # This one isn't there anymore
                #
                del jobs[ id ]
        self.jobs = jobs
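Aside: the rLimits indices quoted in the comment block above index into the limits vector the LSF bindings return. A sketch with a hypothetical vector, where -1 means 'not set', mirroring the two lookups in getJobData():

    # Made-up rLimits vector: index 8 = LSF_RLIMIT_VMEM, index 9 = LSF_RLIMIT_RUN
    rLimits = [ -1, -1, -1, -1, -1, -1, -1, -1, 2097152, 86400, -1, -1 ]

    requested_memory = rLimits[8]
    if requested_memory == -1:
        requested_memory = ""
    requested_time = rLimits[9]
    if requested_time == -1:
        requested_time = ""

    print 'requested_memory=%s requested_time=%s' %( requested_memory, requested_time )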
)[0]1320 1321 name= self.getAttr( attrs, 'Job_Name' )1322 queue= self.getAttr( attrs, 'queue' )1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 owner= self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]1337 requested_time= self.getAttr( attrs, 'Resource_List.walltime' )1338 requested_memory= self.getAttr( attrs, 'Resource_List.mem' )1339 1340 mynoderequest= self.getAttr( attrs, 'Resource_List.nodes' )1341 1342 ppn= ''1343 1344 1345 1346 mynoderequest_fields= mynoderequest.split( ':' )1347 1348 1349 1350 1351 1352 ppn= mynoderequest_field.split( 'ppn=' )[1]1353 1354 status= self.getAttr( attrs, 'job_state' )1355 1356 1357 1358 1359 1360 queued_timestamp= self.getAttr( attrs, 'ctime' )1361 1362 1363 1364 start_timestamp= self.getAttr( attrs, 'mtime' )1365 nodes= self.getAttr( attrs, 'exec_host' ).split( '+' )1366 1367 nodeslist= do_nodelist( nodes )1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 self.timeoffset= int( int(start_timestamp) - int(self.cur_time) )1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 start_timestamp= ''1394 count_mynodes= 01395 1396 1397 1398 1399 1400 nodepart= node.split( ':' )[0]1401 1402 1403 1404 numeric_node= 11405 1406 1407 1408 1409 1410 1411 1412 1413 1414 numeric_node= 01415 1416 1417 1418 1419 1420 count_mynodes= count_mynodes + 11421 1422 1423 1424 1425 1426 1427 count_mynodes= count_mynodes + int( nodepart )1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 nodeslist= str( count_mynodes )1441 1442 start_timestamp= ''1443 nodeslist= ''1444 1445 myAttrs= { }1446 1447 myAttrs[ 'name' ]= str( name )1448 myAttrs[ 'queue' ]= str( queue )1449 myAttrs[ 'owner' ]= str( owner )1450 myAttrs[ 'requested_time' ]= str( requested_time )1451 myAttrs[ 'requested_memory' ]= str( requested_memory )1452 myAttrs[ 'ppn' ]= str( ppn )1453 myAttrs[ 'status' ]= str( status )1454 myAttrs[ 'start_timestamp' ]= str( start_timestamp )1455 myAttrs[ 'queued_timestamp' ]= str( queued_timestamp )1456 myAttrs[ 'reported' ]= str( int( int( self.cur_time ) + int( self.timeoffset ) ) )1457 myAttrs[ 'nodes' ]= nodeslist1458 myAttrs[ 'domain' ]= fqdn_parts( socket.getfqdn() )[1]1459 myAttrs[ 'poll_interval' ]= str( BATCH_POLL_INTERVAL )1460 1461 1462 1463 self.jobs[ job_id ]= myAttrs1464 1465 1466 1467 1468 1469 1470 1471 1266 """This is the DataGatherer for PBS and Torque""" 1267 1268 global PBSQuery, PBSError 1269 1270 def __init__( self ): 1271 1272 """Setup appropriate variables""" 1273 1274 self.jobs = { } 1275 self.timeoffset = 0 1276 self.dp = DataProcessor() 1277 1278 self.initPbsQuery() 1279 1280 def initPbsQuery( self ): 1281 1282 self.pq = None 1283 1284 if( BATCH_SERVER ): 1285 1286 self.pq = PBSQuery( BATCH_SERVER ) 1287 else: 1288 self.pq = PBSQuery() 1289 1290 try: 1291 self.pq.old_data_structure() 1292 1293 except AttributeError: 1294 1295 # pbs_query is older 1296 # 1297 pass 1298 1299 def getJobData( self ): 1300 1301 """Gather all data on current jobs in Torque""" 1302 1303 joblist = {} 1304 self.cur_time = 0 1305 1306 try: 1307 joblist = self.pq.getjobs() 1308 self.cur_time = time.time() 1309 1310 except PBSError, detail: 1311 1312 debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) ) 1313 return None 1314 1315 jobs_processed = [ ] 1316 1317 for name, attrs in joblist.items(): 1318 display_queue = 1 1319 job_id = name.split( '.' 
)[0] 1320 1321 name = self.getAttr( attrs, 'Job_Name' ) 1322 queue = self.getAttr( attrs, 'queue' ) 1323 1324 if QUEUE: 1325 for q in QUEUE: 1326 if q == queue: 1327 display_queue = 1 1328 break 1329 else: 1330 display_queue = 0 1331 continue 1332 if display_queue == 0: 1333 continue 1334 1335 1336 owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0] 1337 requested_time = self.getAttr( attrs, 'Resource_List.walltime' ) 1338 requested_memory = self.getAttr( attrs, 'Resource_List.mem' ) 1339 1340 mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' ) 1341 1342 ppn = '' 1343 1344 if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1: 1345 1346 mynoderequest_fields = mynoderequest.split( ':' ) 1347 1348 for mynoderequest_field in mynoderequest_fields: 1349 1350 if mynoderequest_field.find( 'ppn' ) != -1: 1351 1352 ppn = mynoderequest_field.split( 'ppn=' )[1] 1353 1354 status = self.getAttr( attrs, 'job_state' ) 1355 1356 if status in [ 'Q', 'R' ]: 1357 1358 jobs_processed.append( job_id ) 1359 1360 queued_timestamp = self.getAttr( attrs, 'ctime' ) 1361 1362 if status == 'R': 1363 1364 start_timestamp = self.getAttr( attrs, 'mtime' ) 1365 nodes = self.getAttr( attrs, 'exec_host' ).split( '+' ) 1366 1367 nodeslist = do_nodelist( nodes ) 1368 1369 if DETECT_TIME_DIFFS: 1370 1371 # If a job start if later than our current date, 1372 # that must mean the Torque server's time is later 1373 # than our local time. 1374 1375 if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ): 1376 1377 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) ) 1378 1379 elif status == 'Q': 1380 1381 # 'mynodequest' can be a string in the following syntax according to the 1382 # Torque Administator's manual: 1383 # 1384 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...] 1385 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...] 1386 # etc 1387 # 1388 1389 # 1390 # For now we only count the amount of nodes request and ignore properties 1391 # 1392 1393 start_timestamp = '' 1394 count_mynodes = 0 1395 1396 for node in mynoderequest.split( '+' ): 1397 1398 # Just grab the {node_count|hostname} part and ignore properties 1399 # 1400 nodepart = node.split( ':' )[0] 1401 1402 # Let's assume a node_count value 1403 # 1404 numeric_node = 1 1405 1406 # Chop the value up into characters 1407 # 1408 for letter in nodepart: 1409 1410 # If this char is not a digit (0-9), this must be a hostname 1411 # 1412 if letter not in string.digits: 1413 1414 numeric_node = 0 1415 1416 # If this is a hostname, just count this as one (1) node 1417 # 1418 if not numeric_node: 1419 1420 count_mynodes = count_mynodes + 1 1421 else: 1422 1423 # If this a number, it must be the node_count 1424 # and increase our count with it's value 1425 # 1426 try: 1427 count_mynodes = count_mynodes + int( nodepart ) 1428 1429 except ValueError, detail: 1430 1431 # When we arrive here I must be bugged or very confused 1432 # THIS SHOULD NOT HAPPEN! 1433 # 1434 debug_msg( 10, str( detail ) ) 1435 debug_msg( 10, "Encountered weird node in Resources_List?!" 
1445              myAttrs = { }
1446
1447              myAttrs[ 'name' ]             = str( name )
1448              myAttrs[ 'queue' ]            = str( queue )
1449              myAttrs[ 'owner' ]            = str( owner )
1450              myAttrs[ 'requested_time' ]   = str( requested_time )
1451              myAttrs[ 'requested_memory' ] = str( requested_memory )
1452              myAttrs[ 'ppn' ]              = str( ppn )
1453              myAttrs[ 'status' ]           = str( status )
1454              myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1455              myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1456              myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1457              myAttrs[ 'nodes' ]            = nodeslist
1458              myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1459              myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1460
1461              if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1462
1463                  self.jobs[ job_id ] = myAttrs
1464
1465          for id, attrs in self.jobs.items():
1466
1467              if id not in jobs_processed:
1468
1469                  # This one isn't there anymore; toedeledoki!
1470                  #
1471                  del self.jobs[ id ]
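The 'reported' field above is the local poll time shifted into the Torque server's clock domain: whenever DETECT_TIME_DIFFS sees a job whose start lies in our future, timeoffset absorbs the skew and is added to every report. A toy illustration of that arithmetic, with invented epoch values:

    cur_time   = 1363773847         # local clock when we polled (invented value)
    timeoffset = 0

    start_timestamp = 1363773907    # Torque reports a start 60s in our future

    if int( start_timestamp ) > int( cur_time ) + timeoffset:

        timeoffset = int( start_timestamp ) - int( cur_time )

    reported = str( int( cur_time ) + timeoffset )

    print reported, timeoffset      # 1363773907 60: reported in the server's clock domain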
1472 1472
1473 1473  #
… …
1488 1488  GMETRIC_DEFAULT_HOST  = '127.0.0.1'
1489 1489  GMETRIC_DEFAULT_PORT  = '8649'
1490       GMETRIC_DEFAULT_UNITS = ''
1491 1491
1492 1492  class Gmetric:
1493 1493
1494          global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1495
1496          slope    = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1497          type     = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1498          protocol = ( 'udp', 'multicast' )
1499
1500          def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1501
1502              global GMETRIC_DEFAULT_TYPE
1503
1504              self.prot   = self.checkHostProtocol( host )
1505              self.msg    = xdrlib.Packer()
1506              self.socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1507
1508              if self.prot not in self.protocol:
1509
1510                  raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1511
1512              if self.prot == 'multicast':
1513
1514                  # Set multicast options
1515                  #
1516                  self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1517
1518              self.hostport = ( host, int( port ) )
1519              self.slopestr = 'both'
1520              self.tmax     = 60
1521
1522          def checkHostProtocol( self, ip ):
1523
1524              """Detect if an IP address is a multicast address"""
1525
1526              MULTICAST_ADDRESS_MIN = ( "224", "0", "0", "0" )
1527              MULTICAST_ADDRESS_MAX = ( "239", "255", "255", "255" )
1528
1529              ip_fields = ip.split( '.' )
1530
1531              if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1532
1533                  return 'multicast'
1534              else:
1535                  return 'udp'
1536
1537          def send( self, name, value, dmax, typestr = '', units = '' ):
1538
1539              if len( units ) == 0:
1540                  units = GMETRIC_DEFAULT_UNITS
1541
1542              if len( typestr ) == 0:
1543                  typestr = GMETRIC_DEFAULT_TYPE
1544
1545              msg = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
1546
1547              return self.socket.sendto( msg, self.hostport )
1548
1549          def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax ):
1550
1551              if slopestr not in self.slope:
1552
1553                  raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
1554
1555              if typestr not in self.type:
1556
1557                  raise ValueError( "Type must be one of: " + str( self.type ) )
1558
1559              if len( name ) == 0:
1560
1561                  raise ValueError( "Name must be non-empty" )
1562
1563              self.msg.reset()
1564              self.msg.pack_int( 0 )
1565              self.msg.pack_string( typestr )
1566              self.msg.pack_string( name )
1567              self.msg.pack_string( str( value ) )
1568              self.msg.pack_string( unitstr )
1569              self.msg.pack_int( self.slope[ slopestr ] )
1570              self.msg.pack_uint( int( tmax ) )
1571              self.msg.pack_uint( int( dmax ) )
1572
1573              return self.msg.get_buffer()
1574 1574
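A caveat on checkHostProtocol(): ip.split('.') yields a list of strings, and under CPython 2 a list compared against the tuple bounds is ordered by type name rather than by value, so the multicast branch appears unreachable; even with matching types, string octets compare lexicographically rather than numerically. A numeric sketch of the intended check (is_multicast is our name, not jobmond's):

    def is_multicast( ip ):

        """True if ip is an IPv4 multicast address (224.0.0.0 - 239.255.255.255)"""

        try:
            first_octet = int( ip.split( '.' )[0] )

        except ValueError:

            # Not a dotted-quad address (e.g. a hostname): treat as unicast
            #
            return False

        return first_octet >= 224 and first_octet <= 239

    print is_multicast( '239.2.11.71' )    # True
    print is_multicast( '127.0.0.1' )      # False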
1575 1575  def printTime( ):
1576 1576
1577          """Print current time/date in human readable format for log/debug"""
1578
1579          return time.strftime("%a, %d %b %Y %H:%M:%S")
1580 1580
1581 1581  def debug_msg( level, msg ):
1582 1582
1583          """Print msg if at or above current debug level"""
1584
1585          global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
1586
1587          if (not DAEMONIZE and DEBUG_LEVEL >= level):
1588              sys.stderr.write( msg + '\n' )
1589
1590          if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1591              syslog.syslog( msg )
1592 1592
1593 1593  def write_pidfile():
1594 1594
1595          # Write pidfile if PIDFILE is set
1596          #
1597          if PIDFILE:
1598
1599              pid = os.getpid()
1600
1601              pidfile = open( PIDFILE, 'w' )
1602
1603              pidfile.write( str( pid ) )
1604              pidfile.close()
1605 1605
1606 1606  def main():
1607 1607
1608          """Application start"""
1609
1610          global PBSQuery, PBSError, lsfObject
1611          global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
1612
1613          if not processArgs( sys.argv[1:] ):
1614
1615              sys.exit( 1 )
1616
1617          # Load the appropriate DataGatherer depending on which BATCH_API is set
1618          # and any modules the Gatherer requires
1619          #
1620          if BATCH_API == 'pbs':
1621
1622              try:
1623                  from PBSQuery import PBSQuery, PBSError
1624
1625              except ImportError:
1626
1627                  debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
1628                  sys.exit( 1 )
1629
1630              gather = PbsDataGatherer()
1631
1632          elif BATCH_API == 'sge':
1633
1634              # Tested with SGE 6.0u11.
1635              #
1636              gather = SgeDataGatherer()
1637
1638          elif BATCH_API == 'lsf':
1639
1640              try:
1641                  from lsfObject import lsfObject
1642              except ImportError:
1643                  debug_msg( 0, "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed" )
1644                  sys.exit( 1 )
1645
1646              gather = LsfDataGatherer()
1647
1648          else:
1649              debug_msg( 0, "FATAL ERROR: BATCH_API '" + BATCH_API + "' is unknown or not supported" )
1650
1651              sys.exit( 1 )
1652
1653          if( DAEMONIZE and USE_SYSLOG ):
1654
1655              syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1656
1657          if DAEMONIZE:
1658
1659              gather.daemon()
1660          else:
1661              gather.run()
1662 1662
1663 1663  # wh00t? someone started me! :)
1664 1664  #
1665 1665  if __name__ == '__main__':
1666 1666      main()
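The BATCH_API selection in main() could also be table-driven, which keeps the unknown-API error path in one place. A self-contained sketch; the stub classes stand in for jobmond's real gatherers, and the dispatch table and make_gatherer name are our illustration, not jobmond's code:

    import sys

    # Stub gatherer classes standing in for jobmond's real ones (illustration only)
    class PbsDataGatherer: pass
    class SgeDataGatherer: pass
    class LsfDataGatherer: pass

    # Hypothetical table-driven alternative to the if/elif chain in main()
    GATHERERS = {
        'pbs' : PbsDataGatherer,
        'sge' : SgeDataGatherer,
        'lsf' : LsfDataGatherer,
    }

    def make_gatherer( batch_api ):

        if batch_api not in GATHERERS:

            print >> sys.stderr, "FATAL ERROR: BATCH_API '" + batch_api + "' is unknown or not supported"
            sys.exit( 1 )

        return GATHERERS[ batch_api ]()

    gather = make_gatherer( 'pbs' )
    print gather    # an instance of PbsDataGatherer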