Changeset 275
- Timestamp:
- 02/14/12 09:38:49 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/PBSQuery.py
r264 r275 4 4 # 5 5 # SVN INFO: 6 # 6 # $Id$ 7 7 # 8 8 """ … … 25 25 There are the following functions for PBSQuery: 26 26 job - 27 28 27 getjob(job_id, attributes=<default is all>) 28 getjobs(attributes=<default is all>) 29 29 30 30 node - 31 32 31 getnode(node_id, attributes=<default is all>) 32 getnodes(attributes=<default is all>) 33 33 34 34 queue - 35 36 35 getqueue(queue_id, attributes=<default is all>) 36 getqueues(attributes=<default is all>) 37 37 38 38 server - 39 39 get_serverinfo(attributes=<default is all>) 40 40 41 41 Here is an example how to use the module: 42 43 44 45 46 47 48 49 50 51 52 53 42 from PBSQuery import PBSQuery 43 p = PBSQuery() 44 nodes = p.getnodes() 45 for name,node in nodes.items(): 46 print name 47 if node.is_free(): 48 print node, node['state'] 49 50 l = [ 'state', 'np' ] 51 nodes = p.getnodes(l) 52 for name,node in nodes.items(): 53 print node, node['state'] 54 54 55 55 The parameter 'attributes' is an python list of resources that 56 56 you are interested in, eg: only show state of nodes 57 57 l = list() 58 59 58 l.append('state') 59 nodes = p.getnodes(l) 60 60 """ 61 61 import pbs … … 67 67 68 68 class PBSError(Exception): 69 70 71 72 73 74 75 76 69 def __init__(self, msg=''): 70 self.msg = msg 71 Exception.__init__(self, msg) 72 73 def __repr__(self): 74 return self.msg 75 76 __str__ = __repr__ 77 77 78 78 79 79 class PBSQuery: 80 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 81 # a[key] = value, key and value are data type string 82 # 83 OLD_DATA_STRUCTURE = False 84 85 def __init__(self, server=None): 86 if not server: 87 self.server = pbs.pbs_default() 88 else: 89 self.server = server 90 91 def _connect(self): 92 """Connect to the PBS/Torque server""" 93 self.con = pbs.pbs_connect(self.server) 94 if self.con < 0: 95 str = "Could not make a connection with %s\n" %(self.server) 96 raise PBSError(str) 97 98 def _disconnect(self): 99 """Close the PBS/Torque connection""" 100 pbs.pbs_disconnect(self.con) 101 self.attribs = 'NULL' 102 103 def _list_2_attrib(self, list): 104 """Convert a python list to an attrib list suitable for pbs""" 105 self.attribs = pbs.new_attrl( len(list) ) 106 i = 0 107 for attrib in list: 108 # So we can user Resource 109 attrib = attrib.split('.') 110 self.attribs[i].name = attrib[0] 111 i = i + 1 112 113 def _pbsstr_2_list(self, str, delimiter): 114 """Convert a string to a python list and use delimiter as spit char""" 115 l = sting.splitfields(str, delimiter) 116 if len(l) > 1: 117 return l 118 119 def _list_2_dict(self, l, class_func): 120 """ 121 Convert a pbsstat function list to a class dictionary, The 122 data structure depends on the function new_data_structure(). 123 124 Default data structure is: 125 class[key] = value, Where key and value are of type string 126 127 Future release, can be set by new_data_structure(): 128 - class[key] = value where value can be: 129 1. a list of values of type string 130 2. a dictionary with as list of values of type string. If 131 values contain a '=' character 132 133 eg: 134 print node['np'] 135 >> [ '2' ] 136 137 print node['status']['arch'] 138 >> [ 'x86_64' ] 139 """ 140 self.d = {} 141 for item in l: 142 new = class_func() 143 144 self.d[item.name] = new 145 146 new.name = item.name 147 148 for a in item.attribs: 149 150 if self.OLD_DATA_STRUCTURE: 151 152 if a.resource: 153 key = '%s.%s' %(a.name, a.resource) 154 else: 155 key = '%s' %(a.name) 156 157 new[key] = a.value 158 159 else: 160 values = string.split(a.value, ',') 161 sub_dict = string.split(a.value, '=') 162 163 164 # We must creat sub dicts, only for specified 165 # key values 166 # 167 if a.name in ['status', 'Variable_List']: 168 169 for v in values: 170 171 tmp_l = v.split('=') 172 173 ## Support for multiple EVENT mesages in format [key=value:]+ 174 # format eg: message=EVENT:sample.time=1288864220.003:cputotals.user=0 175 # message=ERROR <text> 176 # 177 if tmp_l[0] in ['message']: 178 179 if tmp_l[1].startswith('EVENT:'): 180 181 tmp_d = dict() 182 new['event'] = class_func(tmp_d) 183 184 message_list = v.split(':') 185 for event_type in message_list[1:]: 186 tmp_l = event_type.split('=') 187 new['event'][ tmp_l[0] ] = tmp_l[1:] 188 189 else: 190 ## ERROR message 191 # 192 new['error'] = tmp_l [1:] 193 194 ## continue with next status value 195 # 196 continue 197 198 199 ## Check if we already added the key 200 # 201 if new.has_key(a.name): 202 new[a.name][ tmp_l[0] ] = tmp_l[1:] 203 204 else: 205 tmp_d = dict() 206 tmp_d[ tmp_l[0] ] = tmp_l[1:] 207 new[a.name] = class_func(tmp_d) 208 209 else: 210 ## Check if it is a resource type variable, eg: 211 # - Resource_List.(nodes, walltime, ..) 212 # 213 if a.resource: 214 215 if new.has_key(a.name): 216 new[a.name][a.resource] = values 217 218 else: 219 tmp_d = dict() 220 tmp_d[a.resource] = values 221 new[a.name] = class_func(tmp_d) 222 else: 223 # Simple value 224 # 225 new[a.name] = values 226 227 self._free(l) 228 229 def _free(self, memory): 230 """ 231 freeing up used memmory 232 233 """ 234 pbs.pbs_statfree(memory) 235 236 def _statserver(self, attrib_list=None): 237 """Get the server config from the pbs server""" 238 if attrib_list: 239 self._list_2_attrib(attrib_list) 240 else: 241 self.attribs = 'NULL' 242 243 self._connect() 244 serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL') 245 self._disconnect() 246 247 self._list_2_dict(serverinfo, server) 248 249 def get_serverinfo(self, attrib_list=None): 250 self._statserver(attrib_list) 251 return self.d 252 253 def _statqueue(self, queue_name='', attrib_list=None): 254 """Get the queue config from the pbs server""" 255 if attrib_list: 256 self._list_2_attrib(attrib_list) 257 else: 258 self.attribs = 'NULL' 259 260 self._connect() 261 queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL') 262 self._disconnect() 263 264 self._list_2_dict(queues, queue) 265 266 def getqueue(self, name, attrib_list=None): 267 self._statqueue(name, attrib_list) 268 try: 269 return self.d[name] 270 except KeyError, detail: 271 return self.d 272 273 def getqueues(self, attrib_list=None): 274 self._statqueue('', attrib_list) 275 return self.d 276 277 def _statnode(self, select='', attrib_list=None, property=None): 278 """Get the node config from the pbs server""" 279 if attrib_list: 280 self._list_2_attrib(attrib_list) 281 else: 282 self.attribs = 'NULL' 283 284 if property: 285 select = ':%s' %(property) 286 287 self._connect() 288 nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL') 289 self._disconnect() 290 291 self._list_2_dict(nodes, node) 292 293 def getnode(self, name, attrib_list=None): 294 self._statnode(name, attrib_list) 295 try: 296 return self.d[name] 297 except KeyError, detail: 298 return self.d 299 300 def getnodes(self, attrib_list=None): 301 self._statnode('', attrib_list) 302 return self.d 303 304 def getnodes_with_property(self, property, attrib_list=None): 305 self._statnode('', attrib_list, property) 306 return self.d 307 308 def _statjob(self, job_name='', attrib_list=None): 309 """Get the job config from the pbs server""" 310 if attrib_list: 311 self._list_2_attrib(attrib_list) 312 else: 313 self.attribs = 'NULL' 314 315 self._connect() 316 jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL') 317 self._disconnect() 318 319 self._list_2_dict(jobs, job) 320 321 def getjob(self, name, attrib_list=None): 322 # To make sure we use the full name of a job; Changes a name 323 # like 1234567 into 1234567.server.name 324 name = name.split('.')[0] + "." + self.get_server_name() 325 326 self._statjob(name, attrib_list) 327 try: 328 return self.d[name] 329 except KeyError, detail: 330 return self.d 331 332 def getjobs(self, attrib_list=None): 333 self._statjob('', attrib_list) 334 return self.d 335 336 def get_server_name(self): 337 return self.server 338 339 def new_data_structure(self): 340 """ 341 Use the new data structure. Is now the default 342 """ 343 self.OLD_DATA_STRUCTURE = False 344 345 def old_data_structure(self): 346 """ 347 Use the old data structure. This function is obselete and 348 will be removed in a future release 349 """ 350 self.OLD_DATA_STRUCTURE = True 351 351 352 352 class _PBSobject(UserDict.UserDict): 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 #"""393 #override the class attribute set method only when the UserDict394 #has set its class attribute395 #"""396 #if self.__dict__.has_key('data'):397 #self.data[name] = value398 #else:399 #self.__dict__[name] = value400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 353 TRUE = 1 354 FALSE = 0 355 356 def __init__(self, dictin = None): 357 UserDict.UserDict.__init__(self) 358 self.name = None 359 360 if dictin: 361 if dictin.has_key('name'): 362 self.name = dictin['name'] 363 del dictin['name'] 364 self.data = dictin 365 366 def get_value(self, key): 367 if self.has_key(key): 368 return self[key] 369 else: 370 return None 371 372 def __repr__(self): 373 return repr(self.data) 374 375 def __str__(self): 376 return str(self.data) 377 378 def __getattr__(self, name): 379 """ 380 override the class attribute get method. Return the value 381 from the Userdict 382 """ 383 try: 384 return self.data[name] 385 except KeyError: 386 error = 'Attribute key error: %s' %(name) 387 raise PBSError(error) 388 389 ## Disabled for this moment, BvdV 16 July 2010 390 # 391 #def __setattr__(self, name, value): 392 # """ 393 # override the class attribute set method only when the UserDict 394 # has set its class attribute 395 # """ 396 # if self.__dict__.has_key('data'): 397 # self.data[name] = value 398 # else: 399 # self.__dict__[name] = value 400 401 def __iter__(self): 402 return iter(self.data.keys()) 403 404 def uniq(self, list): 405 """Filter out unique items of a list""" 406 uniq_items = {} 407 for item in list: 408 uniq_items[item] = 1 409 return uniq_items.keys() 410 411 def return_value(self, key): 412 """Function that returns a value independent of new or old data structure""" 413 if isinstance(self[key], types.ListType): 414 return self[key][0] 415 else: 416 return self[key] 417 417 418 418 class job(_PBSobject): 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 419 """PBS job class""" 420 def is_running(self): 421 422 value = self.return_value('job_state') 423 if value == 'Q': 424 return self.TRUE 425 else: 426 return self.FALSE 427 428 def get_nodes(self, unique=None): 429 """ 430 Returns a list of the nodes which run this job 431 format: 432 * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0 433 * split on '+' and if uniq is set split on '/' 434 """ 435 nodes = self.get_value('exec_host') 436 437 if isinstance(nodes, str): 438 if nodes: 439 nodelist = string.split(nodes,'+') 440 if not unique: 441 return nodelist 442 else: 443 l = list() 444 445 for n in nodelist: 446 t = string.split(n,'/') 447 if t[0] not in l: 448 l.append(t[0]) 449 450 return l 451 452 else: 453 return list() 454 else: 455 l = list() 456 for n in nodes: 457 458 nlist = string.split(n,'+') 459 460 if unique: 461 for entry in nlist: 462 463 t = string.split(entry,'/') 464 if t[0] not in l: 465 l.append(t[0]) 466 else: 467 l += nlist 468 469 return l 470 471 471 472 472 class node(_PBSobject): 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 if jobs: 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 473 """PBS node class""" 474 475 def is_free(self): 476 """Check if node is free""" 477 478 value = self.return_value('state') 479 if value == 'free': 480 return self.TRUE 481 else: 482 return self.FALSE 483 484 def has_job(self): 485 """Does the node run a job""" 486 try: 487 a = self['jobs'] 488 return self.TRUE 489 except KeyError, detail: 490 return self.FALSE 491 492 def get_jobs(self, unique=None): 493 """Returns a list of the currently running job-id('s) on the node""" 494 495 jobs = self.get_value('jobs') 496 if jobs: 497 if isinstance(jobs, str): 498 jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs ) 499 500 if not unique: 501 return jlist 502 else: 503 return self.uniq(jlist) 504 505 else: 506 job_re = re.compile('^(?:\d+/)?(.+)') 507 l = list() 508 509 if unique: 510 for j in jobs: 511 jobstr = job_re.findall(j.strip())[0] 512 if jobstr not in l: 513 l.append(jobstr) 514 515 return l 516 else: 517 return jobs 518 519 return list() 520 520 521 521 522 522 class queue(_PBSobject): 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 523 """PBS queue class""" 524 def is_enabled(self): 525 526 value = self.return_value('enabled') 527 if value == 'True': 528 return self.TRUE 529 else: 530 return self.FALSE 531 532 def is_execution(self): 533 534 value = self.return_value('queue_type') 535 if value == 'Execution': 536 return self.TRUE 537 else: 538 return self.FALSE 539 539 540 540 class server(_PBSobject): 541 542 543 544 541 """PBS server class""" 542 543 def get_version(self): 544 return self.get_value('pbs_version') 545 545 546 546 def main(): 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 547 p = PBSQuery() 548 serverinfo = p.get_serverinfo() 549 for server in serverinfo.keys(): 550 print server, ' version: ', serverinfo[server].get_version() 551 for resource in serverinfo[server].keys(): 552 print '\t ', resource, ' = ', serverinfo[server][resource] 553 554 queues = p.getqueues() 555 for queue in queues.keys(): 556 print queue 557 if queues[queue].is_execution(): 558 print '\t ', queues[queue] 559 if queues[queue].has_key('acl_groups'): 560 print '\t acl_groups: yes' 561 else: 562 print '\t acl_groups: no' 563 564 jobs = p.getjobs() 565 for name,job in jobs.items(): 566 if job.is_running(): 567 print job 568 569 l = ['state'] 570 nodes = p.getnodes(l) 571 for name,node in nodes.items(): 572 if node.is_free(): 573 print node 574 574 575 575 if __name__ == "__main__": 576 576 main()
Note: See TracChangeset
for help on using the changeset viewer.