Changeset 84


Ignore:
Timestamp:
04/18/05 14:28:41 (19 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

  • Added DataSQLStore class now jobinfo is actually stored in SQL dbase
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r82 r84  
    2121# 8  = RRD file activity
    2222# 7  = daemon threading
     23# 6  = SQL
    2324#
    2425DEBUG_LEVEL = 7
     
    8586This is TOrque-GAnglia's data Daemon
    8687"""
     88
     89class DataSQLStore:
     90
     91        db_vars = None
     92        dbc = None
     93
     94        def __init__( self, hostname, database ):
     95
     96                self.db_vars = DBClass.InitVars(DataBaseName=database,
     97                                User='root',
     98                                Host=hostname,
     99                                Password='',
     100                                Dictionary='true')
     101
     102                try:
     103                        self.dbc     = DBClass.DB(self.db_vars)
     104                except DBClass.DBError, details:
     105                        print 'Error in connection to db: %s' %details
     106                        sys.exit(1)
     107
     108        def setDatabase(self, statement):
     109                ret = self.doDatabase('set', statement)
     110                return ret
     111               
     112        def getDatabase(self, statement):
     113                ret = self.doDatabase('get', statement)
     114                return ret
     115
     116        def doDatabase(self, type, statement):
     117
     118                debug_msg( 6, 'doDatabase(): %s: %s' %(type, statement) )
     119                try:
     120                        if type == 'set':
     121                                result = self.dbc.Set( statement )
     122                                self.dbc.Commit()
     123                        elif type == 'get':
     124                                result = self.dbc.Get( statement )
     125                               
     126                except DBClass.DBError, detail:
     127                        operation = statement.split(' ')[0]
     128                        print "%s operation on database failed while performing\n'%s'\n%s"\
     129                                %(operation, statement, detail)
     130                        sys.exit(1)
     131
     132                debug_msg( 6, 'doDatabase(): result: %s' %(result) )
     133                return result
     134
     135        def getNodeId( self, hostname ):
     136
     137                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )[0][0]
     138
     139                if id:
     140
     141                        return id
     142                else:
     143                        return None
     144
     145        def getNodeIds( self, hostnames ):
     146
     147                ids = [ ]
     148
     149                for node in hostnames:
     150
     151                        id = self.getNodeId( node )
     152
     153                        if id:
     154                                ids.append( id )
     155
     156                return ids
     157
     158        def getJobId( self, jobid ):
     159
     160                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
     161
     162                print id
     163
     164                if id:
     165
     166                        return id
     167                else:
     168                        return None
     169
     170        def addJob( self, job_id, jobattrs ):
     171
     172                if not self.getJobId( job_id ):
     173
     174                        self.mutateJob( 'insert', job_id, jobattrs )
     175                else:
     176                        self.mutateJob( 'update', job_id, jobattrs )
     177
     178        def mutateJob( self, action, job_id, jobattrs ):
     179
     180                job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
     181
     182                insert_col_str = 'job_id'
     183                insert_val_str = "'%s'" %job_id
     184                update_str = None
     185
     186                debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id))
     187                print jobattrs
     188
     189                for valname, value in jobattrs.items():
     190
     191                        print valname, value
     192
     193                        if valname in job_values and value:
     194
     195                                column_name = 'job_' + valname
     196
     197                                if action == 'insert':
     198
     199                                        if not insert_col_str:
     200                                                insert_col_str = column_name
     201                                        else:
     202                                                insert_col_str = insert_col_str + ',' + column_name
     203
     204                                        if not insert_val_str:
     205                                                insert_val_str = value
     206                                        else:
     207                                                insert_val_str = insert_val_str + ",'%s'" %value
     208
     209                                elif action == 'update':
     210                                       
     211                                        if not update_str:
     212                                                update_str = "%s='%s'" %(column_name, value)
     213                                        else:
     214                                                update_str = update_str + ",%s='%s'" %(column_name, value)
     215
     216                        elif valname == 'nodes':
     217
     218                                self.addNodes( value )
     219                                #ids = self.getNodeIds( value )
     220
     221                                #for id in ids:
     222                                #       self.addJobNode( job_id, id )
     223
     224                if action == 'insert':
     225
     226                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
     227                elif action == 'update':
     228
     229                        self.setDatabase( "UPDATE jobs SET %s" %(update_str) )
     230
     231        def addNodes( self, hostnames ):
     232
     233                for node in hostnames:
     234
     235                        id = self.getNodeId( node )
     236       
     237                        if not id:
     238                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
     239
     240        def addJobNode( self, jobid, nodeid ):
     241
     242                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
     243
     244        def storeJobInfo( self, jobid, jobattrs ):
     245
     246                self.addJob( jobid, jobattrs )
    87247
    88248class RRDMutator:
     
    227387
    228388        jobAttrs = { }
     389        jobs_to_store = [ ]
     390
     391        def __init__( self ):
     392
     393                self.ds = DataSQLStore( TOGA_SQL_DBASE.split( '/' )[0], TOGA_SQL_DBASE.split( '/' )[1] )
    229394
    230395        def startElement( self, name, attrs ):
     
    260425                                for myval in valinfo:
    261426
    262                                         valname = myval.split( '=' )[0]
    263                                         value = myval.split( '=' )[1]
    264 
    265                                         if valname == 'nodes':
    266                                                 value = value.split( ';' )
    267 
    268                                         jobinfo[ valname ] = value
     427                                        if len( myval.split( '=' ) ) > 1:
     428
     429                                                valname = myval.split( '=' )[0]
     430                                                value = myval.split( '=' )[1]
     431
     432                                                if valname == 'nodes':
     433                                                        value = value.split( ';' )
     434                                                        self.ds.addNodes( value )
     435
     436                                                jobinfo[ valname ] = value
    269437
    270438                                if check_change:
    271439                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ):
    272440                                                self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
     441                                                if not job_id in self.jobs_to_store:
     442                                                        self.jobs_to_store.append( job_id )
     443
    273444                                                debug_msg( 0, 'jobinfo for job %s has changed' %job_id )
    274445                                else:
    275446                                        self.jobAttrs[ job_id ] = jobinfo
     447
     448                                        if not job_id in self.jobs_to_store:
     449                                                self.jobs_to_store.append( job_id )
     450
    276451                                        debug_msg( 0, 'jobinfo for job %s has changed' %job_id )
    277452                                       
     
    288463                                self.jobAttrs[ jobid ]['status'] = 'F'
    289464                                self.jobAttrs[ jobid ]['stop_timestamp'] = str( int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] ) )
     465                                if not jobid in self.jobs_to_store:
     466                                        self.jobs_to_store.append( jobid )
     467
     468                for jobid in self.jobs_to_store:
     469                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )   
     470
     471                self.jobs_to_store = [ ]
    290472
    291473        def setJobAttrs( self, old, new ):
Note: See TracChangeset for help on using the changeset viewer.