Changeset 65 for trunk/plugin/togap.py


Ignore:
Timestamp:
04/14/05 12:10:51 (18 years ago)
Author:
bastiaans
Message:

plugin/togap.py:

  • Added version check of gmetric since we only understand 3.0.1 syntax atm
  • Disabled stop_timestamp transmitting should be determined at toga _server_ and not here
  • Will check if gmetrics to be send > 1400 bytes and chop it up in pieces if needed


File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/plugin/togap.py

    r64 r65  
    1919import sys
    2020import time
     21import os
     22import string
    2123
    2224class DataProcessor:
     
    2931                        self.binary = binary
    3032
    31         def multicastGmetric( self, metricname, metricval, tmax ):
    32 
    33                 cmd = binary
     33                self.dmax = TORQUE_POLL_INTERVAL
     34
     35                #incompatible = self.checkGmetricVersion
     36                incompatible = 0
     37
     38                if incompatible:
     39                        debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' )
     40                        sys.exit( 1 )
     41
     42        def checkGmetricVersion( self ):
     43
     44                for line in os.popen( self.binary + ' --version' ).readlines():
     45
     46                        line = line.split( ' ' )
     47
     48                        if len( line ) == 2 and line.find( 'gmetric' ) != -1:
     49                       
     50                                gmetric_version = line[1]
     51
     52                                version_major = int( gemtric_version.split( '.' )[0] )
     53                                version_minor = int( gemtric_version.split( '.' )[1] )
     54                                version_patch = int( gemtric_version.split( '.' )[2] )
     55
     56                                incompatible = 0
     57
     58                                if version_major < 3:
     59
     60                                        incompatible = 1
     61                               
     62                                elif version_major == 3:
     63
     64                                        if version_minor == 0:
     65
     66                                                if version_patch < 1:
     67                                               
     68                                                        incompatbiel = 1
     69
     70                return incompatible
     71
     72        def multicastGmetric( self, metricname, metricval, tmax='15' ):
     73
     74                cmd = self.binary
    3475
    3576                try:
     
    3879                        debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
    3980
    40                 cmd = cmd + ' -n' + metricname + ' -v' + metricval + ' -t' + tmax
     81                cmd = cmd + ' -n' + metricname + ' -v"' + metricval + '" -t' + tmax + ' -d' + str( self.dmax )
    4182
    4283                print cmd
     
    80121                return 0
    81122
    82         def getJobData( self ):
    83 
    84                 jobs = self.jobs[:]
     123        def getJobData( self, known_jobs ):
     124
     125                if len( known_jobs ) > 0:
     126                        jobs = known_jobs
     127                else:
     128                        jobs = { }
    85129
    86130                joblist = self.pq.getjobs()
     
    106150                        status = self.getAttr( attrs, 'job_state' )
    107151                        start_timestamp = self.getAttr( attrs, 'mtime' )
    108                         stop_timestamp = ''
     152                        #stop_timestamp = ''
    109153
    110154                        myAttrs = { }
     
    117161                        myAttrs['status'] = status
    118162                        myAttrs['start_timestamp'] = start_timestamp
    119                         myAttrs['stop_timestamp'] = stop_timestamp
     163                        #myAttrs['stop_timestamp'] = stop_timestamp
    120164
    121165                        if self.jobDataChanged( jobs, job_id, myAttrs ):
     
    126170                                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
    127171
    128                 for id, attrs in jobs.items():
    129 
    130                         # This job was there in the last run, and not anymore
    131                         # it must have finished
    132 
    133                         if id not in jobs_processed and attrs['stop_timestamp'] == '':
    134 
    135                                 jobs[ id ]['status'] = 'F'
    136                                 jobs[ id ]['stop_timestamp'] = time.time()
    137                                 debug_msg( 10, printTime() + ' job %s finished' %(id) )
    138                                 self.printJob( jobs, id )
     172                #for id, attrs in jobs.items():
     173
     174                #       # This job was there in the last run, and not anymore
     175                #       # it must have finished
     176
     177                #       if id not in jobs_processed and attrs['stop_timestamp'] == '':
     178
     179                #               jobs[ id ]['status'] = 'F'
     180                #               jobs[ id ]['stop_timestamp'] = time.time()
     181                #               debug_msg( 10, printTime() + ' job %s finished' %(id) )
     182                #               self.printJob( jobs, id )
     183
     184                return jobs
     185
     186        def submitJobData( self, jobs ):
     187                """Submit job info list"""
     188
     189                time_now = time.time()
     190
     191                self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( time_now ) )
    139192
    140193                # Now let's spread the knowledge
     
    142195                for jobid, jobattrs in jobs.items():
    143196
    144                         if ARCHIVE_MODE:
    145 
    146                                 if self.jobDataChanged( self.jobs, jobid, jobattrs ):
    147 
    148                                         self.dp.togaSubmitJob( jobid, jobattrs )
    149 
    150                         self.dp.multicastGmetric( jobid, jobattrs )
    151                                        
    152                 self.jobs = jobs
     197                        gmetric_val = self.compileGmetricVal( jobid, jobattrs )
     198
     199                        for val in gmetric_val:
     200                                self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val )
     201
     202        def compileGmetricVal( self, jobid, jobattrs ):
     203                """Create a val string for gmetric of jobinfo"""
     204
     205                name_str = 'name=' + jobattrs['name']
     206                queue_str = 'queue=' + jobattrs['queue']
     207                owner_str = 'owner=' + jobattrs['owner']
     208                rtime_str = 'rtime=' + jobattrs['requested_time']
     209                rmem_str = 'rmem=' + jobattrs['requested_memory']
     210                ppn_str = 'ppn=' + jobattrs['ppn']
     211                status_str = 'status=' + jobattrs['status']
     212                stime_str = 'stime=' + jobattrs['start_timestamp']
     213
     214                appendList = [ name_str, queue_str, owner_str, rtime_str, rmem_str, ppn_str, status_str, stime_str ]
     215
     216                return self.makeAppendLists( appendList )
     217
     218        def makeAppendLists( self, append_list ):
     219
     220                app_lists = [ ]
     221
     222                mystr = None
     223
     224                for val in append_list:
     225
     226                        if not mystr:
     227                                mystr = val
     228                        else:
     229                                if not self.checkValAppendMaxSize( mystr, val ):
     230                                        mystr = mystr + ' ' + val
     231                                else:
     232                                        # Too big, new appenlist
     233                                        app_lists.append( mystr )
     234                                        mystr = val
     235
     236                app_lists.append( mystr )
     237
     238                return app_lists
     239
     240        def checkValAppendMaxSize( self, val, text ):
     241                """Check if val + text size is not above 1400 (max msg size)"""
     242
     243                if len( val + text ) > 1400:
     244                        return 1
     245                else:
     246                        return 0
    153247
    154248        def printJobs( self, jobs ):
    155        
     249                """Print a jobinfo overview"""
     250
    156251                for name, attrs in self.jobs.items():
    157252
     
    163258
    164259        def printJob( self, jobs, job_id ):
     260                """Print job with job_id from jobs"""
    165261
    166262                print 'job %s' %(job_id)
    167263
    168                 for name, val in self.jobs[ job_id ].items():
     264                for name, val in jobs[ job_id ].items():
    169265
    170266                        print '\t%s = %s' %( name, val )
    171267
    172268        def daemon( self ):
    173                 "Run as daemon forever"
    174 
    175                 self.DAEMON = 1
     269                """Run as daemon forever"""
    176270
    177271                # Fork the first child
     
    207301
    208302        def run( self ):
    209                 "Main thread"
     303                """Main thread"""
    210304
    211305                while ( 1 ):
    212306               
    213                         self.getJobData()
     307                        self.jobs = self.getJobData( self.jobs )
     308                        self.submitJobData( self.jobs )
    214309                        time.sleep( TORQUE_POLL_INTERVAL )     
    215310
    216311def printTime( ):
     312        """Print current time/date in human readable format for log/debug"""
    217313
    218314        return time.strftime("%a, %d %b %Y %H:%M:%S")
    219315
    220316def debug_msg( level, msg ):
     317        """Print msg if at or above current debug level"""
    221318
    222319        if (DEBUG_LEVEL >= level):
     
    224321
    225322def main():
     323        """Application start"""
    226324
    227325        gather = PBSDataGatherer()
     
    231329                gather.run()
    232330
     331# w00t someone started me
     332#
    233333if __name__ == '__main__':
    234334        main()
Note: See TracChangeset for help on using the changeset viewer.