Changeset 782


Ignore:
Timestamp:
03/31/13 21:25:23 (11 years ago)
Author:
ramonb
Message:
  • trying to fix job parsing and detection: not everything seen/stored
  • all job checking should be done post-parsing, not while parsing
  • some cleanup
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/0.4/jobarchived/jobarchived.py

    r774 r782  
    446446    def mutateJob( self, action, job_id, jobattrs ):
    447447
    448         job_values    = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
    449 
    450         insert_col_str    = 'job_id'
    451         insert_val_str    = "'%s'" %job_id
    452         update_str    = None
     448        job_values     = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
     449
     450        insert_col_str = 'job_id'
     451        insert_val_str = "'%s'" %job_id
     452        update_str     = None
    453453
    454454        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
     
    770770        pass
    771771
    772 class TorqueXMLProcessor( XMLProcessor ):
     772class JobXMLProcessor( XMLProcessor ):
    773773    """Main class for processing XML and acting with it"""
    774774
     
    776776        """Setup initial XML connection and handlers"""
    777777
    778         self.myXMLSource    = XMLSource
    779         self.myXMLHandler    = TorqueXMLHandler( DataStore )
    780         self.myXMLError        = XMLErrorHandler()
    781 
    782         self.config        = GangliaConfigParser( GMETAD_CONF )
     778        self.myXMLSource  = XMLSource
     779        self.myXMLHandler = JobXMLHandler( DataStore )
     780        self.myXMLError   = XMLErrorHandler()
     781
     782        self.config       = GangliaConfigParser( GMETAD_CONF )
     783
     784        self.kill_thread  = False
     785
     786    def killThread( self ):
     787
     788        self.kill_thread  = True
    783789
    784790    def run( self ):
    785791        """Main XML processing"""
    786792
    787         debug_msg( 1, 'torque_xml_thread(): started.' )
     793        debug_msg( 1, 'job_xml_thread(): started.' )
    788794
    789795        while( 1 ):
    790796
    791             #self.myXMLSource = self.mXMLGatherer.getFileObject()
    792             debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' )
     797            debug_msg( 1, 'job_xml_thread(): Retrieving XML data..' )
    793798
    794799            my_data    = self.myXMLSource.getData()
    795             #print my_data
    796             #print "size my data: %d" %len( my_data )
    797 
    798             debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )
     800
     801            debug_msg( 1, 'job_xml_thread(): Done retrieving: data size %d' %len(my_data) )
    799802
    800803            if my_data:
    801                 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
     804                debug_msg( 1, 'job_xml_thread(): Parsing XML..' )
    802805
    803806                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
    804807
    805                 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
     808                debug_msg( 1, 'job_xml_thread(): Done parsing.' )
    806809            else:
    807                 debug_msg( 1, 'torque_xml_thread(): Got no data.' )
    808 
     810                debug_msg( 1, 'job_xml_thread(): Got no data.' )
     811
     812            if self.kill_thread:
     813
     814                debug_msg( 1, 'job_xml_thread(): killed.' )
     815                return None
    809816               
    810             debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
     817            debug_msg( 1, 'job_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
    811818            time.sleep( self.config.getLowestInterval() )
    812819
    813 class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
    814     """Parse Torque's jobinfo XML from our plugin"""
    815 
    816     jobAttrs = { }
     820class JobXMLHandler( xml.sax.handler.ContentHandler ):
     821    """Parse Job's jobinfo XML from our plugin"""
    817822
    818823    def __init__( self, datastore ):
    819824
    820         #self.ds            = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
    821         self.ds            = datastore
    822         self.jobs_processed    = [ ]
    823         self.jobs_to_store    = [ ]
     825        self.ds              = datastore
     826        self.jobs_processed  = [ ]
     827        self.jobs_to_store   = [ ]
     828        self.jobAttrs        = { }
     829        self.jobAttrsSaved   = { }
     830
    824831        debug_msg( 1, "XML: Handler created" )
    825832
    826833    def startDocument( self ):
    827834
    828         self.heartbeat    = 0
    829         self.elementct    = 0
     835        self.jobs_processed = [ ]
     836        self.heartbeat      = 0
     837        self.elementct      = 0
     838
    830839        debug_msg( 1, "XML: Start document" )
    831840
     
    836845        just one XML statement with all info
    837846        """
    838        
     847
    839848        jobinfo = { }
    840849
     
    850859
    851860            if metricname == 'zplugin_monarch_heartbeat':
     861
    852862                self.heartbeat = str( attrs.get( 'VAL', "" ) )
    853863
    854864            elif metricname.find( 'zplugin_monarch_job' ) != -1:
    855865
    856                 job_id    = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1]
    857                 val    = str( attrs.get( 'VAL', "" ) )
    858 
    859                 if not job_id in self.jobs_processed:
    860 
    861                     self.jobs_processed.append( job_id )
    862 
    863                 check_change = 0
    864 
    865                 if self.jobAttrs.has_key( job_id ):
    866 
    867                     check_change = 1
     866                job_id  = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1]
     867                val     = str( attrs.get( 'VAL', "" ) )
    868868
    869869                valinfo = val.split( ' ' )
     
    873873                    if len( myval.split( '=' ) ) > 1:
    874874
    875                         valname    = myval.split( '=' )[0]
    876                         value    = myval.split( '=' )[1]
     875                        valname = myval.split( '=' )[0]
     876                        value   = myval.split( '=' )[1]
    877877
    878878                        if valname == 'nodes':
     879
    879880                            value = value.split( ';' )
    880881
    881882                        jobinfo[ valname ] = value
    882883
    883                 if check_change:
    884                     if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
    885                         self.jobAttrs[ job_id ]['stop_timestamp']    = ''
    886                         self.jobAttrs[ job_id ]                = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
    887                         if not job_id in self.jobs_to_store:
    888                             self.jobs_to_store.append( job_id )
    889 
    890                         debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
    891                 else:
    892                     self.jobAttrs[ job_id ] = jobinfo
    893 
    894                     if not job_id in self.jobs_to_store:
    895                         self.jobs_to_store.append( job_id )
    896 
    897                     debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
     884                self.jobAttrs[ job_id ] = jobinfo
     885
     886                self.jobs_processed.append( job_id )
    898887                   
    899888    def endDocument( self ):
    900889        """When all metrics have gone, check if any jobs have finished"""
    901890
    902         debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
    903 
    904         if self.heartbeat:
    905             for jobid, jobinfo in self.jobAttrs.items():
    906 
    907                 # This is an old job, not in current jobinfo list anymore
    908                 # it must have finished, since we _did_ get a new heartbeat
    909                 #
    910                 mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
    911 
    912                 if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
    913 
    914                     if not jobid in self.jobs_processed:
    915                         self.jobs_processed.append( jobid )
    916 
     891        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_processed))+" jobs" )
     892
     893        if self.heartbeat == 0:
     894            return None
     895
     896        for jobid, jobinfo in self.jobAttrs.items():
     897
     898            if jobinfo['reported'] != self.heartbeat:
     899
     900                if (jobinfo['status'] != 'R'):
     901                    debug_msg( 1, 'job %s report time %s does not match current heartbeat %s : ignoring job' %(jobid, jobinfo['reported'], self.heartbeat ) )
     902                    del self.jobAttrs[ jobid ]
     903
     904                    if jobid in self.jobs_to_store:
     905                        del self.jobs_to_store[ jobid ]
     906
     907                    continue
     908
     909                elif jobid not in self.jobs_processed:
     910                    # Was running previous heartbeat but not anymore: must be finished
    917911                    self.jobAttrs[ jobid ]['status'] = 'F'
    918912                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
     913                    debug_msg( 1, 'job %s appears to have finished' %jobid )
    919914
    920915                    if not jobid in self.jobs_to_store:
    921916                        self.jobs_to_store.append( jobid )
    922917
    923             debug_msg( 1, 'torque_xml_thread(): Storing..' )
    924 
    925             for jobid in self.jobs_to_store:
    926                 if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
    927 
    928                     self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
    929 
    930                     if self.jobAttrs[ jobid ]['status'] == 'F':
    931                         del self.jobAttrs[ jobid ]
    932 
    933             debug_msg( 1, 'torque_xml_thread(): Done storing.' )
    934 
    935             self.jobs_processed    = [ ]
    936             self.jobs_to_store    = [ ]
     918                    continue
     919
     920            elif self.jobAttrsSaved.has_key( jobid ):
     921
     922                if self.jobinfoChanged( jobid, jobinfo ):
     923
     924                    self.jobAttrs[ jobid ]['stop_timestamp'] = ''
     925                    self.jobAttrs[ jobid ]                   = self.setJobAttrs( self.jobAttrs[ jobid ], jobinfo )
     926
     927                    if not jobid in self.jobs_to_store:
     928
     929                        self.jobs_to_store.append( jobid )
     930
     931                    debug_msg( 10, 'jobinfo for job %s has changed' %jobid )
     932            else:
     933                debug_msg( 1, 'new job %s' %jobid )
     934
     935                if not jobid in self.jobs_to_store:
     936
     937                    self.jobs_to_store.append( jobid )
     938
     939        debug_msg( 1, 'job_xml_thread(): Found %s updated jobs.' %len(self.jobs_to_store) )
     940
     941        if len( self.jobs_to_store ) > 0:
     942
     943            debug_msg( 1, 'job_xml_thread(): Storing jobs to database..' )
     944
     945            while len( self.jobs_to_store ) > 0:
     946
     947                jobid = self.jobs_to_store.pop( 0 )
     948
     949                self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
     950
     951                self.jobAttrsSaved[ jobid ] = self.jobAttrs[ jobid ]
     952
     953                if self.jobAttrs[ jobid ]['status'] == 'F':
     954
     955                    del self.jobAttrs[ jobid ]
     956
     957            debug_msg( 1, 'job_xml_thread(): Done storing.' )
     958
     959        else:
     960            debug_msg( 1, 'job_xml_thread(): No jobs to store.' )
     961
     962        self.jobs_processed = [ ]
    937963
    938964    def setJobAttrs( self, old, new ):
     
    948974       
    949975
    950     def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
     976    def jobinfoChanged( self, jobid, jobinfo ):
    951977        """
    952978        Check if jobinfo has changed from jobattrs[jobid]
     
    957983        ignore_changes = [ 'reported' ]
    958984
    959         if jobattrs.has_key( jobid ):
     985        if self.jobAttrsSaved.has_key( jobid ):
    960986
    961987            for valname, value in jobinfo.items():
     
    963989                if valname not in ignore_changes:
    964990
    965                     if jobattrs[ jobid ].has_key( valname ):
    966 
    967                         if value != jobattrs[ jobid ][ valname ]:
    968 
    969                             if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
     991                    if self.jobAttrsSaved[ jobid ].has_key( valname ):
     992
     993                        if value != self.jobAttrsSaved[ jobid ][ valname ]:
     994
     995                            if jobinfo['reported'] > self.jobAttrsSaved[ jobid ][ 'reported' ]:
     996
     997                                debug_msg( 1, "job %s field '%s' changed since saved from: %s to: %s" %( jobid, valname, value, self.jobAttrsSaved[ jobid ][ valname ] ) )
     998
    970999                                return True
    9711000
    9721001                    else:
     1002                        debug_msg( 1, "job %s did not have field '%s'" %( jobid, valname )  )
    9731003                        return True
    9741004
     
    10361066        if name == 'GANGLIA_XML':
    10371067
    1038             self.XMLSource        = str( attrs.get( 'SOURCE', "" ) )
    1039             self.gangliaVersion    = str( attrs.get( 'VERSION', "" ) )
     1068            self.XMLSource      = str( attrs.get( 'SOURCE', "" ) )
     1069            self.gangliaVersion = str( attrs.get( 'VERSION', "" ) )
    10401070
    10411071            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
     
    10501080        elif name == 'CLUSTER':
    10511081
    1052             self.clusterName    = str( attrs.get( 'NAME', "" ) )
     1082            self.clusterName = str( attrs.get( 'NAME',      "" ) )
    10531083            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
    10541084
     
    10611091        elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
    10621092
    1063             self.hostName        = str( attrs.get( 'NAME', "" ) )
    1064             self.hostIp        = str( attrs.get( 'IP', "" ) )
    1065             self.hostReported    = str( attrs.get( 'REPORTED', "" ) )
     1093            self.hostName     = str( attrs.get( 'NAME',    "" ) )
     1094            self.hostIp       = str( attrs.get( 'IP',      "" ) )
     1095            self.hostReported = str( attrs.get( 'REPORTED', "" ) )
    10661096
    10671097            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
     
    10871117            if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
    10881118
    1089                 myMetric        = { }
    1090                 myMetric['name']    = str( attrs.get( 'NAME', "" ) )
    1091                 myMetric['val']        = str( attrs.get( 'VAL', "" ) )
    1092                 myMetric['time']    = self.hostReported
     1119                myMetric         = { }
     1120                myMetric['name'] = str( attrs.get( 'NAME', "" ) )
     1121                myMetric['val']  = str( attrs.get( 'VAL', "" ) )
     1122                myMetric['time'] = self.hostReported
    10931123
    10941124                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
     
    11591189        self.host    = host
    11601190        self.port    = port
    1161         self.slot       = threading.Lock()
     1191        self.slot    = threading.Lock()
    11621192
    11631193        self.retrieveData()
     
    12901320        """Setup initial XML connection and handlers"""
    12911321
    1292         self.config        = GangliaConfigParser( GMETAD_CONF )
    1293 
    1294         #self.myXMLGatherer    = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    1295         #self.myXMLSource    = self.myXMLGatherer.getFileObject()
    1296         self.myXMLSource    = XMLSource
    1297         self.ds            = DataStore
    1298         self.myXMLHandler    = GangliaXMLHandler( self.config, self.ds )
    1299         self.myXMLError        = XMLErrorHandler()
     1322        self.config       = GangliaConfigParser( GMETAD_CONF )
     1323        self.myXMLSource  = XMLSource
     1324        self.ds           = DataStore
     1325        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )
     1326        self.myXMLError   = XMLErrorHandler()
    13001327
    13011328    def run( self ):
    13021329        """Main XML processing; start a xml and storethread"""
    13031330
    1304         xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
     1331        xml_thread   = threading.Thread( None, self.processXML,  'xmlthread' )
    13051332        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
    13061333
     
    13431370        if DEBUG_LEVEL >= 1:
    13441371            STORE_INTERVAL = 60
    1345             #STORE_INTERVAL = random.randint( 360, 640 )
    13461372        else:
    13471373            STORE_INTERVAL = random.randint( 360, 640 )
     
    13621388        if store_metric_thread.isAlive():
    13631389
    1364             debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to terminate..' )
     1390            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
    13651391            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
    1366             debug_msg( 1, 'ganglia_store_thread(): Done waiting: terminated storemetricthread()' )
     1392
     1393            if store_metric_thread.isAlive():
     1394
     1395                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' )
     1396            else:
     1397                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' )
    13671398
    13681399        debug_msg( 1, 'ganglia_store_thread(): finished.' )
     
    13751406        debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
    13761407        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
     1408
    13771409        ret = self.myXMLHandler.storeMetrics()
    13781410        if ret > 0:
    13791411            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
     1412
    13801413        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
    13811414        debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
     
    13851418    def processXML( self ):
    13861419        """Process XML"""
    1387 
    1388         debug_msg( 5, "Entering processXML()")
    13891420
    13901421        try:
     
    14031434        if parsethread.isAlive():
    14041435
    1405             debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to terminate..' %PARSE_TIMEOUT )
     1436            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
    14061437            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
    1407             debug_msg( 1, 'ganglia_xml_thread(): Done waiting. parsethread() terminated' )
     1438
     1439            if parsethread.isAlive():
     1440                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' )
     1441            else:
     1442                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' )
    14081443
    14091444        debug_msg( 1, 'ganglia_xml_thread(): finished.' )
    1410 
    1411         debug_msg( 5, "Leaving processXML()")
    14121445
    14131446        return 0
     
    14201453       
    14211454        my_data    = self.myXMLSource.getData()
    1422         debug_msg( 1, 'ganglia_parse_thread(): data size %d.' %len(my_data) )
    1423 
    1424         debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )
     1455
     1456        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) )
    14251457
    14261458        if my_data:
     
    16921724        debug_msg( 5, "Entering storeMetrics()")
    16931725
    1694         count_values    = 0
    1695         count_metrics    = 0
     1726        count_values  = 0
     1727        count_metrics = 0
    16961728        count_bits    = 0
    16971729
     
    17061738                    count_values += 1
    17071739
    1708                     count_bits    += len( dmetric['time'] )
    1709                     count_bits    += len( dmetric['val'] )
     1740                    count_bits   += len( dmetric['time'] )
     1741                    count_bits   += len( dmetric['val'] )
    17101742
    17111743        count_bytes    = count_bits / 8
     
    19752007    """Threading start"""
    19762008
    1977     config        = GangliaConfigParser( GMETAD_CONF )
    1978     s_timeout    = int( config.getLowestInterval() - 1 )
     2009    config             = GangliaConfigParser( GMETAD_CONF )
     2010    s_timeout          = int( config.getLowestInterval() - 1 )
    19792011
    19802012    socket.setdefaulttimeout( s_timeout )
     
    19832015    myDataStore        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
    19842016
    1985     myTorqueProcessor    = TorqueXMLProcessor( myXMLSource, myDataStore )
    1986     myGangliaProcessor    = GangliaXMLProcessor( myXMLSource, myDataStore )
     2017    myJobProcessor     = JobXMLProcessor( myXMLSource, myDataStore )
     2018    myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore )
    19872019
    19882020    try:
    1989         torque_xml_thread    = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
    1990         ganglia_xml_thread    = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
    1991 
    1992         torque_xml_thread.start()
     2021        job_xml_thread     = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
     2022        ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
     2023
     2024        job_xml_thread.start()
    19932025        ganglia_xml_thread.start()
    19942026       
Note: See TracChangeset for help on using the changeset viewer.