Changeset 507 for trunk/jobmond/jobmond.py
- Timestamp:
- 03/07/08 16:46:21 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r500 r507 4 4 # 5 5 # Copyright (C) 2006-2007 Ramon Bastiaans 6 # Copyright (C) 2007 Dave Love (SGE code) 6 7 # 7 8 # Jobmonarch is free software; you can redistribute it and/or modify … … 24 25 import sys, getopt, ConfigParser, time, os, socket, string, re 25 26 import xdrlib, socket, syslog, xml, xml.sax 26 from xml.sax import saxutils, make_parser27 from xml.sax import make_parser28 27 from xml.sax.handler import feature_namespaces 29 28 … … 252 251 else: 253 252 254 debug_msg( 0, "ERROR: GMETRIC_TARGET not set: intern el Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )253 debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" ) 255 254 256 255 return True 256 257 def fqdn_parts (fqdn): 258 """Return pair of host and domain for fully-qualified domain name arg.""" 259 parts = fqdn.split (".") 260 return (parts[0], string.join(parts[1:], ".")) 257 261 258 262 METRIC_MAX_VAL_LEN = 900 … … 383 387 except NameError: 384 388 385 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (om mitting)' )389 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' ) 386 390 387 391 cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax ) … … 420 424 421 425 print '\t%s = %s' %( name, val ) 426 427 def getAttr( self, attrs, name ): 428 429 """Return certain attribute from dictionary, if exists""" 430 431 if attrs.has_key( name ): 432 433 return attrs[ name ] 434 else: 435 return '' 436 437 def jobDataChanged( self, jobs, job_id, attrs ): 438 439 """Check if job with attrs and job_id in jobs has changed""" 440 441 if jobs.has_key( job_id ): 442 443 oldData = jobs[ job_id ] 444 else: 445 return 1 446 447 for name, val in attrs.items(): 448 449 if oldData.has_key( name ): 450 451 if oldData[ name ] != attrs[ name ]: 452 453 return 1 454 455 else: 456 return 1 457 458 return 0 459 460 def submitJobData( self ): 461 462 """Submit job info list""" 463 464 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 465 466 running_jobs = 0 467 queued_jobs = 0 468 469 # Count how many running/queued jobs we found 470 # 471 for jobid, jobattrs in self.jobs.items(): 472 473 if jobattrs[ 'status' ] == 'Q': 474 475 queued_jobs += 1 476 477 elif jobattrs[ 'status' ] == 'R': 478 479 running_jobs += 1 480 481 # Report running/queued jobs as seperate metric for a nice RRD graph 482 # 483 self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' ) 484 self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' ) 485 486 # Now let's spread the knowledge 487 # 488 for jobid, jobattrs in self.jobs.items(): 489 490 # Make gmetric values for each job: respect max gmetric value length 491 # 492 gmetric_val = self.compileGmetricVal( jobid, jobattrs ) 493 metric_increment = 0 494 495 # If we have more job info than max gmetric value length allows, split it up 496 # amongst multiple metrics 497 # 498 for val in gmetric_val: 499 500 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val ) 501 502 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric 503 # 504 metric_increment = metric_increment + 1 505 506 def compileGmetricVal( self, jobid, jobattrs ): 507 508 """Create a val string for gmetric of jobinfo""" 509 510 gval_lists = [ ] 511 val_list = { } 512 513 for val_name, val_value in jobattrs.items(): 514 515 # These are our own metric names, i.e.: status, start_timestamp, etc 516 # 517 val_list_names_len = len( string.join( val_list.keys() ) ) + len(val_list.keys()) 518 519 # These are their corresponding values 520 # 521 val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values()) 522 523 if val_name == 'nodes' and jobattrs['status'] == 'R': 524 525 node_str = None 526 527 for node in val_value: 528 529 if node_str: 530 531 node_str = node_str + ';' + node 532 else: 533 node_str = node 534 535 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN 536 # 537 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN: 538 539 # It's too big, we need to make a new gmetric for the additional info 540 # 541 val_list[ val_name ] = node_str 542 543 gval_lists.append( val_list ) 544 545 val_list = { } 546 node_str = None 547 548 val_list[ val_name ] = node_str 549 550 gval_lists.append( val_list ) 551 552 val_list = { } 553 554 elif val_value != '': 555 556 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN 557 # 558 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN: 559 560 # It's too big, we need to make a new gmetric for the additional info 561 # 562 gval_lists.append( val_list ) 563 564 val_list = { } 565 566 val_list[ val_name ] = val_value 567 568 if len( val_list ) > 0: 569 570 gval_lists.append( val_list ) 571 572 str_list = [ ] 573 574 # Now append the value names and values together, i.e.: stop_timestamp=value, etc 575 # 576 for val_list in gval_lists: 577 578 my_val_str = None 579 580 for val_name, val_value in val_list.items(): 581 582 if my_val_str: 583 584 my_val_str = my_val_str + ' ' + val_name + '=' + val_value 585 else: 586 my_val_str = val_name + '=' + val_value 587 588 str_list.append( my_val_str ) 589 590 return str_list 422 591 423 592 def daemon( self ): … … 468 637 time.sleep( BATCH_POLL_INTERVAL ) 469 638 639 # SGE code by Dave Love <fx@gnu.org>. Tested with SGE 6.0u8 and 6.0u11. 640 # Probably needs modification for SGE 6.1. See also the fixmes. 641 642 class NoJobs (Exception): 643 """Exception raised by empty job list in qstat output.""" 644 pass 645 470 646 class SgeQstatXMLParser(xml.sax.handler.ContentHandler): 471 472 """Babu Sundaram's experimental SGE qstat XML parser""" 473 474 def __init__(self, qstatinxml): 475 476 self.qstatfile = qstatinxml 477 self.attribs = {} 478 self.value = '' 479 self.jobID = '' 480 self.currentJobInfo = '' 481 self.job_list = [] 482 self.EOFFlag = 0 483 self.jobinfoCount = 0 484 647 """SAX handler for XML output from Sun Grid Engine's `qstat'.""" 648 649 def __init__(self): 650 self.value = "" 651 self.joblist = [] 652 self.job = {} 653 self.queue = "" 654 self.in_joblist = False 655 self.lrequest = False 656 xml.sax.handler.ContentHandler.__init__(self) 657 658 # The structure of the output is as follows. Unfortunately 659 # it's voluminous, and probably doesn't scale to large 660 # clusters/queues. 661 662 # <detailed_job_info xmlns:xsd="http://www.w3.org/2001/XMLSchema"> 663 # <djob_info> 664 # <qmaster_response> <!-- job --> 665 # ... 666 # <JB_ja_template> 667 # <ulong_sublist> 668 # ... <!-- start_time, state ... --> 669 # </ulong_sublist> 670 # </JB_ja_template> 671 # <JB_ja_tasks> 672 # <ulong_sublist> 673 # ... <!-- task info 674 # </ulong_sublist> 675 # ... 676 # </JB_ja_tasks> 677 # ... 678 # </qmaster_response> 679 # </djob_info> 680 # <messages> 681 # ... 682 683 # NB. We might treat each task as a separate job, like 684 # straight qstat output, but the web interface expects jobs to 685 # be identified by integers, not, say, <job number>.<task>. 686 687 # So, I lied. If the job list is empty, we get invalid XML 688 # like this, which we need to defend against: 689 690 # <unknown_jobs xmlns:xsd="http://www.w3.org/2001/XMLSchema"> 691 # <> 692 # <ST_name>*</ST_name> 693 # </> 694 # </unknown_jobs> 485 695 486 696 def startElement(self, name, attrs): 487 488 if name == 'job_list': 489 self.currentJobInfo = 'Status=' + attrs.get('state', None) + ' ' 490 elif name == 'job_info': 491 self.job_list = [] 492 self.jobinfoCount += 1 697 self.value = "" 698 if name == "djob_info": # job list 699 self.in_joblist = True 700 elif name == "qmaster_response" and self.in_joblist: # job 701 self.job = {"job_state": "U", "slots": 0, 702 "nodes": [], "queued_timestamp": "", 703 "queued_timestamp": "", "queue": "", 704 "ppn": "0", "RN_max": 0, 705 # fixme in endElement 706 "requested_memory": 0, "requested_time": 0 707 } 708 self.joblist.append(self.job) 709 elif name == "qstat_l_requests": # resource request 710 self.lrequest = True 711 elif name == "unknown_jobs": 712 raise NoJobs 493 713 494 714 def characters(self, ch): 495 496 self.value = self.value + ch 497 498 def endElement(self, name): 499 500 if len(self.value.strip()) > 0 : 501 502 self.currentJobInfo += name + '=' + self.value.strip() + ' ' 503 elif name != 'job_list': 504 505 self.currentJobInfo += name + '=Unknown ' 506 507 if name == 'JB_job_number': 508 509 self.jobID = self.value.strip() 510 self.job_list.append(self.jobID) 511 512 if name == 'job_list': 513 514 if self.attribs.has_key(self.jobID) == False: 515 self.attribs[self.jobID] = self.currentJobInfo 516 elif self.attribs.has_key(self.jobID) and self.attribs[self.jobID] != self.currentJobInfo: 517 self.attribs[self.jobID] = self.currentJobInfo 518 self.currentJobInfo = '' 519 self.jobID = '' 520 521 elif name == 'job_info' and self.jobinfoCount == 2: 522 523 deljobs = [] 524 for id in self.attribs: 525 try: 526 self.job_list.index(str(id)) 527 except ValueError: 528 deljobs.append(id) 529 for i in deljobs: 530 del self.attribs[i] 531 deljobs = [] 532 self.jobinfoCount = 0 533 534 self.value = '' 715 self.value += ch 716 717 def endElement(self, name): 718 """Snarf job elements contents into job dictionary. 719 Translate keys if appropriate.""" 720 721 name_trans = { 722 "JB_job_number": "number", 723 "JB_job_name": "name", "JB_owner": "owner", 724 "queue_name": "queue", "JAT_start_time": "start_timestamp", 725 "JB_submission_time": "queued_timestamp" 726 } 727 value = self.value 728 729 if name == "djob_info": 730 self.in_joblist = False 731 self.job = {} 732 elif name == "JAT_master_queue": 733 self.job["queue"] = value.split("@")[0] 734 elif name == "JG_qhostname": 735 if not (value in self.job["nodes"]): 736 self.job["nodes"].append(value) 737 elif name == "JG_slots": # slots in use 738 self.job["slots"] += int(value) 739 elif name == "RN_max": # requested slots (tasks or parallel) 740 self.job["RN_max"] = max (self.job["RN_max"], 741 int(value)) 742 elif name == "JAT_state": # job state (bitwise or) 743 value = int (value) 744 # Status values from sge_jobL.h 745 #define JIDLE 0x00000000 746 #define JHELD 0x00000010 747 #define JMIGRATING 0x00000020 748 #define JQUEUED 0x00000040 749 #define JRUNNING 0x00000080 750 #define JSUSPENDED 0x00000100 751 #define JTRANSFERING 0x00000200 752 #define JDELETED 0x00000400 753 #define JWAITING 0x00000800 754 #define JEXITING 0x00001000 755 #define JWRITTEN 0x00002000 756 #define JSUSPENDED_ON_THRESHOLD 0x00010000 757 #define JFINISHED 0x00010000 758 if value & 0x80: 759 self.job["status"] = "R" 760 elif value & 0x40: 761 self.job["status"] = "Q" 762 else: 763 self.job["status"] = "O" # `other' 764 elif name == "CE_name" and self.lrequest and self.value in \ 765 ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"): 766 # We're in a container for an interesting resource 767 # request; record which type. 768 self.lrequest = self.value 769 elif name == "CE_doubleval" and self.lrequest: 770 # if we're in a container for an interesting 771 # resource request, use the maxmimum of the hard 772 # and soft requests to record the requested CPU 773 # or core. Fixme: I'm not sure if this logic is 774 # right. 775 if self.lrequest in ("h_core", "s_core"): 776 self.job["requested_memory"] = \ 777 max (float (value), 778 self.job["requested_memory"]) 779 # Fixme: Check what cpu means, c.f [hs]_cpu. 780 elif self.lrequest in ("h_cpu", "s_cpu", "cpu"): 781 self.job["requested_time"] = \ 782 max (float (value), 783 self.job["requested_time"]) 784 elif name == "qstat_l_requests": 785 self.lrequest = False 786 elif self.job and self.in_joblist: 787 if name in name_trans: 788 name = name_trans[name] 789 self.job[name] = value 790 791 # Abstracted from PBS original. 792 # Fixme: Is it worth (or appropriate for PBS) sorting the result? 793 def do_nodelist (nodes): 794 """Translate node list as appropriate.""" 795 nodeslist = [ ] 796 my_domain = fqdn_parts(socket.getfqdn())[1] 797 for node in nodes: 798 host = node.split( '/' )[0] # not relevant for SGE 799 h, host_domain = fqdn_parts(host) 800 if host_domain == my_domain: 801 host = h 802 if nodeslist.count( host ) == 0: 803 for translate_pattern in BATCH_HOST_TRANSLATE: 804 if translate_pattern.find( '/' ) != -1: 805 translate_orig = \ 806 translate_pattern.split( '/' )[1] 807 translate_new = \ 808 translate_pattern.split( '/' )[2] 809 host = re.sub( translate_orig, 810 translate_new, host ) 811 if not host in nodeslist: 812 nodeslist.append( host ) 813 return nodeslist 535 814 536 815 class SgeDataGatherer(DataGatherer): 537 816 538 jobs = { } 539 SGE_QSTAT_XML_FILE = '/tmp/.jobmonarch.sge.qstat' 817 jobs = {} 540 818 541 819 def __init__( self ): 542 """Setup appropriate variables""" 543 544 self.jobs = { } 820 self.jobs = {} 545 821 self.timeoffset = 0 546 822 self.dp = DataProcessor() 547 self.initSgeJobInfo() 548 549 def initSgeJobInfo( self ): 550 """This is outside the scope of DRMAA; Get the current jobs in SGE""" 551 """This is a hack because we cant get info about jobs beyond""" 552 """those in the current DRMAA session""" 553 554 self.qstatparser = SgeQstatXMLParser( self.SGE_QSTAT_XML_FILE ) 555 556 # Obtain the qstat information from SGE in XML format 557 # This would change to DRMAA-specific calls from 6.0u9 558 559 def getJobData(self): 823 824 def getJobData( self ): 560 825 """Gather all data on current jobs in SGE""" 561 826 562 # Get the information about the current jobs in the SGE queue 563 info = os.popen("qstat -ext -xml").readlines() 564 f = open(self.SGE_QSTAT_XML_FILE,'w') 565 for lines in info: 566 f.write(lines) 567 f.close() 568 569 # Parse the input 570 f = open(self.qstatparser.qstatfile, 'r') 571 xml.sax.parse(f, self.qstatparser) 572 f.close() 573 827 import popen2 828 829 self.cur_time = 0 830 queues = "" 831 if QUEUE: # only for specific queues 832 # Fixme: assumes queue names don't contain single 833 # quote or comma. Don't know what the SGE rules are. 834 queues = " -q '" + string.join (QUEUE, ",") + "'" 835 # Note the comment in SgeQstatXMLParser about scaling with 836 # this method of getting data. I haven't found better one. 837 # Output with args `-xml -ext -f -r' is easier to parse 838 # in some ways, harder in others, but it doesn't provide 839 # the submission time, at least. 840 piping = popen2.Popen3("qstat -u '*' -j '*' -xml" + queues, 841 True) 842 qstatparser = SgeQstatXMLParser() 843 parse_err = 0 844 try: 845 xml.sax.parse(piping.fromchild, qstatparser) 846 except NoJobs: 847 pass 848 except: 849 parse_err = 1 850 if piping.wait(): 851 debug_msg(10, 852 "qstat error, skipping until next polling interval: " 853 + piping.childerr.readline()) 854 return None 855 elif parse_err: 856 debug_msg(10, "Bad XML output from qstat"()) 857 exit (1) 858 for f in piping.fromchild, piping.tochild, piping.childerr: 859 f.close() 574 860 self.cur_time = time.time() 575 576 return self.qstatparser.attribs 577 578 def submitJobData(self): 579 """Submit job info list""" 580 581 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 582 # Now let's spread the knowledge 583 # 584 metric_increment = 0 585 for jobid, jobattrs in self.qstatparser.attribs.items(): 586 587 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), jobattrs) 861 jobs_processed = [] 862 for job in qstatparser.joblist: 863 job_id = job["number"] 864 if job["status"] in [ 'Q', 'R' ]: 865 jobs_processed.append(job_id) 866 if job["status"] == "R": 867 job["nodes"] = do_nodelist (job["nodes"]) 868 # Fixme: Is this right? 869 job["ppn"] = float(job["slots"]) / \ 870 len(job["nodes"]) 871 if DETECT_TIME_DIFFS: 872 # If a job start is later than our 873 # current date, that must mean 874 # the SGE server's time is later 875 # than our local time. 876 start_timestamp = \ 877 int (job["start_timestamp"]) 878 if start_timestamp > \ 879 int(self.cur_time) + \ 880 int(self.timeoffset): 881 882 self.timeoffset = \ 883 start_timestamp - \ 884 int(self.cur_time) 885 else: 886 # fixme: Note sure what this should be: 887 job["ppn"] = job["RN_max"] 888 job["nodes"] = "1" 889 890 myAttrs = {} 891 for attr in ["name", "queue", "owner", 892 "requested_time", "status", 893 "requested_memory", "ppn", 894 "start_timestamp", "queued_timestamp"]: 895 myAttrs[attr] = str(job[attr]) 896 myAttrs["nodes"] = job["nodes"] 897 myAttrs["reported"] = str(int(self.cur_time) + \ 898 int(self.timeoffset)) 899 myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1] 900 myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL) 901 902 if self.jobDataChanged(self.jobs, job_id, myAttrs) \ 903 and myAttrs["status"] in ["R", "Q"]: 904 self.jobs[job_id] = myAttrs 905 for id, attrs in self.jobs.items(): 906 if id not in jobs_processed: 907 del self.jobs[id] 588 908 589 909 class PbsDataGatherer( DataGatherer ): … … 613 933 self.pq = PBSQuery() 614 934 615 def getAttr( self, attrs, name ):616 617 """Return certain attribute from dictionary, if exists"""618 619 if attrs.has_key( name ):620 621 return attrs[ name ]622 else:623 return ''624 625 def jobDataChanged( self, jobs, job_id, attrs ):626 627 """Check if job with attrs and job_id in jobs has changed"""628 629 if jobs.has_key( job_id ):630 631 oldData = jobs[ job_id ]632 else:633 return 1634 635 for name, val in attrs.items():636 637 if oldData.has_key( name ):638 639 if oldData[ name ] != attrs[ name ]:640 641 return 1642 643 else:644 return 1645 646 return 0647 648 935 def getJobData( self ): 649 936 … … 664 951 jobs_processed = [ ] 665 952 666 my_domain = string.join( socket.getfqdn().split( '.' )[1:], '.' )667 668 953 for name, attrs in joblist.items(): 669 954 … … 710 995 nodes = self.getAttr( attrs, 'exec_host' ).split( '+' ) 711 996 712 nodeslist = [ ] 713 714 for node in nodes: 715 716 host = node.split( '/' )[0] 717 718 host_domain = string.join( host.split( '.' )[1:], '.' ) 719 720 if host_domain == my_domain: 721 722 host = host.split( '.' )[0] 723 724 if nodeslist.count( host ) == 0: 725 726 for translate_pattern in BATCH_HOST_TRANSLATE: 727 728 if translate_pattern.find( '/' ) != -1: 729 730 translate_orig = translate_pattern.split( '/' )[1] 731 translate_new = translate_pattern.split( '/' )[2] 732 733 host = re.sub( translate_orig, translate_new, host ) 734 735 if not host in nodeslist: 736 737 nodeslist.append( host ) 997 nodeslist = do_nodelist( nodes ) 738 998 739 999 if DETECT_TIME_DIFFS: … … 826 1086 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 827 1087 myAttrs[ 'nodes' ] = nodeslist 828 myAttrs[ 'domain' ] = string.join( socket.getfqdn().split( '.' )[1:], '.' )1088 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1] 829 1089 myAttrs[ 'poll_interval' ] = str( BATCH_POLL_INTERVAL ) 830 1090 … … 840 1100 # 841 1101 del self.jobs[ id ] 842 843 def submitJobData( self ):844 845 """Submit job info list"""846 847 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )848 849 running_jobs = 0850 queued_jobs = 0851 852 # Count how many running/queued jobs we found853 #854 for jobid, jobattrs in self.jobs.items():855 856 if jobattrs[ 'status' ] == 'Q':857 858 queued_jobs += 1859 860 elif jobattrs[ 'status' ] == 'R':861 862 running_jobs += 1863 864 # Report running/queued jobs as seperate metric for a nice RRD graph865 #866 self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )867 self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )868 869 # Now let's spread the knowledge870 #871 for jobid, jobattrs in self.jobs.items():872 873 # Make gmetric values for each job: respect max gmetric value length874 #875 gmetric_val = self.compileGmetricVal( jobid, jobattrs )876 metric_increment = 0877 878 # If we have more job info than max gmetric value length allows, split it up879 # amongst multiple metrics880 #881 for val in gmetric_val:882 883 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )884 885 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric886 #887 metric_increment = metric_increment + 1888 889 def compileGmetricVal( self, jobid, jobattrs ):890 891 """Create a val string for gmetric of jobinfo"""892 893 gval_lists = [ ]894 mystr = None895 val_list = { }896 897 for val_name, val_value in jobattrs.items():898 899 # These are our own metric names, i.e.: status, start_timestamp, etc900 #901 val_list_names_len = len( string.join( val_list.keys() ) ) + len(val_list.keys())902 903 # These are their corresponding values904 #905 val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values())906 907 if val_name == 'nodes' and jobattrs['status'] == 'R':908 909 node_str = None910 911 for node in val_value:912 913 if node_str:914 915 node_str = node_str + ';' + node916 else:917 node_str = node918 919 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN920 #921 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:922 923 # It's too big, we need to make a new gmetric for the additional info924 #925 val_list[ val_name ] = node_str926 927 gval_lists.append( val_list )928 929 val_list = { }930 node_str = None931 932 val_list[ val_name ] = node_str933 934 gval_lists.append( val_list )935 936 val_list = { }937 938 elif val_value != '':939 940 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN941 #942 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:943 944 # It's too big, we need to make a new gmetric for the additional info945 #946 gval_lists.append( val_list )947 948 val_list = { }949 950 val_list[ val_name ] = val_value951 952 if len( val_list ) > 0:953 954 gval_lists.append( val_list )955 956 str_list = [ ]957 958 # Now append the value names and values together, i.e.: stop_timestamp=value, etc959 #960 for val_list in gval_lists:961 962 my_val_str = None963 964 for val_name, val_value in val_list.items():965 966 if my_val_str:967 968 my_val_str = my_val_str + ' ' + val_name + '=' + val_value969 else:970 my_val_str = val_name + '=' + val_value971 972 str_list.append( my_val_str )973 974 return str_list975 1102 976 1103 # … … 1135 1262 elif BATCH_API == 'sge': 1136 1263 1137 debug_msg( 0, "FATAL ERROR: BATCH_API 'sge' implementation is currently broken, check future releases" ) 1138 1139 sys.exit( 1 ) 1264 # Tested with SGE 6.0u11. 1265 # debug_msg( 0, "FATAL ERROR: BATCH_API 'sge' implementation is currently broken, check future releases" ) 1266 1267 # sys.exit( 1 ) 1140 1268 1141 1269 gather = SgeDataGatherer()
Note: See TracChangeset
for help on using the changeset viewer.