Changeset 659 for trunk/jobmond/jobmond.py
- Timestamp:
- 09/03/12 12:08:10 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r622 r659 32 32 def usage( ver ): 33 33 34 35 36 37 38 39 40 41 42 43 44 print 'Usage:jobmond [OPTIONS]'45 46 print ' -c, --config=FILEThe configuration file to use (default: /etc/jobmond.conf)'47 print ' -p, --pidfile=FILEUse pid file to store the process id'48 print ' -h, --helpPrint help and exit'49 50 34 print 'jobmond %s' %VERSION 35 36 if ver: 37 return 0 38 39 print 40 print 'Purpose:' 41 print ' The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics' 42 print ' to Ganglia, which can be viewed with Job Monarch web frontend' 43 print 44 print 'Usage: jobmond [OPTIONS]' 45 print 46 print ' -c, --config=FILE The configuration file to use (default: /etc/jobmond.conf)' 47 print ' -p, --pidfile=FILE Use pid file to store the process id' 48 print ' -h, --help Print help and exit' 49 print ' -v, --version Print version and exit' 50 print 51 51 52 52 def processArgs( args ): 53 53 54 SHORT_L= 'p:hvc:'55 LONG_L= [ 'help', 'config=', 'pidfile=', 'version' ]56 57 58 PIDFILE= None59 60 config_filename= '/etc/jobmond.conf'61 62 63 64 opts, args= getopt.getopt( args, SHORT_L, LONG_L )65 66 67 68 69 70 71 72 73 74 75 76 config_filename= value77 78 79 80 PIDFILE= value81 82 54 SHORT_L = 'p:hvc:' 55 LONG_L = [ 'help', 'config=', 'pidfile=', 'version' ] 56 57 global PIDFILE 58 PIDFILE = None 59 60 config_filename = '/etc/jobmond.conf' 61 62 try: 63 64 opts, args = getopt.getopt( args, SHORT_L, LONG_L ) 65 66 except getopt.GetoptError, detail: 67 68 print detail 69 usage() 70 sys.exit( 1 ) 71 72 for opt, value in opts: 73 74 if opt in [ '--config', '-c' ]: 75 76 config_filename = value 77 78 if opt in [ '--pidfile', '-p' ]: 79 80 PIDFILE = value 81 82 if opt in [ '--help', '-h' ]: 83 83 84 85 86 87 88 89 90 91 92 84 usage( False ) 85 sys.exit( 0 ) 86 87 if opt in [ '--version', '-v' ]: 88 89 usage( True ) 90 sys.exit( 0 ) 91 92 return loadConfig( config_filename ) 93 93 94 94 # Fixme: This doesn't DTRT with commented-out bits of the file. E.g. … … 97 97 class GangliaConfigParser: 98 98 99 100 101 self.config_file= config_file102 103 104 105 106 107 108 109 110 clean_value= value111 clean_value= clean_value.replace( "'", "" )112 clean_value= clean_value.replace( '"', '' )113 clean_value= clean_value.strip()114 115 116 117 118 119 cfg_fp= open( self.config_file )120 section_start= False121 section_found= False122 value= None123 124 125 126 127 128 section_found= True129 130 131 132 section_start= True133 134 135 136 section_start= False137 section_found= False138 139 140 141 value= string.join( line.split( '=' )[1:], '' ).strip()142 143 144 145 146 147 148 149 value= self.getVal( section, valname )150 151 152 153 154 value= self.removeQuotes( value )155 156 157 158 159 160 value= self.getVal( section, valname )161 162 163 164 165 value= self.removeQuotes( value )166 167 99 def __init__( self, config_file ): 100 101 self.config_file = config_file 102 103 if not os.path.exists( self.config_file ): 104 105 debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" ) 106 sys.exit( 1 ) 107 108 def removeQuotes( self, value ): 109 110 clean_value = value 111 clean_value = clean_value.replace( "'", "" ) 112 clean_value = clean_value.replace( '"', '' ) 113 clean_value = clean_value.strip() 114 115 return clean_value 116 117 def getVal( self, section, valname ): 118 119 cfg_fp = open( self.config_file ) 120 section_start = False 121 section_found = False 122 value = None 123 124 for line in cfg_fp.readlines(): 125 126 if line.find( section ) != -1: 127 128 section_found = True 129 130 if line.find( '{' ) != -1 and section_found: 131 132 section_start = True 133 134 if line.find( '}' ) != -1 and section_found: 135 136 section_start = False 137 section_found = False 138 139 if line.find( valname ) != -1 and section_start: 140 141 value = string.join( line.split( '=' )[1:], '' ).strip() 142 143 cfg_fp.close() 144 145 return value 146 147 def getInt( self, section, valname ): 148 149 value = self.getVal( section, valname ) 150 151 if not value: 152 return False 153 154 value = self.removeQuotes( value ) 155 156 return int( value ) 157 158 def getStr( self, section, valname ): 159 160 value = self.getVal( section, valname ) 161 162 if not value: 163 return False 164 165 value = self.removeQuotes( value ) 166 167 return str( value ) 168 168 169 169 def findGmetric(): 170 170 171 172 173 guess= '%s/%s' %( dir, 'gmetric' )174 175 176 177 178 179 171 for dir in os.path.expandvars( '$PATH' ).split( ':' ): 172 173 guess = '%s/%s' %( dir, 'gmetric' ) 174 175 if os.path.exists( guess ): 176 177 return guess 178 179 return False 180 180 181 181 def loadConfig( filename ): … … 213 213 return my_list 214 214 215 cfg= ConfigParser.ConfigParser()216 217 218 219 220 221 222 223 224 DEBUG_LEVEL= cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )225 226 DAEMONIZE= cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )227 228 SYSLOG_LEVEL= -1229 SYSLOG_FACILITY= None230 231 232 USE_SYSLOG= cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )233 234 235 236 USE_SYSLOG= True237 238 239 240 241 242 243 SYSLOG_LEVEL= cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )244 245 246 247 248 SYSLOG_LEVEL= 0249 250 251 252 253 254 255 256 257 258 259 260 261 262 BATCH_SERVER= cfg.get( 'DEFAULT', 'BATCH_SERVER' )263 264 265 266 267 268 269 BATCH_SERVER= cfg.get( 'DEFAULT', 'TORQUE_SERVER' )270 api_guess= 'pbs'271 272 273 274 BATCH_POLL_INTERVAL= cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )275 276 277 278 279 280 281 BATCH_POLL_INTERVAL= cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )282 api_guess= 'pbs'283 284 285 286 GMOND_CONF= cfg.get( 'DEFAULT', 'GMOND_CONF' )287 288 289 290 291 292 GMOND_CONF= '/etc/gmond.conf'293 294 ganglia_cfg= GangliaConfigParser( GMOND_CONF )295 296 297 298 gmetric_dest_ip= ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )299 300 301 302 303 304 gmetric_dest_ip= ganglia_cfg.getStr( 'udp_send_channel', 'host' )305 306 gmetric_dest_port= ganglia_cfg.getStr( 'udp_send_channel', 'port' )307 308 309 310 GMETRIC_TARGET= '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )311 312 313 314 315 316 317 318 319 GMETRIC_TARGET= cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )320 321 322 323 324 325 GMETRIC_TARGET= None326 327 328 329 gmetric_bin= findGmetric()330 331 332 333 GMETRIC_BINARY= gmetric_bin334 335 336 337 338 339 GMETRIC_BINARY= cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )340 341 342 343 344 345 346 DETECT_TIME_DIFFS= cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )347 348 BATCH_HOST_TRANSLATE= getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )349 350 351 352 BATCH_API= cfg.get( 'DEFAULT', 'BATCH_API' )353 354 355 356 357 358 BATCH_API= api_guess359 360 361 362 363 364 365 QUEUE= getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )366 367 368 369 QUEUE= None370 371 215 cfg = ConfigParser.ConfigParser() 216 217 cfg.read( filename ) 218 219 global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL 220 global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE 221 global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG 222 global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY 223 224 DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' ) 225 226 DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' ) 227 228 SYSLOG_LEVEL = -1 229 SYSLOG_FACILITY = None 230 231 try: 232 USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' ) 233 234 except ConfigParser.NoOptionError: 235 236 USE_SYSLOG = True 237 238 debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' ) 239 240 if USE_SYSLOG: 241 242 try: 243 SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' ) 244 245 except ConfigParser.NoOptionError: 246 247 debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' ) 248 SYSLOG_LEVEL = 0 249 250 try: 251 252 SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) ) 253 254 except ConfigParser.NoOptionError: 255 256 SYSLOG_FACILITY = syslog.LOG_DAEMON 257 258 debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' ) 259 260 try: 261 262 BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' ) 263 264 except ConfigParser.NoOptionError: 265 266 # Backwards compatibility for old configs 267 # 268 269 BATCH_SERVER = cfg.get( 'DEFAULT', 'TORQUE_SERVER' ) 270 api_guess = 'pbs' 271 272 try: 273 274 BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' ) 275 276 except ConfigParser.NoOptionError: 277 278 # Backwards compatibility for old configs 279 # 280 281 BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' ) 282 api_guess = 'pbs' 283 284 try: 285 286 GMOND_CONF = cfg.get( 'DEFAULT', 'GMOND_CONF' ) 287 288 except ConfigParser.NoOptionError: 289 290 # Not specified: assume /etc/gmond.conf 291 # 292 GMOND_CONF = '/etc/gmond.conf' 293 294 ganglia_cfg = GangliaConfigParser( GMOND_CONF ) 295 296 # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF 297 # 298 gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' ) 299 300 if not gmetric_dest_ip: 301 302 # Maybe unicast target then 303 # 304 gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'host' ) 305 306 gmetric_dest_port = ganglia_cfg.getStr( 'udp_send_channel', 'port' ) 307 308 if gmetric_dest_ip and gmetric_dest_port: 309 310 GMETRIC_TARGET = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port ) 311 else: 312 313 debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF ) 314 315 # Couldn't figure it out: let's see if it's in our jobmond.conf 316 # 317 try: 318 319 GMETRIC_TARGET = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' ) 320 321 # Guess not: now just give up 322 # 323 except ConfigParser.NoOptionError: 324 325 GMETRIC_TARGET = None 326 327 debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" ) 328 329 gmetric_bin = findGmetric() 330 331 if gmetric_bin: 332 333 GMETRIC_BINARY = gmetric_bin 334 else: 335 debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" ) 336 337 try: 338 339 GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' ) 340 341 except ConfigParser.NoOptionError: 342 343 debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" ) 344 sys.exit( 1 ) 345 346 DETECT_TIME_DIFFS = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' ) 347 348 BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) ) 349 350 try: 351 352 BATCH_API = cfg.get( 'DEFAULT', 'BATCH_API' ) 353 354 except ConfigParser.NoOptionError, detail: 355 356 if BATCH_SERVER and api_guess: 357 358 BATCH_API = api_guess 359 else: 360 debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" ) 361 sys.exit( 1 ) 362 363 try: 364 365 QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) ) 366 367 except ConfigParser.NoOptionError, detail: 368 369 QUEUE = None 370 371 return True 372 372 373 373 def fqdn_parts (fqdn): 374 374 375 376 377 378 379 375 """Return pair of host and domain for fully-qualified domain name arg.""" 376 377 parts = fqdn.split (".") 378 379 return (parts[0], string.join(parts[1:], ".")) 380 380 381 381 METRIC_MAX_VAL_LEN = 900 … … 383 383 class DataProcessor: 384 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 incompatible= 0429 430 gfp= os.popen( self.binary + ' --version' )431 lines= gfp.readlines()432 433 434 435 436 437 438 439 440 441 gmetric_version= line[1].split( '\n' )[0]442 443 version_major= int( gmetric_version.split( '.' )[0] )444 version_minor= int( gmetric_version.split( '.' )[1] )445 version_patch= int( gmetric_version.split( '.' )[2] )446 447 incompatible= 0448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 GMETRIC_TARGET_HOST= GMETRIC_TARGET.split( ':' )[0]482 GMETRIC_TARGET_PORT= GMETRIC_TARGET.split( ':' )[1]483 484 metric_debug= "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 385 """Class for processing of data""" 386 387 binary = None 388 389 def __init__( self, binary=None ): 390 391 """Remember alternate binary location if supplied""" 392 393 global GMETRIC_BINARY 394 395 if binary: 396 self.binary = binary 397 398 if not self.binary: 399 self.binary = GMETRIC_BINARY 400 401 # Timeout for XML 402 # 403 # From ganglia's documentation: 404 # 405 # 'A metric will be deleted DMAX seconds after it is received, and 406 # DMAX=0 means eternal life.' 407 408 self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) ) 409 410 if GMOND_CONF: 411 412 incompatible = self.checkGmetricVersion() 413 414 if incompatible: 415 416 debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' ) 417 sys.exit( 1 ) 418 419 def checkGmetricVersion( self ): 420 421 """ 422 Check version of gmetric is at least 3.0.1 423 for the syntax we use 424 """ 425 426 global METRIC_MAX_VAL_LEN 427 428 incompatible = 0 429 430 gfp = os.popen( self.binary + ' --version' ) 431 lines = gfp.readlines() 432 433 gfp.close() 434 435 for line in lines: 436 437 line = line.split( ' ' ) 438 439 if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1: 440 441 gmetric_version = line[1].split( '\n' )[0] 442 443 version_major = int( gmetric_version.split( '.' )[0] ) 444 version_minor = int( gmetric_version.split( '.' )[1] ) 445 version_patch = int( gmetric_version.split( '.' )[2] ) 446 447 incompatible = 0 448 449 if version_major < 3: 450 451 incompatible = 1 452 453 elif version_major == 3: 454 455 if version_minor == 0: 456 457 if version_patch < 1: 458 459 incompatible = 1 460 461 # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length 462 # 463 if version_patch < 3: 464 465 METRIC_MAX_VAL_LEN = 900 466 467 elif version_patch >= 3: 468 469 METRIC_MAX_VAL_LEN = 1400 470 471 return incompatible 472 473 def multicastGmetric( self, metricname, metricval, valtype='string', units='' ): 474 475 """Call gmetric binary and multicast""" 476 477 cmd = self.binary 478 479 if GMETRIC_TARGET: 480 481 GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0] 482 GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1] 483 484 metric_debug = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) ) 485 486 debug_msg( 10, printTime() + ' ' + metric_debug) 487 488 gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT ) 489 490 gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units ) 491 492 else: 493 try: 494 cmd = cmd + ' -c' + GMOND_CONF 495 496 except NameError: 497 498 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' ) 499 500 cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax ) 501 502 if len( units ) > 0: 503 504 cmd = cmd + ' -u"' + units + '"' 505 506 debug_msg( 10, printTime() + ' ' + cmd ) 507 508 os.system( cmd ) 509 509 510 510 class DataGatherer: 511 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 oldData = jobs[ job_id ] 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 running_jobs= 0578 queued_jobs= 0579 580 512 """Skeleton class for batch system DataGatherer""" 513 514 def printJobs( self, jobs ): 515 516 """Print a jobinfo overview""" 517 518 for name, attrs in self.jobs.items(): 519 520 print 'job %s' %(name) 521 522 for name, val in attrs.items(): 523 524 print '\t%s = %s' %( name, val ) 525 526 def printJob( self, jobs, job_id ): 527 528 """Print job with job_id from jobs""" 529 530 print 'job %s' %(job_id) 531 532 for name, val in jobs[ job_id ].items(): 533 534 print '\t%s = %s' %( name, val ) 535 536 def getAttr( self, attrs, name ): 537 538 """Return certain attribute from dictionary, if exists""" 539 540 if attrs.has_key( name ): 541 542 return attrs[ name ] 543 else: 544 return '' 545 546 def jobDataChanged( self, jobs, job_id, attrs ): 547 548 """Check if job with attrs and job_id in jobs has changed""" 549 550 if jobs.has_key( job_id ): 551 552 oldData = jobs[ job_id ] 553 else: 554 return 1 555 556 for name, val in attrs.items(): 557 558 if oldData.has_key( name ): 559 560 if oldData[ name ] != attrs[ name ]: 561 562 return 1 563 564 else: 565 return 1 566 567 return 0 568 569 def submitJobData( self ): 570 571 """Submit job info list""" 572 573 global BATCH_API 574 575 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 576 577 running_jobs = 0 578 queued_jobs = 0 579 580 # Count how many running/queued jobs we found 581 581 # 582 583 584 585 586 587 588 589 590 591 592 582 for jobid, jobattrs in self.jobs.items(): 583 584 if jobattrs[ 'status' ] == 'Q': 585 586 queued_jobs += 1 587 588 elif jobattrs[ 'status' ] == 'R': 589 590 running_jobs += 1 591 592 # Report running/queued jobs as seperate metric for a nice RRD graph 593 593 # 594 595 596 597 598 599 600 601 domain= fqdn_parts( socket.getfqdn() )[1]602 603 downed_nodes= list()604 offline_nodes= list()605 606 l= ['state']607 608 609 610 611 612 613 614 615 616 617 618 downnodeslist= do_nodelist( downed_nodes )619 offlinenodeslist= do_nodelist( offline_nodes )620 621 down_str= 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )622 offl_str= 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )623 624 625 626 627 628 629 630 594 self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' ) 595 self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' ) 596 597 # Report down/offline nodes in batch (PBS only ATM) 598 # 599 if BATCH_API == 'pbs': 600 601 domain = fqdn_parts( socket.getfqdn() )[1] 602 603 downed_nodes = list() 604 offline_nodes = list() 605 606 l = ['state'] 607 608 for name, node in self.pq.getnodes().items(): 609 610 if ( node[ 'state' ].find( "down" ) != -1 ): 611 612 downed_nodes.append( name ) 613 614 if ( node[ 'state' ].find( "offline" ) != -1 ): 615 616 offline_nodes.append( name ) 617 618 downnodeslist = do_nodelist( downed_nodes ) 619 offlinenodeslist = do_nodelist( offline_nodes ) 620 621 down_str = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 622 offl_str = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 623 self.dp.multicastGmetric( 'MONARCH-DOWN' , down_str ) 624 self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str ) 625 626 # Now let's spread the knowledge 627 # 628 for jobid, jobattrs in self.jobs.items(): 629 630 # Make gmetric values for each job: respect max gmetric value length 631 631 # 632 gmetric_val= self.compileGmetricVal( jobid, jobattrs )633 metric_increment= 0634 635 632 gmetric_val = self.compileGmetricVal( jobid, jobattrs ) 633 metric_increment = 0 634 635 # If we have more job info than max gmetric value length allows, split it up 636 636 # amongst multiple metrics 637 638 639 640 641 642 637 # 638 for val in gmetric_val: 639 640 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val ) 641 642 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric 643 643 # 644 metric_increment= metric_increment + 1645 646 647 648 649 650 gval_lists= [ ]651 val_list= { }652 653 654 655 644 metric_increment = metric_increment + 1 645 646 def compileGmetricVal( self, jobid, jobattrs ): 647 648 """Create a val string for gmetric of jobinfo""" 649 650 gval_lists = [ ] 651 val_list = { } 652 653 for val_name, val_value in jobattrs.items(): 654 655 # These are our own metric names, i.e.: status, start_timestamp, etc 656 656 # 657 val_list_names_len= len( string.join( val_list.keys() ) ) + len(val_list.keys())658 659 657 val_list_names_len = len( string.join( val_list.keys() ) ) + len(val_list.keys()) 658 659 # These are their corresponding values 660 660 # 661 val_list_vals_len= len( string.join( val_list.values() ) ) + len(val_list.values())662 663 664 665 666 667 668 669 670 671 672 673 674 675 661 val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values()) 662 663 if val_name == 'nodes' and jobattrs['status'] == 'R': 664 665 node_str = None 666 667 for node in val_value: 668 669 if node_str: 670 671 node_str = node_str + ';' + node 672 else: 673 node_str = node 674 675 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN 676 676 # 677 678 679 677 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN: 678 679 # It's too big, we need to make a new gmetric for the additional info 680 680 # 681 val_list[ val_name ]= node_str682 683 684 685 val_list= { }686 node_str= None687 688 val_list[ val_name ]= node_str689 690 691 692 val_list= { }693 694 695 696 681 val_list[ val_name ] = node_str 682 683 gval_lists.append( val_list ) 684 685 val_list = { } 686 node_str = None 687 688 val_list[ val_name ] = node_str 689 690 gval_lists.append( val_list ) 691 692 val_list = { } 693 694 elif val_value != '': 695 696 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN 697 697 # 698 699 700 698 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN: 699 700 # It's too big, we need to make a new gmetric for the additional info 701 701 # 702 703 704 val_list= { }705 706 val_list[ val_name ]= val_value707 708 709 710 711 712 str_list= [ ]713 714 702 gval_lists.append( val_list ) 703 704 val_list = { } 705 706 val_list[ val_name ] = val_value 707 708 if len( val_list ) > 0: 709 710 gval_lists.append( val_list ) 711 712 str_list = [ ] 713 714 # Now append the value names and values together, i.e.: stop_timestamp=value, etc 715 715 # 716 717 718 my_val_str= None719 720 721 722 723 724 val_value= val_value.join( ',' )725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 716 for val_list in gval_lists: 717 718 my_val_str = None 719 720 for val_name, val_value in val_list.items(): 721 722 if type(val_value) == list: 723 724 val_value = val_value.join( ',' ) 725 726 if my_val_str: 727 728 try: 729 # fixme: It's getting 730 # ('nodes', None) items 731 my_val_str = my_val_str + ' ' + val_name + '=' + val_value 732 except: 733 pass 734 735 else: 736 my_val_str = val_name + '=' + val_value 737 738 str_list.append( my_val_str ) 739 740 return str_list 741 741 742 742 def daemon( self ): … … 760 760 sys.exit(0) # end parent 761 761 762 762 write_pidfile() 763 763 764 764 # Go to the root directory and set the umask … … 782 782 783 783 while ( 1 ): 784 785 786 787 time.sleep( BATCH_POLL_INTERVAL ) 784 785 self.getJobData() 786 self.submitJobData() 787 time.sleep( BATCH_POLL_INTERVAL ) 788 788 789 789 # SGE code by Dave Love <fx@gnu.org>. Tested with SGE 6.0u8 and 6.0u11. May … … 792 792 793 793 class NoJobs (Exception): 794 795 794 """Exception raised by empty job list in qstat output.""" 795 pass 796 796 797 797 class SgeQstatXMLParser(xml.sax.handler.ContentHandler): 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 if name == "djob_info":# job list852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 798 """SAX handler for XML output from Sun Grid Engine's `qstat'.""" 799 800 def __init__(self): 801 self.value = "" 802 self.joblist = [] 803 self.job = {} 804 self.queue = "" 805 self.in_joblist = False 806 self.lrequest = False 807 self.eltq = deque() 808 xml.sax.handler.ContentHandler.__init__(self) 809 810 # The structure of the output is as follows (for SGE 6.0). It's 811 # similar for 6.1, but radically different for 6.2, and is 812 # undocumented generally. Unfortunately it's voluminous, and probably 813 # doesn't scale to large clusters/queues. 814 815 # <detailed_job_info xmlns:xsd="http://www.w3.org/2001/XMLSchema"> 816 # <djob_info> 817 # <qmaster_response> <!-- job --> 818 # ... 819 # <JB_ja_template> 820 # <ulong_sublist> 821 # ... <!-- start_time, state ... --> 822 # </ulong_sublist> 823 # </JB_ja_template> 824 # <JB_ja_tasks> 825 # <ulong_sublist> 826 # ... <!-- task info 827 # </ulong_sublist> 828 # ... 829 # </JB_ja_tasks> 830 # ... 831 # </qmaster_response> 832 # </djob_info> 833 # <messages> 834 # ... 835 836 # NB. We might treat each task as a separate job, like 837 # straight qstat output, but the web interface expects jobs to 838 # be identified by integers, not, say, <job number>.<task>. 839 840 # So, I lied. If the job list is empty, we get invalid XML 841 # like this, which we need to defend against: 842 843 # <unknown_jobs xmlns:xsd="http://www.w3.org/2001/XMLSchema"> 844 # <> 845 # <ST_name>*</ST_name> 846 # </> 847 # </unknown_jobs> 848 849 def startElement(self, name, attrs): 850 self.value = "" 851 if name == "djob_info": # job list 852 self.in_joblist = True 853 # The job container is "qmaster_response" in SGE 6.0 854 # and 6.1, but "element" in 6.2. This is only the very 855 # start of what's necessary for 6.2, though (sigh). 856 elif (name == "qmaster_response" or name == "element") \ 857 and self.eltq[-1] == "djob_info": # job 858 self.job = {"job_state": "U", "slots": 0, 859 "nodes": [], "queued_timestamp": "", 860 "queued_timestamp": "", "queue": "", 861 "ppn": "0", "RN_max": 0, 862 # fixme in endElement 863 "requested_memory": 0, "requested_time": 0 864 } 865 self.joblist.append(self.job) 866 elif name == "qstat_l_requests": # resource request 867 self.lrequest = True 868 elif name == "unknown_jobs": 869 raise NoJobs 870 self.eltq.append (name) 871 872 def characters(self, ch): 873 self.value += ch 874 875 def endElement(self, name): 876 """Snarf job elements contents into job dictionary. 877 Translate keys if appropriate.""" 878 879 name_trans = { 880 "JB_job_number": "number", 881 "JB_job_name": "name", "JB_owner": "owner", 882 "queue_name": "queue", "JAT_start_time": "start_timestamp", 883 "JB_submission_time": "queued_timestamp" 884 } 885 value = self.value 886 self.eltq.pop () 887 888 if name == "djob_info": 889 self.in_joblist = False 890 self.job = {} 891 elif name == "JAT_master_queue": 892 self.job["queue"] = value.split("@")[0] 893 elif name == "JG_qhostname": 894 if not (value in self.job["nodes"]): 895 self.job["nodes"].append(value) 896 elif name == "JG_slots": # slots in use 897 self.job["slots"] += int(value) 898 elif name == "RN_max": # requested slots (tasks or parallel) 899 self.job["RN_max"] = max (self.job["RN_max"], 900 int(value)) 901 elif name == "JAT_state": # job state (bitwise or) 902 value = int (value) 903 # Status values from sge_jobL.h 904 #define JIDLE 0x00000000 905 #define JHELD 0x00000010 906 #define JMIGRATING 0x00000020 907 #define JQUEUED 0x00000040 908 #define JRUNNING 0x00000080 909 #define JSUSPENDED 0x00000100 910 #define JTRANSFERING 0x00000200 911 #define JDELETED 0x00000400 912 #define JWAITING 0x00000800 913 #define JEXITING 0x00001000 914 #define JWRITTEN 0x00002000 915 #define JSUSPENDED_ON_THRESHOLD 0x00010000 916 #define JFINISHED 0x00010000 917 if value & 0x80: 918 self.job["status"] = "R" 919 elif value & 0x40: 920 self.job["status"] = "Q" 921 else: 922 self.job["status"] = "O" # `other' 923 elif name == "CE_name" and self.lrequest and self.value in \ 924 ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"): 925 # We're in a container for an interesting resource 926 # request; record which type. 927 self.lrequest = self.value 928 elif name == "CE_doubleval" and self.lrequest: 929 # if we're in a container for an interesting 930 # resource request, use the maxmimum of the hard 931 # and soft requests to record the requested CPU 932 # or core. Fixme: I'm not sure if this logic is 933 # right. 934 if self.lrequest in ("h_core", "s_core"): 935 self.job["requested_memory"] = \ 936 max (float (value), 937 self.job["requested_memory"]) 938 # Fixme: Check what cpu means, c.f [hs]_cpu. 939 elif self.lrequest in ("h_cpu", "s_cpu", "cpu"): 940 self.job["requested_time"] = \ 941 max (float (value), 942 self.job["requested_time"]) 943 elif name == "qstat_l_requests": 944 self.lrequest = False 945 elif self.job and self.in_joblist: 946 if name in name_trans: 947 name = name_trans[name] 948 self.job[name] = value 949 949 950 950 # Abstracted from PBS original. … … 953 953 def do_nodelist( nodes ): 954 954 955 956 957 nodeslist= [ ]958 my_domain= fqdn_parts( socket.getfqdn() )[1]959 960 961 962 host= node.split( '/' )[0] # not relevant for SGE963 h, host_domain= fqdn_parts(host)964 965 966 967 host= h968 969 970 971 972 973 974 975 translate_orig= \976 977 translate_new= \978 979 980 981 982 983 955 """Translate node list as appropriate.""" 956 957 nodeslist = [ ] 958 my_domain = fqdn_parts( socket.getfqdn() )[1] 959 960 for node in nodes: 961 962 host = node.split( '/' )[0] # not relevant for SGE 963 h, host_domain = fqdn_parts(host) 964 965 if host_domain == my_domain: 966 967 host = h 968 969 if nodeslist.count( host ) == 0: 970 971 for translate_pattern in BATCH_HOST_TRANSLATE: 972 973 if translate_pattern.find( '/' ) != -1: 974 975 translate_orig = \ 976 translate_pattern.split( '/' )[1] 977 translate_new = \ 978 translate_pattern.split( '/' )[2] 979 host = re.sub( translate_orig, 980 translate_new, host ) 981 if not host in nodeslist: 982 nodeslist.append( host ) 983 return nodeslist 984 984 985 985 class SgeDataGatherer(DataGatherer): 986 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 if QUEUE:# only for specific queues1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 987 jobs = {} 988 989 def __init__( self ): 990 self.jobs = {} 991 self.timeoffset = 0 992 self.dp = DataProcessor() 993 994 def getJobData( self ): 995 """Gather all data on current jobs in SGE""" 996 997 import popen2 998 999 self.cur_time = 0 1000 queues = "" 1001 if QUEUE: # only for specific queues 1002 # Fixme: assumes queue names don't contain single 1003 # quote or comma. Don't know what the SGE rules are. 1004 queues = " -q '" + string.join (QUEUE, ",") + "'" 1005 # Note the comment in SgeQstatXMLParser about scaling with 1006 # this method of getting data. I haven't found better one. 1007 # Output with args `-xml -ext -f -r' is easier to parse 1008 # in some ways, harder in others, but it doesn't provide 1009 # the submission time (at least SGE 6.0). The pipeline 1010 # into sed corrects bogus XML observed with a configuration 1011 # of SGE 6.0u8, which otherwise causes the parsing to hang. 1012 piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \ 1013 1013 sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \ 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 self.timeoffset= \1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1014 + queues, True) 1015 qstatparser = SgeQstatXMLParser() 1016 parse_err = 0 1017 try: 1018 xml.sax.parse(piping.fromchild, qstatparser) 1019 except NoJobs: 1020 pass 1021 except: 1022 parse_err = 1 1023 if piping.wait(): 1024 debug_msg(10, 1025 "qstat error, skipping until next polling interval: " 1026 + piping.childerr.readline()) 1027 return None 1028 elif parse_err: 1029 debug_msg(10, "Bad XML output from qstat"()) 1030 exit (1) 1031 for f in piping.fromchild, piping.tochild, piping.childerr: 1032 f.close() 1033 self.cur_time = time.time() 1034 jobs_processed = [] 1035 for job in qstatparser.joblist: 1036 job_id = job["number"] 1037 if job["status"] in [ 'Q', 'R' ]: 1038 jobs_processed.append(job_id) 1039 if job["status"] == "R": 1040 job["nodes"] = do_nodelist (job["nodes"]) 1041 # Fixme: why is job["nodes"] sometimes null? 1042 try: 1043 # Fixme: Is this sensible? The 1044 # PBS-type PPN isn't something you use 1045 # with SGE. 1046 job["ppn"] = float(job["slots"]) / \ 1047 len(job["nodes"]) 1048 except: 1049 job["ppn"] = 0 1050 if DETECT_TIME_DIFFS: 1051 # If a job start is later than our 1052 # current date, that must mean 1053 # the SGE server's time is later 1054 # than our local time. 1055 start_timestamp = \ 1056 int (job["start_timestamp"]) 1057 if start_timestamp > \ 1058 int(self.cur_time) + \ 1059 int(self.timeoffset): 1060 1061 self.timeoffset = \ 1062 start_timestamp - \ 1063 int(self.cur_time) 1064 else: 1065 # fixme: Note sure what this should be: 1066 job["ppn"] = job["RN_max"] 1067 job["nodes"] = "1" 1068 1069 myAttrs = {} 1070 for attr in ["name", "queue", "owner", 1071 "requested_time", "status", 1072 "requested_memory", "ppn", 1073 "start_timestamp", "queued_timestamp"]: 1074 myAttrs[attr] = str(job[attr]) 1075 myAttrs["nodes"] = job["nodes"] 1076 myAttrs["reported"] = str(int(self.cur_time) + \ 1077 int(self.timeoffset)) 1078 myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1] 1079 myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL) 1080 1081 if self.jobDataChanged(self.jobs, job_id, myAttrs) \ 1082 and myAttrs["status"] in ["R", "Q"]: 1083 self.jobs[job_id] = myAttrs 1084 for id, attrs in self.jobs.items(): 1085 if id not in jobs_processed: 1086 del self.jobs[id] 1087 1087 1088 1088 # LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt> … … 1104 1104 def _countDuplicatesInList( self, dupedList ): 1105 1105 1106 countDupes= { }1107 1108 1109 1110 1111 1112 countDupes[ item ]= 11113 1114 countDupes[ item ]= countDupes[ item ] + 11115 1116 dupeCountList= [ ]1117 1118 1119 1120 1106 countDupes = { } 1107 1108 for item in dupedList: 1109 1110 if not countDupes.has_key( item ): 1111 1112 countDupes[ item ] = 1 1113 else: 1114 countDupes[ item ] = countDupes[ item ] + 1 1115 1116 dupeCountList = [ ] 1117 1118 for item, count in countDupes.items(): 1119 1120 dupeCountList.append( ( item, count ) ) 1121 1121 1122 1122 return dupeCountList … … 1180 1180 requested_cpus = 1 1181 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1182 if QUEUE: 1183 for q in QUEUE: 1184 if q == queue: 1185 display_queue = 1 1186 break 1187 else: 1188 display_queue = 0 1189 continue 1190 if display_queue == 0: 1191 continue 1192 1192 1193 1193 runState = self.getAttr( attrs, 'status' ) … … 1225 1225 myAttrs['name'] = name 1226 1226 1227 myAttrs[ 'owner' ] 1228 myAttrs[ 'requested_time' ] 1229 myAttrs[ 'requested_memory' ] 1230 myAttrs[ 'requested_cpus' ] 1231 myAttrs[ 'ppn' ] 1232 myAttrs[ 'status' ] 1233 myAttrs[ 'start_timestamp' ] 1234 myAttrs[ 'queue' ] 1235 myAttrs[ 'queued_timestamp' ] 1236 myAttrs[ 'reported' ] 1237 myAttrs[ 'nodes' ] 1238 myAttrs[ 'domain' ]= fqdn_parts( socket.getfqdn() )[1]1239 myAttrs[ 'poll_interval' ] 1227 myAttrs[ 'owner' ] = owner 1228 myAttrs[ 'requested_time' ] = str(requested_time) 1229 myAttrs[ 'requested_memory' ] = str(requested_memory) 1230 myAttrs[ 'requested_cpus' ] = str(requested_cpus) 1231 myAttrs[ 'ppn' ] = str( ppn ) 1232 myAttrs[ 'status' ] = status 1233 myAttrs[ 'start_timestamp' ] = str(start_timestamp) 1234 myAttrs[ 'queue' ] = str(queue) 1235 myAttrs[ 'queued_timestamp' ] = str(queued_timestamp) 1236 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 1237 myAttrs[ 'nodes' ] = do_nodelist( nodelist ) 1238 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1] 1239 myAttrs[ 'poll_interval' ] = str(BATCH_POLL_INTERVAL) 1240 1240 1241 1241 if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]: … … 1254 1254 class PbsDataGatherer( DataGatherer ): 1255 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 self.jobs= { }1265 self.timeoffset= 01266 self.dp= DataProcessor()1267 1268 1269 1270 1271 1272 self.pq= None1273 1274 1275 1276 self.pq= PBSQuery( BATCH_SERVER )1277 1278 self.pq= PBSQuery()1279 1280 1281 1282 1283 1284 joblist= {}1285 self.cur_time= 01286 1287 1288 joblist= self.pq.getjobs()1289 self.cur_time= time.time()1290 1291 1292 1293 1294 1295 1296 jobs_processed= [ ]1297 1298 1299 display_queue= 11300 job_id= name.split( '.' )[0]1301 1302 name= self.getAttr( attrs, 'Job_Name' )1303 queue= self.getAttr( attrs, 'queue' )1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 owner= self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]1318 requested_time= self.getAttr( attrs, 'Resource_List.walltime' )1319 requested_memory= self.getAttr( attrs, 'Resource_List.mem' )1320 1321 mynoderequest= self.getAttr( attrs, 'Resource_List.nodes' )1322 1323 ppn= ''1324 1325 1326 1327 mynoderequest_fields= mynoderequest.split( ':' )1328 1329 1330 1331 1332 1333 ppn= mynoderequest_field.split( 'ppn=' )[1]1334 1335 status= self.getAttr( attrs, 'job_state' )1336 1337 1338 1339 1340 1341 queued_timestamp= self.getAttr( attrs, 'ctime' )1342 1343 1344 1345 start_timestamp= self.getAttr( attrs, 'mtime' )1346 nodes= self.getAttr( attrs, 'exec_host' ).split( '+' )1347 1348 nodeslist= do_nodelist( nodes )1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 self.timeoffset= int( int(start_timestamp) - int(self.cur_time) )1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 start_timestamp= ''1375 count_mynodes= 01376 1377 1378 1379 1380 1381 nodepart= node.split( ':' )[0]1382 1383 1384 1385 numeric_node= 11386 1387 1388 1389 1390 1391 1392 1393 1394 1395 numeric_node= 01396 1397 1398 1399 1400 1401 count_mynodes= count_mynodes + 11402 1403 1404 1405 1406 1407 1408 count_mynodes= count_mynodes + int( nodepart )1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 nodeslist= str( count_mynodes )1422 1423 start_timestamp= ''1424 nodeslist= ''1425 1426 myAttrs= { }1427 1428 myAttrs[ 'name' ]= str( name )1429 myAttrs[ 'queue' ]= str( queue )1430 myAttrs[ 'owner' ]= str( owner )1431 myAttrs[ 'requested_time' ]= str( requested_time )1432 myAttrs[ 'requested_memory' ]= str( requested_memory )1433 myAttrs[ 'ppn' ]= str( ppn )1434 myAttrs[ 'status' ]= str( status )1435 myAttrs[ 'start_timestamp' ]= str( start_timestamp )1436 myAttrs[ 'queued_timestamp' ]= str( queued_timestamp )1437 myAttrs[ 'reported' ]= str( int( int( self.cur_time ) + int( self.timeoffset ) ) )1438 myAttrs[ 'nodes' ]= nodeslist1439 myAttrs[ 'domain' ]= fqdn_parts( socket.getfqdn() )[1]1440 myAttrs[ 'poll_interval' ]= str( BATCH_POLL_INTERVAL )1441 1442 1443 1444 self.jobs[ job_id ]= myAttrs1445 1446 1447 1448 1449 1450 1451 1452 1256 """This is the DataGatherer for PBS and Torque""" 1257 1258 global PBSQuery, PBSError 1259 1260 def __init__( self ): 1261 1262 """Setup appropriate variables""" 1263 1264 self.jobs = { } 1265 self.timeoffset = 0 1266 self.dp = DataProcessor() 1267 1268 self.initPbsQuery() 1269 1270 def initPbsQuery( self ): 1271 1272 self.pq = None 1273 1274 if( BATCH_SERVER ): 1275 1276 self.pq = PBSQuery( BATCH_SERVER ) 1277 else: 1278 self.pq = PBSQuery() 1279 1280 def getJobData( self ): 1281 1282 """Gather all data on current jobs in Torque""" 1283 1284 joblist = {} 1285 self.cur_time = 0 1286 1287 try: 1288 joblist = self.pq.getjobs() 1289 self.cur_time = time.time() 1290 1291 except PBSError, detail: 1292 1293 debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) ) 1294 return None 1295 1296 jobs_processed = [ ] 1297 1298 for name, attrs in joblist.items(): 1299 display_queue = 1 1300 job_id = name.split( '.' )[0] 1301 1302 name = self.getAttr( attrs, 'Job_Name' ) 1303 queue = self.getAttr( attrs, 'queue' ) 1304 1305 if QUEUE: 1306 for q in QUEUE: 1307 if q == queue: 1308 display_queue = 1 1309 break 1310 else: 1311 display_queue = 0 1312 continue 1313 if display_queue == 0: 1314 continue 1315 1316 1317 owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0] 1318 requested_time = self.getAttr( attrs, 'Resource_List.walltime' ) 1319 requested_memory = self.getAttr( attrs, 'Resource_List.mem' ) 1320 1321 mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' ) 1322 1323 ppn = '' 1324 1325 if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1: 1326 1327 mynoderequest_fields = mynoderequest.split( ':' ) 1328 1329 for mynoderequest_field in mynoderequest_fields: 1330 1331 if mynoderequest_field.find( 'ppn' ) != -1: 1332 1333 ppn = mynoderequest_field.split( 'ppn=' )[1] 1334 1335 status = self.getAttr( attrs, 'job_state' ) 1336 1337 if status in [ 'Q', 'R' ]: 1338 1339 jobs_processed.append( job_id ) 1340 1341 queued_timestamp = self.getAttr( attrs, 'ctime' ) 1342 1343 if status == 'R': 1344 1345 start_timestamp = self.getAttr( attrs, 'mtime' ) 1346 nodes = self.getAttr( attrs, 'exec_host' ).split( '+' ) 1347 1348 nodeslist = do_nodelist( nodes ) 1349 1350 if DETECT_TIME_DIFFS: 1351 1352 # If a job start if later than our current date, 1353 # that must mean the Torque server's time is later 1354 # than our local time. 1355 1356 if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ): 1357 1358 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) ) 1359 1360 elif status == 'Q': 1361 1362 # 'mynodequest' can be a string in the following syntax according to the 1363 # Torque Administator's manual: 1364 # 1365 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...] 1366 # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...] 1367 # etc 1368 # 1369 1370 # 1371 # For now we only count the amount of nodes request and ignore properties 1372 # 1373 1374 start_timestamp = '' 1375 count_mynodes = 0 1376 1377 for node in mynoderequest.split( '+' ): 1378 1379 # Just grab the {node_count|hostname} part and ignore properties 1380 # 1381 nodepart = node.split( ':' )[0] 1382 1383 # Let's assume a node_count value 1384 # 1385 numeric_node = 1 1386 1387 # Chop the value up into characters 1388 # 1389 for letter in nodepart: 1390 1391 # If this char is not a digit (0-9), this must be a hostname 1392 # 1393 if letter not in string.digits: 1394 1395 numeric_node = 0 1396 1397 # If this is a hostname, just count this as one (1) node 1398 # 1399 if not numeric_node: 1400 1401 count_mynodes = count_mynodes + 1 1402 else: 1403 1404 # If this a number, it must be the node_count 1405 # and increase our count with it's value 1406 # 1407 try: 1408 count_mynodes = count_mynodes + int( nodepart ) 1409 1410 except ValueError, detail: 1411 1412 # When we arrive here I must be bugged or very confused 1413 # THIS SHOULD NOT HAPPEN! 1414 # 1415 debug_msg( 10, str( detail ) ) 1416 debug_msg( 10, "Encountered weird node in Resources_List?!" ) 1417 debug_msg( 10, 'nodepart = ' + str( nodepart ) ) 1418 debug_msg( 10, 'job = ' + str( name ) ) 1419 debug_msg( 10, 'attrs = ' + str( attrs ) ) 1420 1421 nodeslist = str( count_mynodes ) 1422 else: 1423 start_timestamp = '' 1424 nodeslist = '' 1425 1426 myAttrs = { } 1427 1428 myAttrs[ 'name' ] = str( name ) 1429 myAttrs[ 'queue' ] = str( queue ) 1430 myAttrs[ 'owner' ] = str( owner ) 1431 myAttrs[ 'requested_time' ] = str( requested_time ) 1432 myAttrs[ 'requested_memory' ] = str( requested_memory ) 1433 myAttrs[ 'ppn' ] = str( ppn ) 1434 myAttrs[ 'status' ] = str( status ) 1435 myAttrs[ 'start_timestamp' ] = str( start_timestamp ) 1436 myAttrs[ 'queued_timestamp' ] = str( queued_timestamp ) 1437 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 1438 myAttrs[ 'nodes' ] = nodeslist 1439 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1] 1440 myAttrs[ 'poll_interval' ] = str( BATCH_POLL_INTERVAL ) 1441 1442 if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]: 1443 1444 self.jobs[ job_id ] = myAttrs 1445 1446 for id, attrs in self.jobs.items(): 1447 1448 if id not in jobs_processed: 1449 1450 # This one isn't there anymore; toedeledoki! 1451 # 1452 del self.jobs[ id ] 1453 1453 1454 1454 # … … 1469 1469 GMETRIC_DEFAULT_HOST = '127.0.0.1' 1470 1470 GMETRIC_DEFAULT_PORT = '8649' 1471 GMETRIC_DEFAULT_UNITS 1471 GMETRIC_DEFAULT_UNITS = '' 1472 1472 1473 1473 class Gmetric: 1474 1474 1475 1476 1477 1478 1479 1480 1481 1475 global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT 1476 1477 slope = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 } 1478 type = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' ) 1479 protocol = ( 'udp', 'multicast' ) 1480 1481 def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ): 1482 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 units= GMETRIC_DEFAULT_UNITS1522 1523 1524 typestr= GMETRIC_DEFAULT_TYPE1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1483 global GMETRIC_DEFAULT_TYPE 1484 1485 self.prot = self.checkHostProtocol( host ) 1486 self.msg = xdrlib.Packer() 1487 self.socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 1488 1489 if self.prot not in self.protocol: 1490 1491 raise ValueError( "Protocol must be one of: " + str( self.protocol ) ) 1492 1493 if self.prot == 'multicast': 1494 1495 # Set multicast options 1496 # 1497 self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 ) 1498 1499 self.hostport = ( host, int( port ) ) 1500 self.slopestr = 'both' 1501 self.tmax = 60 1502 1503 def checkHostProtocol( self, ip ): 1504 1505 """Detect if a ip adress is a multicast address""" 1506 1507 MULTICAST_ADDRESS_MIN = ( "224", "0", "0", "0" ) 1508 MULTICAST_ADDRESS_MAX = ( "239", "255", "255", "255" ) 1509 1510 ip_fields = ip.split( '.' ) 1511 1512 if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX: 1513 1514 return 'multicast' 1515 else: 1516 return 'udp' 1517 1518 def send( self, name, value, dmax, typestr = '', units = '' ): 1519 1520 if len( units ) == 0: 1521 units = GMETRIC_DEFAULT_UNITS 1522 1523 if len( typestr ) == 0: 1524 typestr = GMETRIC_DEFAULT_TYPE 1525 1526 msg = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax ) 1527 1528 return self.socket.sendto( msg, self.hostport ) 1529 1530 def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax ): 1531 1532 if slopestr not in self.slope: 1533 1534 raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) ) 1535 1536 if typestr not in self.type: 1537 1538 raise ValueError( "Type must be one of: " + str( self.type ) ) 1539 1540 if len( name ) == 0: 1541 1542 raise ValueError( "Name must be non-empty" ) 1543 1544 self.msg.reset() 1545 self.msg.pack_int( 0 ) 1546 self.msg.pack_string( typestr ) 1547 self.msg.pack_string( name ) 1548 self.msg.pack_string( str( value ) ) 1549 self.msg.pack_string( unitstr ) 1550 self.msg.pack_int( self.slope[ slopestr ] ) 1551 self.msg.pack_uint( int( tmax ) ) 1552 self.msg.pack_uint( int( dmax ) ) 1553 1554 return self.msg.get_buffer() 1555 1555 1556 1556 def printTime( ): 1557 1557 1558 1559 1560 1558 """Print current time/date in human readable format for log/debug""" 1559 1560 return time.strftime("%a, %d %b %Y %H:%M:%S") 1561 1561 1562 1562 def debug_msg( level, msg ): 1563 1563 1564 1565 1566 1564 """Print msg if at or above current debug level""" 1565 1566 global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL 1567 1567 1568 1568 if (not DAEMONIZE and DEBUG_LEVEL >= level): 1569 1570 1571 1572 1569 sys.stderr.write( msg + '\n' ) 1570 1571 if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level): 1572 syslog.syslog( msg ) 1573 1573 1574 1574 def write_pidfile(): 1575 1575 1576 1577 1578 1579 1580 pid= os.getpid()1581 1582 pidfile= open( PIDFILE, 'w' )1583 1584 1585 1576 # Write pidfile if PIDFILE is set 1577 # 1578 if PIDFILE: 1579 1580 pid = os.getpid() 1581 1582 pidfile = open( PIDFILE, 'w' ) 1583 1584 pidfile.write( str( pid ) ) 1585 pidfile.close() 1586 1586 1587 1587 def main(): 1588 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1589 """Application start""" 1590 1591 global PBSQuery, PBSError, lsfObject 1592 global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE 1593 1594 if not processArgs( sys.argv[1:] ): 1595 1596 sys.exit( 1 ) 1597 1598 # Load appropriate DataGatherer depending on which BATCH_API is set 1599 # and any required modules for the Gatherer 1600 # 1601 if BATCH_API == 'pbs': 1602 1603 try: 1604 from PBSQuery import PBSQuery, PBSError 1605 1606 except ImportError: 1607 1608 debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" ) 1609 sys.exit( 1 ) 1610 1611 gather = PbsDataGatherer() 1612 1613 elif BATCH_API == 'sge': 1614 1615 # Tested with SGE 6.0u11. 1616 # 1617 gather = SgeDataGatherer() 1618 1619 elif BATCH_API == 'lsf': 1620 1621 try: 1622 from lsfObject import lsfObject 1623 except: 1624 debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed") 1625 sys.exit( 1) 1626 1627 gather = LsfDataGatherer() 1628 1629 else: 1630 debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" ) 1631 1632 sys.exit( 1 ) 1633 1634 if( DAEMONIZE and USE_SYSLOG ): 1635 1636 syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY ) 1637 1638 if DAEMONIZE: 1639 1640 gather.daemon() 1641 else: 1642 gather.run() 1643 1643 1644 1644 # wh00t? someone started me! :) 1645 1645 # 1646 1646 if __name__ == '__main__': 1647 1647 main()
Note: See TracChangeset
for help on using the changeset viewer.