Changeset 660


Ignore:
Timestamp:
09/03/12 15:16:36 (12 years ago)
Author:
ramonb
Message:
  • updated for latest pbs_python
  • changed metric compilation
  • fixed some more indentation
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobmond/jobmond.py

    r659 r660  
    2727from xml.sax.handler import feature_namespaces
    2828from collections import deque
     29from types import *
    2930
    3031VERSION='0.3.1'
     
    181182def loadConfig( filename ):
    182183
    183         def getlist( cfg_string ):
    184 
    185                 my_list = [ ]
    186 
    187                 for item_txt in cfg_string.split( ',' ):
    188 
    189                         sep_char = None
    190 
    191                         item_txt = item_txt.strip()
    192 
    193                         for s_char in [ "'", '"' ]:
    194 
    195                                 if item_txt.find( s_char ) != -1:
    196 
    197                                         if item_txt.count( s_char ) != 2:
    198 
    199                                                 print 'Missing quote: %s' %item_txt
    200                                                 sys.exit( 1 )
    201 
    202                                         else:
    203 
    204                                                 sep_char = s_char
    205                                                 break
    206 
    207                         if sep_char:
    208 
    209                                 item_txt = item_txt.split( sep_char )[1]
    210 
    211                         my_list.append( item_txt )
    212 
    213                 return my_list
     184    def getlist( cfg_string ):
     185
     186        my_list = [ ]
     187
     188        for item_txt in cfg_string.split( ',' ):
     189
     190                sep_char = None
     191
     192                item_txt = item_txt.strip()
     193
     194                for s_char in [ "'", '"' ]:
     195
     196                        if item_txt.find( s_char ) != -1:
     197
     198                                if item_txt.count( s_char ) != 2:
     199
     200                                        print 'Missing quote: %s' %item_txt
     201                                        sys.exit( 1 )
     202
     203                                else:
     204
     205                                        sep_char = s_char
     206                                        break
     207
     208                if sep_char:
     209
     210                        item_txt = item_txt.split( sep_char )[1]
     211
     212                my_list.append( item_txt )
     213
     214        return my_list
    214215
    215216    cfg     = ConfigParser.ConfigParser()
     
    304305        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
    305306
    306     gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
     307        gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
    307308
    308309    if gmetric_dest_ip and gmetric_dest_port:
     
    534535            print '\t%s = %s' %( name, val )
    535536
    536     def getAttr( self, attrs, name ):
     537    def getAttr( self, d, name ):
    537538
    538539        """Return certain attribute from dictionary, if exists"""
    539540
    540         if attrs.has_key( name ):
    541 
    542             return attrs[ name ]
    543         else:
    544             return ''
     541        if d.has_key( name ):
     542
     543            if type( d[ name ] ) == ListType:
     544
     545                return string.join( d[ name ], ' ' )
     546
     547            return d[ name ]
     548       
     549        return ''
    545550
    546551    def jobDataChanged( self, jobs, job_id, attrs ):
     
    608613            for name, node in self.pq.getnodes().items():
    609614
    610                 if ( node[ 'state' ].find( "down" ) != -1 ):
     615                if 'down' in node[ 'state' ]:
    611616
    612617                    downed_nodes.append( name )
    613618
    614                 if ( node[ 'state' ].find( "offline" ) != -1 ):
     619                if 'offline' in node[ 'state' ]:
    615620
    616621                    offline_nodes.append( name )
     
    629634
    630635            # Make gmetric values for each job: respect max gmetric value length
    631                         #
    632             gmetric_val     = self.compileGmetricVal( jobid, jobattrs )
    633             metric_increment    = 0
    634 
    635             # If we have more job info than max gmetric value length allows, split it up
    636                         # amongst multiple metrics
    637636            #
    638             for val in gmetric_val:
    639 
    640                 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )
    641 
    642                 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
    643                                 #
    644                 metric_increment    = metric_increment + 1
     637            gmetrics     = self.compileGmetricVal( jobid, jobattrs )
     638
     639            for g_name, g_val in gmetrics.items():
     640
     641                self.dp.multicastGmetric( g_name, g_val )
    645642
    646643    def compileGmetricVal( self, jobid, jobattrs ):
    647644
    648         """Create a val string for gmetric of jobinfo"""
    649 
    650         gval_lists  = [ ]
    651         val_list    = { }
     645        """Create gmetric name/value pairs of jobinfo"""
     646
     647        gmetrics = { }
    652648
    653649        for val_name, val_value in jobattrs.items():
    654650
    655             # These are our own metric names, i.e.: status, start_timestamp, etc
    656                         #
    657             val_list_names_len  = len( string.join( val_list.keys() ) ) + len(val_list.keys())
    658 
    659             # These are their corresponding values
    660                         #
    661             val_list_vals_len   = len( string.join( val_list.values() ) ) + len(val_list.values())
    662 
    663             if val_name == 'nodes' and jobattrs['status'] == 'R':
    664 
    665                 node_str = None
    666 
    667                 for node in val_value:
    668 
    669                     if node_str:
    670 
    671                         node_str = node_str + ';' + node
    672                     else:
    673                         node_str = node
    674 
    675                     # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
    676                                         #
    677                     if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
    678 
    679                         # It's too big, we need to make a new gmetric for the additional info
    680                                                 #
    681                         val_list[ val_name ]    = node_str
    682 
    683                         gval_lists.append( val_list )
    684 
    685                         val_list        = { }
    686                         node_str        = None
    687 
    688                 val_list[ val_name ]    = node_str
    689 
    690                 gval_lists.append( val_list )
    691 
    692                 val_list        = { }
    693 
    694             elif val_value != '':
    695 
    696                 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
    697                                 #
    698                 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
    699 
    700                     # It's too big, we need to make a new gmetric for the additional info
    701                                         #
    702                     gval_lists.append( val_list )
    703 
    704                     val_list        = { }
    705 
    706                 val_list[ val_name ]    = val_value
    707 
    708         if len( val_list ) > 0:
    709 
    710             gval_lists.append( val_list )
    711 
    712         str_list    = [ ]
    713 
    714         # Now append the value names and values together, i.e.: stop_timestamp=value, etc
    715                 #
    716         for val_list in gval_lists:
    717 
    718             my_val_str  = None
    719 
    720             for val_name, val_value in val_list.items():
    721 
    722                 if type(val_value) == list:
    723 
    724                     val_value   = val_value.join( ',' )
    725 
    726                 if my_val_str:
    727 
    728                     try:
    729                         # fixme: It's getting
    730                         # ('nodes', None) items
    731                         my_val_str = my_val_str + ' ' + val_name + '=' + val_value
    732                     except:
    733                         pass
    734 
    735                 else:
    736                     my_val_str = val_name + '=' + val_value
    737 
    738             str_list.append( my_val_str )
    739 
    740         return str_list
    741 
    742         def daemon( self ):
    743 
    744                 """Run as daemon forever"""
    745 
    746                 # Fork the first child
    747                 #
    748                 pid = os.fork()
    749                 if pid > 0:
    750                         sys.exit(0)  # end parent
    751 
    752                 # creates a session and sets the process group ID
    753                 #
    754                 os.setsid()
    755 
    756                 # Fork the second child
    757                 #
    758                 pid = os.fork()
    759                 if pid > 0:
    760                         sys.exit(0)  # end parent
     651            gmetric_sequence = 0
     652
     653            if len( val_value ) > METRIC_MAX_VAL_LEN:
     654
     655                while len( val_value ) > METRIC_MAX_VAL_LEN:
     656
     657                    gmetric_value   = val_value[:METRIC_MAX_VAL_LEN]
     658                    val_value       = val_value[METRIC_MAX_VAL_LEN:]
     659
     660                    gmetric_name    = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
     661
     662                    gmetrics[ gmetric_name ] = gmetric_value
     663
     664                    gmetric_sequence = gmetric_sequence + 1
     665            else:
     666                gmetric_value   = val_value
     667
     668                gmetric_name    = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
     669
     670                gmetrics[ gmetric_name ] = gmetric_value
     671
     672        return gmetrics
     673
     674    def daemon( self ):
     675
     676        """Run as daemon forever"""
     677
     678        # Fork the first child
     679        #
     680        pid = os.fork()
     681        if pid > 0:
     682                sys.exit(0)  # end parent
     683
     684        # creates a session and sets the process group ID
     685        #
     686        os.setsid()
     687
     688        # Fork the second child
     689        #
     690        pid = os.fork()
     691        if pid > 0:
     692                sys.exit(0)  # end parent
    761693
    762694        write_pidfile()
    763695
    764                 # Go to the root directory and set the umask
    765                 #
    766                 os.chdir('/')
    767                 os.umask(0)
    768 
    769                 sys.stdin.close()
    770                 sys.stdout.close()
    771                 sys.stderr.close()
    772 
    773                 os.open('/dev/null', os.O_RDWR)
    774                 os.dup2(0, 1)
    775                 os.dup2(0, 2)
    776 
    777                 self.run()
    778 
    779         def run( self ):
    780 
    781                 """Main thread"""
    782 
    783                 while ( 1 ):
     696        # Go to the root directory and set the umask
     697        #
     698        os.chdir('/')
     699        os.umask(0)
     700
     701        sys.stdin.close()
     702        sys.stdout.close()
     703        sys.stderr.close()
     704
     705        os.open('/dev/null', os.O_RDWR)
     706        os.dup2(0, 1)
     707        os.dup2(0, 2)
     708
     709        self.run()
     710
     711    def run( self ):
     712
     713        """Main thread"""
     714
     715        while ( 1 ):
    784716       
    785717            self.getJobData()
     
    1022954            parse_err = 1
    1023955            if piping.wait():
    1024             debug_msg(10,
     956                debug_msg(10,
    1025957                  "qstat error, skipping until next polling interval: "
    1026958                  + piping.childerr.readline())
    1027             return None
    1028         elif parse_err:
    1029             debug_msg(10, "Bad XML output from qstat"())
    1030             exit (1)
     959                return None
     960            elif parse_err:
     961                debug_msg(10, "Bad XML output from qstat"())
     962                exit (1)
    1031963        for f in piping.fromchild, piping.tochild, piping.childerr:
    1032964            f.close()
     
    11041036        def _countDuplicatesInList( self, dupedList ):
    11051037
    1106         countDupes  = { }
    1107 
    1108         for item in dupedList:
    1109 
    1110             if not countDupes.has_key( item ):
    1111 
    1112                 countDupes[ item ]  = 1
    1113             else:
    1114                 countDupes[ item ]  = countDupes[ item ] + 1
    1115 
    1116         dupeCountList   = [ ]
    1117 
    1118         for item, count in countDupes.items():
    1119 
    1120             dupeCountList.append( ( item, count ) )
    1121 
    1122                 return dupeCountList
     1038            countDupes  = { }
     1039
     1040            for item in dupedList:
     1041
     1042                if not countDupes.has_key( item ):
     1043
     1044                    countDupes[ item ]  = 1
     1045                else:
     1046                    countDupes[ item ]  = countDupes[ item ] + 1
     1047
     1048            dupeCountList   = [ ]
     1049
     1050            for item, count in countDupes.items():
     1051
     1052                dupeCountList.append( ( item, count ) )
     1053
     1054            return dupeCountList
    11231055#
    11241056#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
     
    11801112                                requested_cpus = 1
    11811113
    1182             if QUEUE:
    1183                 for q in QUEUE:
    1184                     if q == queue:
    1185                         display_queue = 1
    1186                         break
    1187                     else:
    1188                         display_queue = 0
    1189                         continue
    1190             if display_queue == 0:
    1191                 continue
     1114                        if QUEUE:
     1115                            for q in QUEUE:
     1116                                if q == queue:
     1117                                    display_queue = 1
     1118                                    break
     1119                                else:
     1120                                    display_queue = 0
     1121                                    continue
     1122                        if display_queue == 0:
     1123                            continue
    11921124
    11931125                        runState = self.getAttr( attrs, 'status' )
     
    12361168                        myAttrs[ 'reported' ]       = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
    12371169                        myAttrs[ 'nodes' ]      = do_nodelist( nodelist )
    1238             myAttrs[ 'domain' ]     = fqdn_parts( socket.getfqdn() )[1]
     1170                        myAttrs[ 'domain' ]     = fqdn_parts( socket.getfqdn() )[1]
    12391171                        myAttrs[ 'poll_interval' ]  = str(BATCH_POLL_INTERVAL)
    12401172
     
    12491181                                #
    12501182                                del jobs[ id ]
    1251                 self.jobs=jobs
    1252 
     1183
     1184                self.jobs = jobs
    12531185
    12541186class PbsDataGatherer( DataGatherer ):
     
    13161248
    13171249            owner           = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
    1318             requested_time      = self.getAttr( attrs, 'Resource_List.walltime' )
    1319             requested_memory    = self.getAttr( attrs, 'Resource_List.mem' )
    1320 
    1321             mynoderequest       = self.getAttr( attrs, 'Resource_List.nodes' )
     1250            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
     1251            requested_memory    = self.getAttr( attrs['Resource_List'], 'mem' )
     1252
     1253            mynoderequest       = self.getAttr( attrs['Resource_List'], 'nodes' )
    13221254
    13231255            ppn         = ''
     
    15661498    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
    15671499
    1568         if (not DAEMONIZE and DEBUG_LEVEL >= level):
     1500    if (not DAEMONIZE and DEBUG_LEVEL >= level):
    15691501        sys.stderr.write( msg + '\n' )
    15701502
Note: See TracChangeset for help on using the changeset viewer.