Changeset 354 for trunk/jobmond/jobmond.py
- Timestamp:
- 05/03/07 14:47:18 (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r353 r354 23 23 24 24 import sys, getopt, ConfigParser 25 25 import time, os, socket, string, re 26 26 import xml, xml.sax 27 27 from xml.sax import saxutils, make_parser … … 39 39 print 40 40 41 42 41 def processArgs( args ): 43 42 44 SHORT_L = 'c:'45 LONG_L = 'config='43 SHORT_L = 'hc:' 44 LONG_L = [ 'help', 'config=' ] 46 45 47 46 global PIDFILE 48 PIDFILE = None 49 config_filename = '/etc/jobmond.conf' 47 PIDFILE = None 48 49 config_filename = '/etc/jobmond.conf' 50 50 51 51 try: 52 52 53 opts, args 53 opts, args = getopt.getopt( args, SHORT_L, LONG_L ) 54 54 55 55 except getopt.GetoptError, detail: … … 57 57 print detail 58 58 usage() 59 sys.exit( 1)59 sys.exit( 1 ) 60 60 61 61 for opt, value in opts: … … 63 63 if opt in [ '--config', '-c' ]: 64 64 65 config_filename 65 config_filename = value 66 66 67 67 if opt in [ '--pidfile', '-p' ]: 68 68 69 PIDFILE 69 PIDFILE = value 70 70 71 71 if opt in [ '--help', '-h' ]: 72 72 73 73 usage() 74 sys.exit( 1)74 sys.exit( 0 ) 75 75 76 76 return loadConfig( config_filename ) … … 110 110 return my_list 111 111 112 cfg 112 cfg = ConfigParser.ConfigParser() 113 113 114 114 cfg.read( filename ) 115 115 116 global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL, GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE, BATCH_API, QUEUE, GMETRIC_TARGET 117 118 DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' ) 119 120 DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' ) 116 global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL 117 global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE 118 global BATCH_API, QUEUE, GMETRIC_TARGET 119 120 DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' ) 121 122 DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' ) 121 123 122 124 try: 123 125 124 BATCH_SERVER 126 BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' ) 125 127 126 128 except ConfigParser.NoOptionError: … … 129 131 # 130 132 131 BATCH_SERVER 132 api_guess 133 BATCH_SERVER = cfg.get( 'DEFAULT', 'TORQUE_SERVER' ) 134 api_guess = 'pbs' 133 135 134 136 try: 135 137 136 BATCH_POLL_INTERVAL 138 BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' ) 137 139 138 140 except ConfigParser.NoOptionError: … … 141 143 # 142 144 143 BATCH_POLL_INTERVAL 144 api_guess 145 BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' ) 146 api_guess = 'pbs' 145 147 146 148 try: 147 149 148 GMOND_CONF 150 GMOND_CONF = cfg.get( 'DEFAULT', 'GMOND_CONF' ) 149 151 150 152 except ConfigParser.NoOptionError: 151 153 152 GMOND_CONF 153 154 DETECT_TIME_DIFFS 155 156 BATCH_HOST_TRANSLATE 154 GMOND_CONF = None 155 156 DETECT_TIME_DIFFS = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' ) 157 158 BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) ) 157 159 158 160 try: 159 161 160 BATCH_API 162 BATCH_API = cfg.get( 'DEFAULT', 'BATCH_API' ) 161 163 162 164 except ConfigParser.NoOptionError, detail: 163 165 164 166 if BATCH_SERVER and api_guess: 165 BATCH_API = api_guess 167 168 BATCH_API = api_guess 166 169 else: 167 170 debug_msg( 0, "fatal error: BATCH_API not set and can't make guess" ) … … 170 173 try: 171 174 172 QUEUE 175 QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) ) 173 176 174 177 except ConfigParser.NoOptionError, detail: 175 178 176 QUEUE 179 QUEUE = None 177 180 178 181 try: 179 182 180 GMETRIC_TARGET 183 GMETRIC_TARGET = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' ) 181 184 182 185 except ConfigParser.NoOptionError: 183 186 184 GMETRIC_TARGET 187 GMETRIC_TARGET = None 185 188 186 189 if not GMOND_CONF: … … 193 196 194 197 return True 195 196 197 import time, os, socket, string, re198 198 199 199 METRIC_MAX_VAL_LEN = 900 … … 220 220 221 221 if GMOND_CONF: 222 222 223 try: 223 224 gmond_file = GMOND_CONF … … 513 514 514 515 def __init__( self ): 516 515 517 """Setup appropriate variables""" 516 518 517 self.jobs = { } 518 self.timeoffset = 0 519 self.dp = DataProcessor() 519 self.jobs = { } 520 self.timeoffset = 0 521 self.dp = DataProcessor() 522 520 523 self.initPbsQuery() 521 524 522 525 def initPbsQuery( self ): 523 526 524 self.pq = None 527 self.pq = None 528 525 529 if( BATCH_SERVER ): 526 self.pq = PBSQuery( BATCH_SERVER ) 530 531 self.pq = PBSQuery( BATCH_SERVER ) 527 532 else: 528 self.pq 533 self.pq = PBSQuery() 529 534 530 535 def getAttr( self, attrs, name ): 536 531 537 """Return certain attribute from dictionary, if exists""" 532 538 533 539 if attrs.has_key( name ): 534 return attrs[name] 540 541 return attrs[ name ] 535 542 else: 536 543 return '' 537 544 538 545 def jobDataChanged( self, jobs, job_id, attrs ): 546 539 547 """Check if job with attrs and job_id in jobs has changed""" 540 548 541 549 if jobs.has_key( job_id ): 550 542 551 oldData = jobs[ job_id ] 543 552 else: … … 558 567 559 568 def getJobData( self ): 569 560 570 """Gather all data on current jobs in Torque""" 561 571 562 #self.initPbsQuery() 563 564 #print self.pq.getnodes() 565 566 joblist = {} 567 568 while len(joblist) == 0: 572 joblist = {} 573 574 while len( joblist ) == 0: 575 569 576 try: 570 577 joblist = self.pq.getjobs() 578 571 579 except PBSError, detail: 572 debug_msg( 10, "Caught PBS unavaible, skipping until next polling interval: " + str( detail ) ) 580 581 debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) ) 573 582 return None 574 583 575 self.cur_time = time.time() 576 577 jobs_processed = [ ] 578 579 #self.printJobs( joblist ) 584 self.cur_time = time.time() 585 586 jobs_processed = [ ] 580 587 581 588 for name, attrs in joblist.items(): 582 589 583 job_id 590 job_id = name.split( '.' )[0] 584 591 585 592 jobs_processed.append( job_id ) 586 593 587 name 588 queue 594 name = self.getAttr( attrs, 'Job_Name' ) 595 queue = self.getAttr( attrs, 'queue' ) 589 596 590 597 if QUEUE: … … 594 601 continue 595 602 596 owner 597 requested_time 598 requested_memory 599 600 mynoderequest 601 602 ppn 603 owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0] 604 requested_time = self.getAttr( attrs, 'Resource_List.walltime' ) 605 requested_memory = self.getAttr( attrs, 'Resource_List.mem' ) 606 607 mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' ) 608 609 ppn = '' 603 610 604 611 if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1: 605 612 606 mynoderequest_fields 613 mynoderequest_fields = mynoderequest.split( ':' ) 607 614 608 615 for mynoderequest_field in mynoderequest_fields: … … 610 617 if mynoderequest_field.find( 'ppn' ) != -1: 611 618 612 ppn 613 614 status 615 616 queued_timestamp 619 ppn = mynoderequest_field.split( 'ppn=' )[1] 620 621 status = self.getAttr( attrs, 'job_state' ) 622 623 queued_timestamp = self.getAttr( attrs, 'ctime' ) 617 624 618 625 if status == 'R': 619 start_timestamp = self.getAttr( attrs, 'mtime' ) 620 nodes = self.getAttr( attrs, 'exec_host' ).split( '+' ) 621 622 nodeslist = [ ] 626 627 start_timestamp = self.getAttr( attrs, 'mtime' ) 628 nodes = self.getAttr( attrs, 'exec_host' ).split( '+' ) 629 630 nodeslist = [ ] 623 631 624 632 for node in nodes: 625 host = node.split( '/' )[0] 633 634 host = node.split( '/' )[0] 626 635 627 636 if nodeslist.count( host ) == 0: … … 631 640 if translate_pattern.find( '/' ) != -1: 632 641 633 translate_orig 634 translate_new 635 636 host 642 translate_orig = translate_pattern.split( '/' )[1] 643 translate_new = translate_pattern.split( '/' )[2] 644 645 host = re.sub( translate_orig, translate_new, host ) 637 646 638 647 if not host in nodeslist: … … 646 655 # than our local time. 647 656 648 if int( start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):649 650 self.timeoffset 657 if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ): 658 659 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) ) 651 660 652 661 elif status == 'Q': 653 start_timestamp = '' 654 count_mynodes = 0 655 numeric_node = 1 662 663 start_timestamp = '' 664 count_mynodes = 0 665 numeric_node = 1 656 666 657 667 for node in mynoderequest.split( '+' ): 658 668 659 nodepart 669 nodepart = node.split( ':' )[0] 660 670 661 671 for letter in nodepart: … … 663 673 if letter not in string.digits: 664 674 665 numeric_node 675 numeric_node = 0 666 676 667 677 if not numeric_node: 668 count_mynodes = count_mynodes + 1 678 679 count_mynodes = count_mynodes + 1 669 680 else: 670 681 try: 671 count_mynodes = count_mynodes + int( nodepart ) 682 count_mynodes = count_mynodes + int( nodepart ) 683 672 684 except ValueError, detail: 685 673 686 debug_msg( 10, str( detail ) ) 674 687 debug_msg( 10, "Encountered weird node in Resources_List?!" ) … … 677 690 debug_msg( 10, 'attrs = ' + str( attrs ) ) 678 691 679 nodeslist 692 nodeslist = str( count_mynodes ) 680 693 else: 681 start_timestamp = '' 682 nodeslist = '' 683 684 myAttrs = { } 685 myAttrs['name'] = str( name ) 686 myAttrs['queue'] = str( queue ) 687 myAttrs['owner'] = str( owner ) 688 myAttrs['requested_time'] = str( requested_time ) 689 myAttrs['requested_memory'] = str( requested_memory ) 690 myAttrs['ppn'] = str( ppn ) 691 myAttrs['status'] = str( status ) 692 myAttrs['start_timestamp'] = str( start_timestamp ) 693 myAttrs['queued_timestamp'] = str( queued_timestamp ) 694 myAttrs['reported'] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 695 myAttrs['nodes'] = nodeslist 696 myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' ) 697 myAttrs['poll_interval'] = str( BATCH_POLL_INTERVAL ) 694 start_timestamp = '' 695 nodeslist = '' 696 697 myAttrs = { } 698 699 myAttrs[ 'name' ] = str( name ) 700 myAttrs[ 'queue' ] = str( queue ) 701 myAttrs[ 'owner' ] = str( owner ) 702 myAttrs[ 'requested_time' ] = str( requested_time ) 703 myAttrs[ 'requested_memory' ] = str( requested_memory ) 704 myAttrs[ 'ppn' ] = str( ppn ) 705 myAttrs[ 'status' ] = str( status ) 706 myAttrs[ 'start_timestamp' ] = str( start_timestamp ) 707 myAttrs[ 'queued_timestamp' ] = str( queued_timestamp ) 708 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 709 myAttrs[ 'nodes' ] = nodeslist 710 myAttrs[ 'domain' ] = string.join( socket.getfqdn().split( '.' )[1:], '.' ) 711 myAttrs[ 'poll_interval' ] = str( BATCH_POLL_INTERVAL ) 698 712 699 713 if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]: 700 self.jobs[ job_id ] = myAttrs 701 702 #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) ) 714 715 self.jobs[ job_id ] = myAttrs 703 716 704 717 for id, attrs in self.jobs.items(): … … 711 724 712 725 def submitJobData( self ): 726 713 727 """Submit job info list""" 714 728 … … 719 733 for jobid, jobattrs in self.jobs.items(): 720 734 721 gmetric_val = self.compileGmetricVal( jobid, jobattrs ) 722 723 metric_increment = 0 735 gmetric_val = self.compileGmetricVal( jobid, jobattrs ) 736 metric_increment = 0 724 737 725 738 for val in gmetric_val: 739 726 740 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val ) 727 metric_increment = metric_increment + 1 741 742 metric_increment = metric_increment + 1 728 743 729 744 def compileGmetricVal( self, jobid, jobattrs ): 745 730 746 """Create a val string for gmetric of jobinfo""" 731 747 732 gval_lists = [ ] 733 734 mystr = None 735 736 val_list = { } 748 gval_lists = [ ] 749 mystr = None 750 val_list = { } 737 751 738 752 for val_name, val_value in jobattrs.items(): … … 748 762 749 763 if node_str: 764 750 765 node_str = node_str + ';' + node 751 766 else: … … 754 769 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN: 755 770 756 val_list[ val_name ] = node_str 771 val_list[ val_name ] = node_str 772 757 773 gval_lists.append( val_list ) 758 val_list = { } 759 node_str = None 760 761 val_list[ val_name ] = node_str 774 775 val_list = { } 776 node_str = None 777 778 val_list[ val_name ] = node_str 779 762 780 gval_lists.append( val_list ) 763 val_list = { } 781 782 val_list = { } 764 783 765 784 elif val_value != '': … … 768 787 769 788 gval_lists.append( val_list ) 770 val_list = { } 771 772 val_list[ val_name ] = val_value 773 774 if len(val_list) > 0: 789 790 val_list = { } 791 792 val_list[ val_name ] = val_value 793 794 if len( val_list ) > 0: 795 775 796 gval_lists.append( val_list ) 776 797 777 str_list 798 str_list = [ ] 778 799 779 800 for val_list in gval_lists: 780 801 781 my_val_str 802 my_val_str = None 782 803 783 804 for val_name, val_value in val_list.items(): … … 794 815 795 816 def printTime( ): 817 796 818 """Print current time/date in human readable format for log/debug""" 797 819 … … 799 821 800 822 def debug_msg( level, msg ): 823 801 824 """Print msg if at or above current debug level""" 802 825 … … 808 831 # Write pidfile if PIDFILE exists 809 832 if PIDFILE: 810 pid = os.getpid() 811 812 pidfile = open(PIDFILE, 'w') 813 pidfile.write(str(pid)) 833 834 pid = os.getpid() 835 836 pidfile = open(PIDFILE, 'w') 837 838 pidfile.write( str( pid ) ) 814 839 pidfile.close() 815 840 816 841 def main(): 842 817 843 """Application start""" 818 844 … … 820 846 821 847 if not processArgs( sys.argv[1:] ): 848 822 849 sys.exit( 1 ) 823 850 … … 840 867 else: 841 868 debug_msg( 0, "fatal error: unknown BATCH_API '" + BATCH_API + "' is not supported" ) 869 842 870 sys.exit( 1 ) 843 871 844 872 if DAEMONIZE: 873 845 874 gather.daemon() 846 875 else:
Note: See TracChangeset
for help on using the changeset viewer.