Changeset 660
- Timestamp:
- 09/03/12 15:16:36 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r659 r660 27 27 from xml.sax.handler import feature_namespaces 28 28 from collections import deque 29 from types import * 29 30 30 31 VERSION='0.3.1' … … 181 182 def loadConfig( filename ): 182 183 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 184 def getlist( cfg_string ): 185 186 my_list = [ ] 187 188 for item_txt in cfg_string.split( ',' ): 189 190 sep_char = None 191 192 item_txt = item_txt.strip() 193 194 for s_char in [ "'", '"' ]: 195 196 if item_txt.find( s_char ) != -1: 197 198 if item_txt.count( s_char ) != 2: 199 200 print 'Missing quote: %s' %item_txt 201 sys.exit( 1 ) 202 203 else: 204 205 sep_char = s_char 206 break 207 208 if sep_char: 209 210 item_txt = item_txt.split( sep_char )[1] 211 212 my_list.append( item_txt ) 213 214 return my_list 214 215 215 216 cfg = ConfigParser.ConfigParser() … … 304 305 gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'host' ) 305 306 306 gmetric_dest_port = ganglia_cfg.getStr( 'udp_send_channel', 'port' )307 gmetric_dest_port = ganglia_cfg.getStr( 'udp_send_channel', 'port' ) 307 308 308 309 if gmetric_dest_ip and gmetric_dest_port: … … 534 535 print '\t%s = %s' %( name, val ) 535 536 536 def getAttr( self, attrs, name ):537 def getAttr( self, d, name ): 537 538 538 539 """Return certain attribute from dictionary, if exists""" 539 540 540 if attrs.has_key( name ): 541 542 return attrs[ name ] 543 else: 544 return '' 541 if d.has_key( name ): 542 543 if type( d[ name ] ) == ListType: 544 545 return string.join( d[ name ], ' ' ) 546 547 return d[ name ] 548 549 return '' 545 550 546 551 def jobDataChanged( self, jobs, job_id, attrs ): … … 608 613 for name, node in self.pq.getnodes().items(): 609 614 610 if ( node[ 'state' ].find( "down" ) != -1 ):615 if 'down' in node[ 'state' ]: 611 616 612 617 downed_nodes.append( name ) 613 618 614 if ( node[ 'state' ].find( "offline" ) != -1 ):619 if 'offline' in node[ 'state' ]: 615 620 616 621 offline_nodes.append( name ) … … 629 634 630 635 # Make gmetric values for each job: respect max gmetric value length 631 #632 gmetric_val = self.compileGmetricVal( jobid, jobattrs )633 metric_increment = 0634 635 # If we have more job info than max gmetric value length allows, split it up636 # amongst multiple metrics637 636 # 638 for val in gmetric_val: 639 640 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val ) 641 642 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric 643 # 644 metric_increment = metric_increment + 1 637 gmetrics = self.compileGmetricVal( jobid, jobattrs ) 638 639 for g_name, g_val in gmetrics.items(): 640 641 self.dp.multicastGmetric( g_name, g_val ) 645 642 646 643 def compileGmetricVal( self, jobid, jobattrs ): 647 644 648 """Create a val string for gmetric of jobinfo""" 649 650 gval_lists = [ ] 651 val_list = { } 645 """Create gmetric name/value pairs of jobinfo""" 646 647 gmetrics = { } 652 648 653 649 for val_name, val_value in jobattrs.items(): 654 650 655 # These are our own metric names, i.e.: status, start_timestamp, etc 656 # 657 val_list_names_len = len( string.join( val_list.keys() ) ) + len(val_list.keys()) 658 659 # These are their corresponding values 660 # 661 val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values()) 662 663 if val_name == 'nodes' and jobattrs['status'] == 'R': 664 665 node_str = None 666 667 for node in val_value: 668 669 if node_str: 670 671 node_str = node_str + ';' + node 672 else: 673 node_str = node 674 675 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN 676 # 677 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN: 678 679 # It's too big, we need to make a new gmetric for the additional info 680 # 681 val_list[ val_name ] = node_str 682 683 gval_lists.append( val_list ) 684 685 val_list = { } 686 node_str = None 687 688 val_list[ val_name ] = node_str 689 690 gval_lists.append( val_list ) 691 692 val_list = { } 693 694 elif val_value != '': 695 696 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN 697 # 698 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN: 699 700 # It's too big, we need to make a new gmetric for the additional info 701 # 702 gval_lists.append( val_list ) 703 704 val_list = { } 705 706 val_list[ val_name ] = val_value 707 708 if len( val_list ) > 0: 709 710 gval_lists.append( val_list ) 711 712 str_list = [ ] 713 714 # Now append the value names and values together, i.e.: stop_timestamp=value, etc 715 # 716 for val_list in gval_lists: 717 718 my_val_str = None 719 720 for val_name, val_value in val_list.items(): 721 722 if type(val_value) == list: 723 724 val_value = val_value.join( ',' ) 725 726 if my_val_str: 727 728 try: 729 # fixme: It's getting 730 # ('nodes', None) items 731 my_val_str = my_val_str + ' ' + val_name + '=' + val_value 732 except: 733 pass 734 735 else: 736 my_val_str = val_name + '=' + val_value 737 738 str_list.append( my_val_str ) 739 740 return str_list 741 742 def daemon( self ): 743 744 """Run as daemon forever""" 745 746 # Fork the first child 747 # 748 pid = os.fork() 749 if pid > 0: 750 sys.exit(0) # end parent 751 752 # creates a session and sets the process group ID 753 # 754 os.setsid() 755 756 # Fork the second child 757 # 758 pid = os.fork() 759 if pid > 0: 760 sys.exit(0) # end parent 651 gmetric_sequence = 0 652 653 if len( val_value ) > METRIC_MAX_VAL_LEN: 654 655 while len( val_value ) > METRIC_MAX_VAL_LEN: 656 657 gmetric_value = val_value[:METRIC_MAX_VAL_LEN] 658 val_value = val_value[METRIC_MAX_VAL_LEN:] 659 660 gmetric_name = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence ) 661 662 gmetrics[ gmetric_name ] = gmetric_value 663 664 gmetric_sequence = gmetric_sequence + 1 665 else: 666 gmetric_value = val_value 667 668 gmetric_name = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence ) 669 670 gmetrics[ gmetric_name ] = gmetric_value 671 672 return gmetrics 673 674 def daemon( self ): 675 676 """Run as daemon forever""" 677 678 # Fork the first child 679 # 680 pid = os.fork() 681 if pid > 0: 682 sys.exit(0) # end parent 683 684 # creates a session and sets the process group ID 685 # 686 os.setsid() 687 688 # Fork the second child 689 # 690 pid = os.fork() 691 if pid > 0: 692 sys.exit(0) # end parent 761 693 762 694 write_pidfile() 763 695 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 696 # Go to the root directory and set the umask 697 # 698 os.chdir('/') 699 os.umask(0) 700 701 sys.stdin.close() 702 sys.stdout.close() 703 sys.stderr.close() 704 705 os.open('/dev/null', os.O_RDWR) 706 os.dup2(0, 1) 707 os.dup2(0, 2) 708 709 self.run() 710 711 def run( self ): 712 713 """Main thread""" 714 715 while ( 1 ): 784 716 785 717 self.getJobData() … … 1022 954 parse_err = 1 1023 955 if piping.wait(): 1024 debug_msg(10,956 debug_msg(10, 1025 957 "qstat error, skipping until next polling interval: " 1026 958 + piping.childerr.readline()) 1027 return None1028 elif parse_err:1029 debug_msg(10, "Bad XML output from qstat"())1030 exit (1)959 return None 960 elif parse_err: 961 debug_msg(10, "Bad XML output from qstat"()) 962 exit (1) 1031 963 for f in piping.fromchild, piping.tochild, piping.childerr: 1032 964 f.close() … … 1104 1036 def _countDuplicatesInList( self, dupedList ): 1105 1037 1106 countDupes = { }1107 1108 for item in dupedList:1109 1110 if not countDupes.has_key( item ):1111 1112 countDupes[ item ] = 11113 else:1114 countDupes[ item ] = countDupes[ item ] + 11115 1116 dupeCountList = [ ]1117 1118 for item, count in countDupes.items():1119 1120 dupeCountList.append( ( item, count ) )1121 1122 1038 countDupes = { } 1039 1040 for item in dupedList: 1041 1042 if not countDupes.has_key( item ): 1043 1044 countDupes[ item ] = 1 1045 else: 1046 countDupes[ item ] = countDupes[ item ] + 1 1047 1048 dupeCountList = [ ] 1049 1050 for item, count in countDupes.items(): 1051 1052 dupeCountList.append( ( item, count ) ) 1053 1054 return dupeCountList 1123 1055 # 1124 1056 #lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7'] … … 1180 1112 requested_cpus = 1 1181 1113 1182 if QUEUE:1183 for q in QUEUE:1184 if q == queue:1185 display_queue = 11186 break1187 else:1188 display_queue = 01189 continue1190 if display_queue == 0:1191 continue1114 if QUEUE: 1115 for q in QUEUE: 1116 if q == queue: 1117 display_queue = 1 1118 break 1119 else: 1120 display_queue = 0 1121 continue 1122 if display_queue == 0: 1123 continue 1192 1124 1193 1125 runState = self.getAttr( attrs, 'status' ) … … 1236 1168 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 1237 1169 myAttrs[ 'nodes' ] = do_nodelist( nodelist ) 1238 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1]1170 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1] 1239 1171 myAttrs[ 'poll_interval' ] = str(BATCH_POLL_INTERVAL) 1240 1172 … … 1249 1181 # 1250 1182 del jobs[ id ] 1251 self.jobs=jobs 1252 1183 1184 self.jobs = jobs 1253 1185 1254 1186 class PbsDataGatherer( DataGatherer ): … … 1316 1248 1317 1249 owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0] 1318 requested_time = self.getAttr( attrs , 'Resource_List.walltime' )1319 requested_memory = self.getAttr( attrs , 'Resource_List.mem' )1320 1321 mynoderequest = self.getAttr( attrs , 'Resource_List.nodes' )1250 requested_time = self.getAttr( attrs['Resource_List'], 'walltime' ) 1251 requested_memory = self.getAttr( attrs['Resource_List'], 'mem' ) 1252 1253 mynoderequest = self.getAttr( attrs['Resource_List'], 'nodes' ) 1322 1254 1323 1255 ppn = '' … … 1566 1498 global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL 1567 1499 1568 1500 if (not DAEMONIZE and DEBUG_LEVEL >= level): 1569 1501 sys.stderr.write( msg + '\n' ) 1570 1502
Note: See TracChangeset
for help on using the changeset viewer.