Changeset 253 for trunk/jobmond
- Timestamp:
- 04/26/06 17:02:12 (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r244 r253 112 112 import time, os, socket, string, re 113 113 114 METRIC_MAX_VAL_LEN = 900 115 114 116 class DataProcessor: 115 117 """Class for processing of data""" … … 130 132 # DMAX=0 means eternal life.' 131 133 132 self.dmax = str( int( int( TORQUE_POLL_INTERVAL ) +2 ) )134 self.dmax = str( int( int( TORQUE_POLL_INTERVAL ) * 2 ) ) 133 135 134 136 try: … … 349 351 350 352 myAttrs = { } 351 myAttrs['name'] = name352 myAttrs['queue'] = queue353 myAttrs['owner'] = owner354 myAttrs['requested_time'] = requested_time355 myAttrs['requested_memory'] = requested_memory356 myAttrs['ppn'] = ppn357 myAttrs['status'] = st atus358 myAttrs['start_timestamp'] = st art_timestamp359 myAttrs['queued_timestamp'] = queued_timestamp353 myAttrs['name'] = str( name ) 354 myAttrs['queue'] = str( queue ) 355 myAttrs['owner'] = str( owner ) 356 myAttrs['requested_time'] = str( requested_time ) 357 myAttrs['requested_memory'] = str( requested_memory ) 358 myAttrs['ppn'] = str( ppn ) 359 myAttrs['status'] = str( status ) 360 myAttrs['start_timestamp'] = str( start_timestamp ) 361 myAttrs['queued_timestamp'] = str( queued_timestamp ) 360 362 myAttrs['reported'] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 361 363 myAttrs['nodes'] = nodeslist 362 364 myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' ) 363 myAttrs['poll_interval'] = TORQUE_POLL_INTERVAL365 myAttrs['poll_interval'] = str( TORQUE_POLL_INTERVAL ) 364 366 365 367 if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]: … … 389 391 gmetric_val = self.compileGmetricVal( jobid, jobattrs ) 390 392 393 metric_increment = 0 394 391 395 for val in gmetric_val: 392 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid, val ) 393 394 def makeNodeString( self, nodelist ): 395 """Make one big string of all hosts""" 396 397 node_str = None 398 399 for node in nodelist: 400 if not node_str: 401 node_str = node 402 else: 403 node_str = node_str + ';' + node 404 405 return node_str 396 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val ) 397 metric_increment = metric_increment + 1 406 398 407 399 def compileGmetricVal( self, jobid, jobattrs ): 408 400 """Create a val string for gmetric of jobinfo""" 409 401 410 appendList = [ ] 411 appendList.append( 'name=' + jobattrs['name'] ) 412 appendList.append( 'queue=' + jobattrs['queue'] ) 413 appendList.append( 'owner=' + jobattrs['owner'] ) 414 appendList.append( 'requested_time=' + jobattrs['requested_time'] ) 415 416 if jobattrs['requested_memory'] != '': 417 appendList.append( 'requested_memory=' + jobattrs['requested_memory'] ) 418 419 if jobattrs['ppn'] != '': 420 appendList.append( 'ppn=' + jobattrs['ppn'] ) 421 422 appendList.append( 'status=' + jobattrs['status'] ) 423 424 if jobattrs['start_timestamp'] != '': 425 appendList.append( 'start_timestamp=' + jobattrs['start_timestamp'] ) 426 427 if jobattrs['queued_timestamp'] != '': 428 appendList.append( 'queued_timestamp=' + jobattrs['queued_timestamp'] ) 429 430 appendList.append( 'reported=' + jobattrs['reported'] ) 431 appendList.append( 'poll_interval=' + str( jobattrs['poll_interval'] ) ) 432 appendList.append( 'domain=' + jobattrs['domain'] ) 433 434 if jobattrs['status'] == 'R': 435 if len( jobattrs['nodes'] ) > 0: 436 appendList.append( 'nodes=' + self.makeNodeString( jobattrs['nodes'] ) ) 437 elif jobattrs['status'] == 'Q': 438 appendList.append( 'nodes=' + str(jobattrs['nodes']) ) 439 440 return self.makeAppendLists( appendList ) 441 442 def makeAppendLists( self, append_list ): 443 """ 444 Divide all values from append_list over strings with a maximum 445 size of 1400 446 """ 447 448 app_lists = [ ] 402 gval_lists = [ ] 449 403 450 404 mystr = None 451 405 452 for val in append_list: 453 454 if not mystr: 455 mystr = val 456 else: 457 if not self.checkValAppendMaxSize( mystr, val ): 458 mystr = mystr + ' ' + val 406 val_list = { } 407 408 for val_name, val_value in jobattrs.items(): 409 410 val_list_names_len = len( string.join( val_list.keys() ) ) + len(val_list.keys()) 411 val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values()) 412 413 if (val_name != 'nodes' and val_value != '') or (val_name == 'nodes' and jobattrs['status'] == 'Q'): 414 415 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN: 416 417 gval_lists.append( val_list ) 418 val_list = { } 419 420 val_list[ val_name ] = val_value 421 422 elif val_name == 'nodes' and jobattrs['status'] == 'R': 423 424 node_str = None 425 426 for node in val_value: 427 428 if node_str: 429 node_str = node_str + ';' + node 430 else: 431 node_str = node 432 433 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN: 434 435 val_list[ val_name ] = node_str 436 gval_lists.append( val_list ) 437 val_list = { } 438 node_str = None 439 440 val_list[ val_name ] = node_str 441 gval_lists.append( val_list ) 442 val_list = { } 443 444 str_list = [ ] 445 446 for val_list in gval_lists: 447 448 my_val_str = None 449 450 for val_name, val_value in val_list.items(): 451 452 if my_val_str: 453 454 my_val_str = my_val_str + ' ' + val_name + '=' + val_value 459 455 else: 460 # Too big, new appenlist 461 app_lists.append( mystr ) 462 mystr = val 463 464 app_lists.append( mystr ) 465 466 return app_lists 467 468 def checkValAppendMaxSize( self, val, text ): 469 """Check if val + text size is not above 1400 (max msg size)""" 470 471 # Max frame size of a udp datagram is 1500 bytes 472 # removing misc header and gmetric stuff leaves about 1300 bytes 473 # 474 if len( val + text ) > 900: 475 return 1 476 else: 477 return 0 456 my_val_str = val_name + '=' + val_value 457 458 str_list.append( my_val_str ) 459 460 return str_list 478 461 479 462 def printJobs( self, jobs ):
Note: See TracChangeset
for help on using the changeset viewer.