Ignore:
Timestamp:
04/05/13 14:35:28 (11 years ago)
Author:
ramonb
Message:
  • prevent that 1 (or more) failed SQL insert/updates hangs the entire job store thread
  • check success/failure of SQL actions and perform commit/rollback accordingly
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/0.4/jobarchived/jobarchived.py

    r783 r792  
    340340
    341341    def Commit(self):
    342         self.SQL.commit()
     342
     343        return self.SQL.commit()
     344
     345    def Rollback( self ):
     346
     347        return self.SQL.rollback()
    343348
    344349class DataSQLStore:
     
    364369
    365370    def setDatabase(self, statement):
     371
    366372        ret = self.doDatabase('set', statement)
    367373        return ret
    368374       
    369375    def getDatabase(self, statement):
     376
    370377        ret = self.doDatabase('get', statement)
    371378        return ret
     379
     380    def doCommit( self ):
     381
     382        return self.dbc.Commit()
     383
     384    def doRollback( self ):
     385
     386        return self.dbc.Rollback()
    372387
    373388    def doDatabase(self, type, statement):
     
    377392            if type == 'set':
    378393                result = self.dbc.Set( statement )
    379                 self.dbc.Commit()
    380394            elif type == 'get':
    381395                result = self.dbc.Get( statement )
     
    383397        except DBError, detail:
    384398            operation = statement.split(' ')[0]
    385             debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
    386             sys.exit(1)
     399            debug_msg( 0, 'ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
     400            return False
    387401
    388402        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
     
    392406
    393407        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
     408        if not id:
     409            return False
     410
    394411        if len( id ) > 0:
    395412
    396413            if len( id[0] ) > 0 and id[0] != '':
    397414           
    398                 return 1
    399 
    400         return 0
     415                return True
     416
     417        return False
    401418
    402419    def getNodeId( self, hostname ):
     
    440457        if not self.getJobId( job_id ):
    441458
    442             self.mutateJob( 'insert', job_id, jobattrs )
     459            return self.mutateJob( 'insert', job_id, jobattrs )
    443460        else:
    444             self.mutateJob( 'update', job_id, jobattrs )
     461            return self.mutateJob( 'update', job_id, jobattrs )
    445462
    446463    def mutateJob( self, action, job_id, jobattrs ):
     
    507524        if action == 'insert':
    508525
    509             self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
     526            db_ret = self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
    510527
    511528        elif action == 'update':
    512529
    513             self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) )
     530            db_ret = self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) )
    514531
    515532        if len( ids ) > 0:
    516533            self.addJobNodes( job_id, ids )
     534
     535        return db_ret
    517536
    518537    def addNodes( self, hostnames, domain ):
     
    547566    def storeJobInfo( self, jobid, jobattrs ):
    548567
    549         self.addJob( jobid, jobattrs )
     568        return self.addJob( jobid, jobattrs )
    550569
    551570    def checkStaleJobs( self ):
     
    947966        debug_msg( 1, 'job_xml_thread(): Found %s updated jobs.' %len(self.jobs_to_store) )
    948967
     968        failed_store = [ ]
     969        succes_store = [ ]
     970
    949971        if len( self.jobs_to_store ) > 0:
    950972
    951973            debug_msg( 1, 'job_xml_thread(): Storing jobs to database..' )
    952974
    953             while len( self.jobs_to_store ) > 0:
     975            for n in range( 0, len(self.jobs_to_store ) ):
     976
     977                if len( self.jobs_to_store ) == 0:
     978                    break
    954979
    955980                jobid = self.jobs_to_store.pop( 0 )
    956981
    957                 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
     982                db_ok = self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
     983
     984                if not db_ok:
     985
     986                    self.ds.doRollback()
     987                    failed_store.append( jobid )
     988                    continue
     989
     990                self.ds.doCommit()
     991                succes_store.append( jobid )
    958992
    959993                if not jobid in jobs_finished:
     
    9691003                    del self.jobAttrs[ jobid ]
    9701004
    971             debug_msg( 1, 'job_xml_thread(): Done storing.' )
     1005            result_str = 'succesfully stored: %s jobs' %str(len(succes_store))
     1006
     1007            if len( failed_store ) > 0:
     1008                result_str = result_str + ' - failed to store: %s jobs - deferred to next interval' %str(len(failed_store))
     1009
     1010            debug_msg( 1, 'job_xml_thread(): Done storing. %s' %result_str )
    9721011
    9731012        else:
Note: See TracChangeset for help on using the changeset viewer.