Changeset 84
- Timestamp:
- 04/18/05 14:28:41 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r82 r84 21 21 # 8 = RRD file activity 22 22 # 7 = daemon threading 23 # 6 = SQL 23 24 # 24 25 DEBUG_LEVEL = 7 … … 85 86 This is TOrque-GAnglia's data Daemon 86 87 """ 88 89 class DataSQLStore: 90 91 db_vars = None 92 dbc = None 93 94 def __init__( self, hostname, database ): 95 96 self.db_vars = DBClass.InitVars(DataBaseName=database, 97 User='root', 98 Host=hostname, 99 Password='', 100 Dictionary='true') 101 102 try: 103 self.dbc = DBClass.DB(self.db_vars) 104 except DBClass.DBError, details: 105 print 'Error in connection to db: %s' %details 106 sys.exit(1) 107 108 def setDatabase(self, statement): 109 ret = self.doDatabase('set', statement) 110 return ret 111 112 def getDatabase(self, statement): 113 ret = self.doDatabase('get', statement) 114 return ret 115 116 def doDatabase(self, type, statement): 117 118 debug_msg( 6, 'doDatabase(): %s: %s' %(type, statement) ) 119 try: 120 if type == 'set': 121 result = self.dbc.Set( statement ) 122 self.dbc.Commit() 123 elif type == 'get': 124 result = self.dbc.Get( statement ) 125 126 except DBClass.DBError, detail: 127 operation = statement.split(' ')[0] 128 print "%s operation on database failed while performing\n'%s'\n%s"\ 129 %(operation, statement, detail) 130 sys.exit(1) 131 132 debug_msg( 6, 'doDatabase(): result: %s' %(result) ) 133 return result 134 135 def getNodeId( self, hostname ): 136 137 id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )[0][0] 138 139 if id: 140 141 return id 142 else: 143 return None 144 145 def getNodeIds( self, hostnames ): 146 147 ids = [ ] 148 149 for node in hostnames: 150 151 id = self.getNodeId( node ) 152 153 if id: 154 ids.append( id ) 155 156 return ids 157 158 def getJobId( self, jobid ): 159 160 id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid ) 161 162 print id 163 164 if id: 165 166 return id 167 else: 168 return None 169 170 def addJob( self, job_id, jobattrs ): 171 172 if not self.getJobId( job_id ): 173 174 self.mutateJob( 'insert', job_id, jobattrs ) 175 else: 176 self.mutateJob( 'update', job_id, jobattrs ) 177 178 def mutateJob( self, action, job_id, jobattrs ): 179 180 job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ] 181 182 insert_col_str = 'job_id' 183 insert_val_str = "'%s'" %job_id 184 update_str = None 185 186 debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id)) 187 print jobattrs 188 189 for valname, value in jobattrs.items(): 190 191 print valname, value 192 193 if valname in job_values and value: 194 195 column_name = 'job_' + valname 196 197 if action == 'insert': 198 199 if not insert_col_str: 200 insert_col_str = column_name 201 else: 202 insert_col_str = insert_col_str + ',' + column_name 203 204 if not insert_val_str: 205 insert_val_str = value 206 else: 207 insert_val_str = insert_val_str + ",'%s'" %value 208 209 elif action == 'update': 210 211 if not update_str: 212 update_str = "%s='%s'" %(column_name, value) 213 else: 214 update_str = update_str + ",%s='%s'" %(column_name, value) 215 216 elif valname == 'nodes': 217 218 self.addNodes( value ) 219 #ids = self.getNodeIds( value ) 220 221 #for id in ids: 222 # self.addJobNode( job_id, id ) 223 224 if action == 'insert': 225 226 self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) ) 227 elif action == 'update': 228 229 self.setDatabase( "UPDATE jobs SET %s" %(update_str) ) 230 231 def addNodes( self, hostnames ): 232 233 for node in hostnames: 234 235 id = self.getNodeId( node ) 236 237 if not id: 238 self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node ) 239 240 def addJobNode( self, jobid, nodeid ): 241 242 self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) ) 243 244 def storeJobInfo( self, jobid, jobattrs ): 245 246 self.addJob( jobid, jobattrs ) 87 247 88 248 class RRDMutator: … … 227 387 228 388 jobAttrs = { } 389 jobs_to_store = [ ] 390 391 def __init__( self ): 392 393 self.ds = DataSQLStore( TOGA_SQL_DBASE.split( '/' )[0], TOGA_SQL_DBASE.split( '/' )[1] ) 229 394 230 395 def startElement( self, name, attrs ): … … 260 425 for myval in valinfo: 261 426 262 valname = myval.split( '=' )[0] 263 value = myval.split( '=' )[1] 264 265 if valname == 'nodes': 266 value = value.split( ';' ) 267 268 jobinfo[ valname ] = value 427 if len( myval.split( '=' ) ) > 1: 428 429 valname = myval.split( '=' )[0] 430 value = myval.split( '=' )[1] 431 432 if valname == 'nodes': 433 value = value.split( ';' ) 434 self.ds.addNodes( value ) 435 436 jobinfo[ valname ] = value 269 437 270 438 if check_change: 271 439 if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ): 272 440 self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo ) 441 if not job_id in self.jobs_to_store: 442 self.jobs_to_store.append( job_id ) 443 273 444 debug_msg( 0, 'jobinfo for job %s has changed' %job_id ) 274 445 else: 275 446 self.jobAttrs[ job_id ] = jobinfo 447 448 if not job_id in self.jobs_to_store: 449 self.jobs_to_store.append( job_id ) 450 276 451 debug_msg( 0, 'jobinfo for job %s has changed' %job_id ) 277 452 … … 288 463 self.jobAttrs[ jobid ]['status'] = 'F' 289 464 self.jobAttrs[ jobid ]['stop_timestamp'] = str( int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] ) ) 465 if not jobid in self.jobs_to_store: 466 self.jobs_to_store.append( jobid ) 467 468 for jobid in self.jobs_to_store: 469 self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] ) 470 471 self.jobs_to_store = [ ] 290 472 291 473 def setJobAttrs( self, old, new ):
Note: See TracChangeset
for help on using the changeset viewer.