Changeset 354 for trunk/jobmond


Ignore:
Timestamp:
05/03/07 14:47:18 (15 years ago)
Author:
bastiaans
Message:

jobmond/jobmond.py:

  • code cleanup and rearrangement
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobmond/jobmond.py

    r353 r354  
    2323
    2424import sys, getopt, ConfigParser
    25 
     25import time, os, socket, string, re
    2626import xml, xml.sax
    2727from xml.sax import saxutils, make_parser
     
    3939        print
    4040
    41 
    4241def processArgs( args ):
    4342
    44         SHORT_L = 'c:'
    45         LONG_L = 'config='
     43        SHORT_L         = 'hc:'
     44        LONG_L          = [ 'help', 'config=' ]
    4645
    4746        global PIDFILE
    48         PIDFILE = None
    49         config_filename = '/etc/jobmond.conf'
     47        PIDFILE         = None
     48
     49        config_filename = '/etc/jobmond.conf'
    5050
    5151        try:
    5252
    53                 opts, args = getopt.getopt( args, SHORT_L, LONG_L )
     53                opts, args      = getopt.getopt( args, SHORT_L, LONG_L )
    5454
    5555        except getopt.GetoptError, detail:
     
    5757                print detail
    5858                usage()
    59                 sys.exit(1)
     59                sys.exit( 1 )
    6060
    6161        for opt, value in opts:
     
    6363                if opt in [ '--config', '-c' ]:
    6464               
    65                         config_filename = value
     65                        config_filename = value
    6666
    6767                if opt in [ '--pidfile', '-p' ]:
    6868
    69                         PIDFILE = value
     69                        PIDFILE         = value
    7070               
    7171                if opt in [ '--help', '-h' ]:
    7272 
    7373                        usage()
    74                         sys.exit(1)
     74                        sys.exit( 0 )
    7575
    7676        return loadConfig( config_filename )
     
    110110                return my_list
    111111
    112         cfg = ConfigParser.ConfigParser()
     112        cfg             = ConfigParser.ConfigParser()
    113113
    114114        cfg.read( filename )
    115115
    116         global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL, GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE, BATCH_API, QUEUE, GMETRIC_TARGET
    117 
    118         DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
    119 
    120         DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
     116        global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
     117        global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
     118        global BATCH_API, QUEUE, GMETRIC_TARGET
     119
     120        DEBUG_LEVEL     = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
     121
     122        DAEMONIZE       = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
    121123
    122124        try:
    123125
    124                 BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
     126                BATCH_SERVER            = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
    125127
    126128        except ConfigParser.NoOptionError:
     
    129131                #
    130132
    131                 BATCH_SERVER = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
    132                 api_guess = 'pbs'
     133                BATCH_SERVER            = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
     134                api_guess               = 'pbs'
    133135       
    134136        try:
    135137       
    136                 BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
     138                BATCH_POLL_INTERVAL     = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
    137139
    138140        except ConfigParser.NoOptionError:
     
    141143                #
    142144
    143                 BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
    144                 api_guess = 'pbs'
     145                BATCH_POLL_INTERVAL     = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
     146                api_guess               = 'pbs'
    145147       
    146148        try:
    147149
    148                 GMOND_CONF = cfg.get( 'DEFAULT', 'GMOND_CONF' )
     150                GMOND_CONF              = cfg.get( 'DEFAULT', 'GMOND_CONF' )
    149151
    150152        except ConfigParser.NoOptionError:
    151153
    152                 GMOND_CONF = None
    153 
    154         DETECT_TIME_DIFFS = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
    155 
    156         BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
     154                GMOND_CONF              = None
     155
     156        DETECT_TIME_DIFFS       = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
     157
     158        BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
    157159
    158160        try:
    159161
    160                 BATCH_API = cfg.get( 'DEFAULT', 'BATCH_API' )
     162                BATCH_API       = cfg.get( 'DEFAULT', 'BATCH_API' )
    161163
    162164        except ConfigParser.NoOptionError, detail:
    163165
    164166                if BATCH_SERVER and api_guess:
    165                         BATCH_API = api_guess
     167
     168                        BATCH_API       = api_guess
    166169                else:
    167170                        debug_msg( 0, "fatal error: BATCH_API not set and can't make guess" )
     
    170173        try:
    171174
    172                 QUEUE = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
     175                QUEUE           = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
    173176
    174177        except ConfigParser.NoOptionError, detail:
    175178
    176                 QUEUE = None
     179                QUEUE           = None
    177180
    178181        try:
    179182
    180                 GMETRIC_TARGET = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
     183                GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
    181184
    182185        except ConfigParser.NoOptionError:
    183186
    184                 GMETRIC_TARGET = None
     187                GMETRIC_TARGET  = None
    185188
    186189                if not GMOND_CONF:
     
    193196
    194197        return True
    195 
    196 
    197 import time, os, socket, string, re
    198198
    199199METRIC_MAX_VAL_LEN = 900
     
    220220
    221221                if GMOND_CONF:
     222
    222223                        try:
    223224                                gmond_file = GMOND_CONF
     
    513514
    514515        def __init__( self ):
     516
    515517                """Setup appropriate variables"""
    516518
    517                 self.jobs = { }
    518                 self.timeoffset = 0
    519                 self.dp = DataProcessor()
     519                self.jobs       = { }
     520                self.timeoffset = 0
     521                self.dp         = DataProcessor()
     522
    520523                self.initPbsQuery()
    521524
    522525        def initPbsQuery( self ):
    523526
    524                 self.pq = None
     527                self.pq         = None
     528
    525529                if( BATCH_SERVER ):
    526                         self.pq = PBSQuery( BATCH_SERVER )
     530
     531                        self.pq         = PBSQuery( BATCH_SERVER )
    527532                else:
    528                         self.pq = PBSQuery()
     533                        self.pq         = PBSQuery()
    529534
    530535        def getAttr( self, attrs, name ):
     536
    531537                """Return certain attribute from dictionary, if exists"""
    532538
    533539                if attrs.has_key( name ):
    534                         return attrs[name]
     540
     541                        return attrs[ name ]
    535542                else:
    536543                        return ''
    537544
    538545        def jobDataChanged( self, jobs, job_id, attrs ):
     546
    539547                """Check if job with attrs and job_id in jobs has changed"""
    540548
    541549                if jobs.has_key( job_id ):
     550
    542551                        oldData = jobs[ job_id ]       
    543552                else:
     
    558567
    559568        def getJobData( self ):
     569
    560570                """Gather all data on current jobs in Torque"""
    561571
    562                 #self.initPbsQuery()
    563        
    564                 #print self.pq.getnodes()
    565        
    566                 joblist = {}
    567 
    568                 while len(joblist) == 0:
     572                joblist         = {}
     573
     574                while len( joblist ) == 0:
     575
    569576                        try:
    570577                                joblist = self.pq.getjobs()
     578
    571579                        except PBSError, detail:
    572                                 debug_msg( 10, "Caught PBS unavaible, skipping until next polling interval: " + str( detail ) )
     580
     581                                debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
    573582                                return None
    574583
    575                 self.cur_time = time.time()
    576 
    577                 jobs_processed = [ ]
    578 
    579                 #self.printJobs( joblist )
     584                self.cur_time   = time.time()
     585
     586                jobs_processed  = [ ]
    580587
    581588                for name, attrs in joblist.items():
    582589
    583                         job_id = name.split( '.' )[0]
     590                        job_id                  = name.split( '.' )[0]
    584591
    585592                        jobs_processed.append( job_id )
    586593
    587                         name = self.getAttr( attrs, 'Job_Name' )
    588                         queue = self.getAttr( attrs, 'queue' )
     594                        name                    = self.getAttr( attrs, 'Job_Name' )
     595                        queue                   = self.getAttr( attrs, 'queue' )
    589596
    590597                        if QUEUE:
     
    594601                                        continue
    595602
    596                         owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
    597                         requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
    598                         requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
    599 
    600                         mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
    601 
    602                         ppn = ''
     603                        owner                   = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
     604                        requested_time          = self.getAttr( attrs, 'Resource_List.walltime' )
     605                        requested_memory        = self.getAttr( attrs, 'Resource_List.mem' )
     606
     607                        mynoderequest           = self.getAttr( attrs, 'Resource_List.nodes' )
     608
     609                        ppn                     = ''
    603610
    604611                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
    605612
    606                                 mynoderequest_fields = mynoderequest.split( ':' )
     613                                mynoderequest_fields    = mynoderequest.split( ':' )
    607614
    608615                                for mynoderequest_field in mynoderequest_fields:
     
    610617                                        if mynoderequest_field.find( 'ppn' ) != -1:
    611618
    612                                                 ppn = mynoderequest_field.split( 'ppn=' )[1]
    613 
    614                         status = self.getAttr( attrs, 'job_state' )
    615 
    616                         queued_timestamp = self.getAttr( attrs, 'ctime' )
     619                                                ppn     = mynoderequest_field.split( 'ppn=' )[1]
     620
     621                        status                  = self.getAttr( attrs, 'job_state' )
     622
     623                        queued_timestamp        = self.getAttr( attrs, 'ctime' )
    617624
    618625                        if status == 'R':
    619                                 start_timestamp = self.getAttr( attrs, 'mtime' )
    620                                 nodes = self.getAttr( attrs, 'exec_host' ).split( '+' )
    621 
    622                                 nodeslist = [ ]
     626
     627                                start_timestamp         = self.getAttr( attrs, 'mtime' )
     628                                nodes                   = self.getAttr( attrs, 'exec_host' ).split( '+' )
     629
     630                                nodeslist               = [ ]
    623631
    624632                                for node in nodes:
    625                                         host = node.split( '/' )[0]
     633
     634                                        host            = node.split( '/' )[0]
    626635
    627636                                        if nodeslist.count( host ) == 0:
     
    631640                                                        if translate_pattern.find( '/' ) != -1:
    632641
    633                                                                 translate_orig = translate_pattern.split( '/' )[1]
    634                                                                 translate_new = translate_pattern.split( '/' )[2]
    635 
    636                                                                 host = re.sub( translate_orig, translate_new, host )
     642                                                                translate_orig  = translate_pattern.split( '/' )[1]
     643                                                                translate_new   = translate_pattern.split( '/' )[2]
     644
     645                                                                host            = re.sub( translate_orig, translate_new, host )
    637646                               
    638647                                                if not host in nodeslist:
     
    646655                                        # than our local time.
    647656                               
    648                                         if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
    649 
    650                                                 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
     657                                        if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
     658
     659                                                self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
    651660
    652661                        elif status == 'Q':
    653                                 start_timestamp = ''
    654                                 count_mynodes = 0
    655                                 numeric_node = 1
     662
     663                                start_timestamp         = ''
     664                                count_mynodes           = 0
     665                                numeric_node            = 1
    656666
    657667                                for node in mynoderequest.split( '+' ):
    658668
    659                                         nodepart = node.split( ':' )[0]
     669                                        nodepart        = node.split( ':' )[0]
    660670
    661671                                        for letter in nodepart:
     
    663673                                                if letter not in string.digits:
    664674
    665                                                         numeric_node = 0
     675                                                        numeric_node    = 0
    666676
    667677                                        if not numeric_node:
    668                                                 count_mynodes = count_mynodes + 1
     678
     679                                                count_mynodes   = count_mynodes + 1
    669680                                        else:
    670681                                                try:
    671                                                         count_mynodes = count_mynodes + int( nodepart )
     682                                                        count_mynodes   = count_mynodes + int( nodepart )
     683
    672684                                                except ValueError, detail:
     685
    673686                                                        debug_msg( 10, str( detail ) )
    674687                                                        debug_msg( 10, "Encountered weird node in Resources_List?!" )
     
    677690                                                        debug_msg( 10, 'attrs = ' + str( attrs ) )
    678691                                               
    679                                 nodeslist = str( count_mynodes )
     692                                nodeslist       = str( count_mynodes )
    680693                        else:
    681                                 start_timestamp = ''
    682                                 nodeslist = ''
    683 
    684                         myAttrs = { }
    685                         myAttrs['name'] = str( name )
    686                         myAttrs['queue'] = str( queue )
    687                         myAttrs['owner'] = str( owner )
    688                         myAttrs['requested_time'] = str( requested_time )
    689                         myAttrs['requested_memory'] = str( requested_memory )
    690                         myAttrs['ppn'] = str( ppn )
    691                         myAttrs['status'] = str( status )
    692                         myAttrs['start_timestamp'] = str( start_timestamp )
    693                         myAttrs['queued_timestamp'] = str( queued_timestamp )
    694                         myAttrs['reported'] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
    695                         myAttrs['nodes'] = nodeslist
    696                         myAttrs['domain'] = string.join( socket.getfqdn().split( '.' )[1:], '.' )
    697                         myAttrs['poll_interval'] = str( BATCH_POLL_INTERVAL )
     694                                start_timestamp = ''
     695                                nodeslist       = ''
     696
     697                        myAttrs                         = { }
     698
     699                        myAttrs[ 'name' ]                       = str( name )
     700                        myAttrs[ 'queue' ]              = str( queue )
     701                        myAttrs[ 'owner' ]              = str( owner )
     702                        myAttrs[ 'requested_time' ]     = str( requested_time )
     703                        myAttrs[ 'requested_memory' ]   = str( requested_memory )
     704                        myAttrs[ 'ppn' ]                = str( ppn )
     705                        myAttrs[ 'status' ]             = str( status )
     706                        myAttrs[ 'start_timestamp' ]    = str( start_timestamp )
     707                        myAttrs[ 'queued_timestamp' ]   = str( queued_timestamp )
     708                        myAttrs[ 'reported' ]           = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
     709                        myAttrs[ 'nodes' ]              = nodeslist
     710                        myAttrs[ 'domain' ]             = string.join( socket.getfqdn().split( '.' )[1:], '.' )
     711                        myAttrs[ 'poll_interval' ]      = str( BATCH_POLL_INTERVAL )
    698712
    699713                        if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
    700                                 self.jobs[ job_id ] = myAttrs
    701 
    702                                 #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
     714
     715                                self.jobs[ job_id ]     = myAttrs
    703716
    704717                for id, attrs in self.jobs.items():
     
    711724
    712725        def submitJobData( self ):
     726
    713727                """Submit job info list"""
    714728
     
    719733                for jobid, jobattrs in self.jobs.items():
    720734
    721                         gmetric_val = self.compileGmetricVal( jobid, jobattrs )
    722 
    723                         metric_increment = 0
     735                        gmetric_val             = self.compileGmetricVal( jobid, jobattrs )
     736                        metric_increment        = 0
    724737
    725738                        for val in gmetric_val:
     739
    726740                                self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )
    727                                 metric_increment = metric_increment + 1
     741
     742                                metric_increment        = metric_increment + 1
    728743
    729744        def compileGmetricVal( self, jobid, jobattrs ):
     745
    730746                """Create a val string for gmetric of jobinfo"""
    731747
    732                 gval_lists = [ ]
    733 
    734                 mystr = None
    735 
    736                 val_list = { }
     748                gval_lists      = [ ]
     749                mystr           = None
     750                val_list        = { }
    737751
    738752                for val_name, val_value in jobattrs.items():
     
    748762
    749763                                        if node_str:
     764
    750765                                                node_str = node_str + ';' + node
    751766                                        else:
     
    754769                                        if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
    755770
    756                                                 val_list[ val_name ] = node_str
     771                                                val_list[ val_name ]    = node_str
     772
    757773                                                gval_lists.append( val_list )
    758                                                 val_list = { }
    759                                                 node_str = None
    760 
    761                                 val_list[ val_name ] = node_str
     774
     775                                                val_list                = { }
     776                                                node_str                = None
     777
     778                                val_list[ val_name ]    = node_str
     779
    762780                                gval_lists.append( val_list )
    763                                 val_list = { }
     781
     782                                val_list                = { }
    764783
    765784                        elif val_value != '':
     
    768787
    769788                                        gval_lists.append( val_list )
    770                                         val_list = { }
    771 
    772                                 val_list[ val_name ] = val_value
    773 
    774                 if len(val_list) > 0:
     789
     790                                        val_list                = { }
     791
     792                                val_list[ val_name ]    = val_value
     793
     794                if len( val_list ) > 0:
     795
    775796                        gval_lists.append( val_list )
    776797
    777                 str_list = [ ]
     798                str_list        = [ ]
    778799
    779800                for val_list in gval_lists:
    780801
    781                         my_val_str = None
     802                        my_val_str      = None
    782803
    783804                        for val_name, val_value in val_list.items():
     
    794815
    795816def printTime( ):
     817
    796818        """Print current time/date in human readable format for log/debug"""
    797819
     
    799821
    800822def debug_msg( level, msg ):
     823
    801824        """Print msg if at or above current debug level"""
    802825
     
    808831        # Write pidfile if PIDFILE exists
    809832        if PIDFILE:
    810                 pid = os.getpid()
    811 
    812                 pidfile = open(PIDFILE, 'w')
    813                 pidfile.write(str(pid))
     833
     834                pid     = os.getpid()
     835
     836                pidfile = open(PIDFILE, 'w')
     837
     838                pidfile.write( str( pid ) )
    814839                pidfile.close()
    815840
    816841def main():
     842
    817843        """Application start"""
    818844
     
    820846
    821847        if not processArgs( sys.argv[1:] ):
     848
    822849                sys.exit( 1 )
    823850
     
    840867        else:
    841868                debug_msg( 0, "fatal error: unknown BATCH_API '" + BATCH_API + "' is not supported" )
     869
    842870                sys.exit( 1 )
    843871
    844872        if DAEMONIZE:
     873
    845874                gather.daemon()
    846875        else:
Note: See TracChangeset for help on using the changeset viewer.