source: trunk/src/PBSQuery.py @ 264

Last change on this file since 264 was 264, checked in by bas, 13 years ago

COPYING:

  • added GPL3 version

COPYING.LESSER:

  • renamed LICENSE.SARA

debian/copyright:

  • added some more info about the different licenses

src/PBSQuery.py:

  • some real code changes ;-)
  • Property svn:keywords set to Id
File size: 12.6 KB
Line 
1#
2# Authors: Roy Dragseth (roy.dragseth@cc.uit.no)
3#          Bas van der Vlies (basv@sara.nl)
4#
5# SVN INFO:
6#       $Id: PBSQuery.py 264 2010-11-05 08:08:25Z bas $
7#
8"""
9Usage: from PBSQuery import PBSQuery
10
11This class gets the info from the pbs_server via the pbs.py module
12for the several batch objects. All get..() functions return an dictionary
13with id as key and batch object as value
14
15There are four batch objects:
16 - server
17 - queue
18 - job
19 - node
20
21Each object can be handled as an dictionary and has several member
22functions. The second parameter is an python list and can be used if you
23are only interested in certain resources, see example
24
25There are the following functions for PBSQuery:
26  job -
27        getjob(job_id, attributes=<default is all>)
28        getjobs(attributes=<default is all>)
29 
30  node -
31        getnode(node_id, attributes=<default is all>)
32        getnodes(attributes=<default is all>)
33
34  queue -
35        getqueue(queue_id, attributes=<default is all>)
36        getqueues(attributes=<default is all>)
37
38  server -
39        get_serverinfo(attributes=<default is all>)
40
41Here is an example how to use the module:
42        from PBSQuery import PBSQuery
43        p = PBSQuery()
44        nodes = p.getnodes()
45        for name,node in nodes.items():
46            print name
47            if node.is_free():
48               print node, node['state']
49
50        l = [ 'state', 'np' ]
51        nodes = p.getnodes(l)
52        for name,node in nodes.items():
53               print node, node['state']
54
55The parameter 'attributes' is an python list of resources that
56you are interested in, eg: only show state of nodes
57        l = list()
58        l.append('state')
59        nodes = p.getnodes(l)
60"""
61import pbs
62import UserDict
63import string
64import sys
65import re
66import types
67
68class PBSError(Exception):
69        def __init__(self, msg=''):
70                self.msg = msg
71                Exception.__init__(self, msg)
72               
73        def __repr__(self):
74                return self.msg
75
76        __str__ = __repr__
77
78
79class PBSQuery:
80
81        # a[key] = value, key and value are data type string
82        #
83        OLD_DATA_STRUCTURE = False
84
85        def __init__(self, server=None):
86                if not server:
87                        self.server = pbs.pbs_default()
88                else:
89                        self.server = server
90
91        def _connect(self):
92                """Connect to the PBS/Torque server"""
93                self.con = pbs.pbs_connect(self.server)
94                if self.con < 0:
95                        str = "Could not make a connection with %s\n" %(self.server)
96                        raise PBSError(str)
97
98        def _disconnect(self):
99                """Close the PBS/Torque connection"""
100                pbs.pbs_disconnect(self.con)
101                self.attribs = 'NULL'
102
103        def _list_2_attrib(self, list):
104                """Convert a python list to an attrib list suitable for pbs"""
105                self.attribs = pbs.new_attrl( len(list) )
106                i = 0 
107                for attrib in list:
108                        # So we can user Resource
109                        attrib = attrib.split('.')
110                        self.attribs[i].name = attrib[0]
111                        i = i + 1
112
113        def _pbsstr_2_list(self, str, delimiter):
114                """Convert a string to a python list and use delimiter as spit char"""
115                l = sting.splitfields(str, delimiter)
116                if len(l) > 1:
117                        return l
118
119        def _list_2_dict(self, l, class_func):
120                """
121                Convert a pbsstat function list to a class dictionary, The
122                data structure depends on the function new_data_structure().
123               
124                Default data structure is:
125                        class[key] = value, Where key and value are of type string
126
127                Future release, can be set by new_data_structure():
128                        - class[key] = value where value can be:
129                          1. a list of values of type string
130                          2. a dictionary with as list of values of type string. If
131                             values contain a '=' character
132
133                  eg:
134                            print node['np']
135                                >> [ '2' ]
136
137                                print node['status']['arch']
138                                >> [ 'x86_64' ]
139                """
140                self.d = {}
141                for item in l:
142                        new = class_func()
143
144                        self.d[item.name] = new
145                       
146                        new.name = item.name
147
148                        for a in item.attribs:
149
150                                if self.OLD_DATA_STRUCTURE:
151
152                                        if a.resource:
153                                                key = '%s.%s' %(a.name, a.resource)
154                                        else:
155                                                key = '%s' %(a.name)
156
157                                        new[key] = a.value
158
159                                else:
160                                        values = string.split(a.value, ',') 
161                                        sub_dict = string.split(a.value, '=')
162
163
164                                        # We must creat sub dicts, only for specified
165                                        # key values
166                                        #
167                                        if a.name in ['status', 'Variable_List']:
168
169                                                for v in values:
170
171                                                        tmp_l = v.split('=')
172
173                                                        ## Support for multiple EVENT mesages in format [key=value:]+
174                                                        #  format eg: message=EVENT:sample.time=1288864220.003:cputotals.user=0
175                                                        #             message=ERROR <text>
176                                                        #
177                                                        if tmp_l[0] in ['message']:
178
179                                                                if tmp_l[1].startswith('EVENT:'):
180
181                                                                        tmp_d  = dict()
182                                                                        new['event'] = class_func(tmp_d)
183
184                                                                        message_list = v.split(':')
185                                                                        for event_type in message_list[1:]:
186                                                                                tmp_l = event_type.split('=')
187                                                                                new['event'][ tmp_l[0] ] = tmp_l[1:]
188
189                                                                else:
190                                                                        ## ERROR message
191                                                                        #
192                                                                        new['error'] = tmp_l [1:]
193
194                                                                ## continue with next status value
195                                                                #
196                                                                continue
197
198                                                                       
199                                                        ## Check if we already added the key
200                                                        #
201                                                        if new.has_key(a.name):
202                                                                new[a.name][ tmp_l[0] ] = tmp_l[1:]
203
204                                                        else:
205                                                                tmp_d  = dict()
206                                                                tmp_d[ tmp_l[0] ] = tmp_l[1:]
207                                                                new[a.name] = class_func(tmp_d) 
208
209                                        else: 
210                                                ## Check if it is a resource type variable, eg:
211                                                #  - Resource_List.(nodes, walltime, ..)
212                                                #
213                                                if a.resource:
214
215                                                        if new.has_key(a.name):
216                                                                new[a.name][a.resource] = values
217
218                                                        else:
219                                                                tmp_d = dict()
220                                                                tmp_d[a.resource] = values
221                                                                new[a.name] = class_func(tmp_d) 
222                                                else:
223                                                        # Simple value
224                                                        #
225                                                        new[a.name] = values
226
227                self._free(l)
228               
229        def _free(self, memory):
230                """
231                freeing up used memmory
232
233                """
234                pbs.pbs_statfree(memory)
235
236        def _statserver(self, attrib_list=None):
237                """Get the server config from the pbs server"""
238                if attrib_list:
239                        self._list_2_attrib(attrib_list)
240                else:
241                        self.attribs = 'NULL' 
242                       
243                self._connect()
244                serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL')
245                self._disconnect() 
246               
247                self._list_2_dict(serverinfo, server)
248
249        def get_serverinfo(self, attrib_list=None):
250                self._statserver(attrib_list)
251                return self.d
252
253        def _statqueue(self, queue_name='', attrib_list=None):
254                """Get the queue config from the pbs server"""
255                if attrib_list:
256                        self._list_2_attrib(attrib_list)
257                else:
258                        self.attribs = 'NULL' 
259                       
260                self._connect()
261                queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL')
262                self._disconnect() 
263               
264                self._list_2_dict(queues, queue)
265
266        def getqueue(self, name, attrib_list=None):
267                self._statqueue(name, attrib_list)
268                try:
269                        return self.d[name]
270                except KeyError, detail:
271                        return self.d
272       
273        def getqueues(self, attrib_list=None):
274                self._statqueue('', attrib_list)
275                return self.d
276
277        def _statnode(self, select='', attrib_list=None, property=None):
278                """Get the node config from the pbs server"""
279                if attrib_list:
280                        self._list_2_attrib(attrib_list)
281                else:
282                        self.attribs = 'NULL' 
283                       
284                if property:
285                        select = ':%s' %(property)
286
287                self._connect()
288                nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL')
289                self._disconnect() 
290               
291                self._list_2_dict(nodes, node)
292
293        def getnode(self, name, attrib_list=None):
294                self._statnode(name, attrib_list)
295                try:
296                        return self.d[name]
297                except KeyError, detail:
298                        return self.d
299       
300        def getnodes(self, attrib_list=None):
301                self._statnode('', attrib_list)
302                return self.d
303
304        def getnodes_with_property(self, property, attrib_list=None):
305                self._statnode('', attrib_list, property)
306                return self.d
307
308        def _statjob(self, job_name='', attrib_list=None):
309                """Get the job config from the pbs server"""
310                if attrib_list:
311                        self._list_2_attrib(attrib_list)
312                else:
313                        self.attribs = 'NULL' 
314                       
315                self._connect()
316                jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL')
317                self._disconnect() 
318               
319                self._list_2_dict(jobs, job)
320
321        def getjob(self, name, attrib_list=None):
322                # To make sure we use the full name of a job; Changes a name
323                # like 1234567 into 1234567.server.name
324                name = name.split('.')[0] + "." + self.get_server_name()
325
326                self._statjob(name, attrib_list)
327                try:
328                        return self.d[name]
329                except KeyError, detail:
330                        return self.d
331       
332        def getjobs(self, attrib_list=None):
333                self._statjob('', attrib_list)
334                return self.d
335
336        def get_server_name(self):
337                return self.server
338
339        def new_data_structure(self): 
340                """
341                Use the new data structure. Is now the default
342                """
343                self.OLD_DATA_STRUCTURE = False
344
345        def old_data_structure(self): 
346                """
347                Use the old data structure. This function is obselete and
348                will be removed in a future release
349                """
350                self.OLD_DATA_STRUCTURE = True
351
352class _PBSobject(UserDict.UserDict):
353        TRUE  = 1
354        FALSE = 0
355
356        def __init__(self, dictin = None):
357                UserDict.UserDict.__init__(self)
358                self.name = None
359
360                if dictin:
361                        if dictin.has_key('name'):
362                                self.name = dictin['name']
363                                del dictin['name']
364                        self.data = dictin
365
366        def get_value(self, key):
367                if self.has_key(key):
368                        return self[key]
369                else:
370                        return None
371
372        def __repr__(self):
373                return repr(self.data)
374
375        def __str__(self):
376                return str(self.data)
377
378        def __getattr__(self, name):
379                """
380                override the class attribute get method. Return the value
381                from the Userdict
382                """
383                try:
384                        return self.data[name]
385                except KeyError:
386                        error = 'Attribute key error: %s' %(name)
387                        raise PBSError(error)
388
389        ## Disabled for this moment, BvdV 16 July 2010
390        #
391        #def __setattr__(self, name, value):
392        #       """
393        #       override the class attribute set method only when the UserDict
394        #       has set its class attribute
395        #       """
396        #       if self.__dict__.has_key('data'):
397        #               self.data[name] = value
398        #       else:
399        #               self.__dict__[name] = value
400
401        def __iter__(self):
402                return iter(self.data.keys())
403
404        def uniq(self, list):
405                """Filter out unique items of a list"""
406                uniq_items = {}
407                for item in list:
408                        uniq_items[item] = 1
409                return uniq_items.keys()
410
411        def return_value(self, key):
412                """Function that returns a value independent of new or old data structure"""
413                if isinstance(self[key], types.ListType):
414                        return self[key][0]
415                else:
416                        return self[key] 
417
418class job(_PBSobject):
419        """PBS job class""" 
420        def is_running(self):
421
422                value = self.return_value('job_state')
423                if value == 'Q':
424                        return self.TRUE
425                else:
426                        return self.FALSE
427
428        def get_nodes(self, unique=None):
429                """
430                Returns a list of the nodes which run this job
431                format:
432                  * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
433                  * split on '+' and if uniq is set split on '/'
434                """
435                nodes = self.get_value('exec_host')
436               
437                if isinstance(nodes, str):
438                        if nodes:
439                                nodelist = string.split(nodes,'+')
440                                if not unique:
441                                        return nodelist
442                                else:
443                                        l = list()
444
445                                        for n in nodelist:
446                                                t = string.split(n,'/')
447                                                if t[0] not in l:
448                                                        l.append(t[0])
449
450                                        return l
451
452                        else:
453                                return list()
454                else:
455                                l = list()
456                                for n in nodes:
457
458                                        nlist = string.split(n,'+')
459
460                                        if unique:
461                                                for entry in nlist:
462
463                                                        t = string.split(entry,'/')
464                                                        if t[0] not in l:
465                                                                l.append(t[0])
466                                        else:
467                                                l += nlist
468
469                                return l
470               
471
472class node(_PBSobject):
473        """PBS node class"""
474       
475        def is_free(self):
476                """Check if node is free"""
477
478                value = self.return_value('state')
479                if value == 'free':
480                        return self.TRUE
481                else: 
482                        return self.FALSE
483
484        def has_job(self):
485                """Does the node run a job"""
486                try:
487                        a = self['jobs']
488                        return self.TRUE
489                except KeyError, detail:
490                        return self.FALSE
491       
492        def get_jobs(self, unique=None):
493                """Returns a list of the currently running job-id('s) on the node"""
494
495                jobs = self.get_value('jobs')
496                if jobs:       
497                        if isinstance(jobs, str):
498                                jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs )
499                       
500                                if not unique:
501                                        return jlist
502                                else:
503                                        return self.uniq(jlist)
504
505                        else:
506                                job_re = re.compile('^(?:\d+/)?(.+)')
507                                l = list()
508
509                                if unique:
510                                                for j in jobs:
511                                                        jobstr = job_re.findall(j.strip())[0]
512                                                if jobstr not in l: 
513                                                        l.append(jobstr)           
514
515                                                return l
516                                else:
517                                                return jobs
518
519                return list()
520
521
522class queue(_PBSobject):
523        """PBS queue class"""
524        def is_enabled(self):
525
526                value = self.return_value('enabled')
527                if value == 'True':
528                        return self.TRUE
529                else:
530                        return self.FALSE
531
532        def is_execution(self):
533
534                value = self.return_value('queue_type')
535                if value == 'Execution':
536                        return self.TRUE
537                else:
538                        return self.FALSE
539
540class server(_PBSobject):
541        """PBS server class"""
542
543        def get_version(self):
544                return self.get_value('pbs_version')
545
546def main():
547        p = PBSQuery() 
548        serverinfo = p.get_serverinfo()
549        for server in serverinfo.keys():
550                print server, ' version: ', serverinfo[server].get_version()
551        for resource in serverinfo[server].keys():
552                print '\t ', resource, ' = ', serverinfo[server][resource]
553
554        queues = p.getqueues()
555        for queue in queues.keys():
556                print queue
557                if queues[queue].is_execution():
558                        print '\t ', queues[queue]
559                if queues[queue].has_key('acl_groups'):
560                        print '\t acl_groups: yes'
561                else:
562                        print '\t acl_groups: no'
563
564        jobs = p.getjobs()
565        for name,job in jobs.items():
566                if job.is_running():
567                        print job
568
569        l = ['state']
570        nodes = p.getnodes(l)
571        for name,node in nodes.items():
572                if node.is_free(): 
573                        print node
574
575if __name__ == "__main__":
576        main()
Note: See TracBrowser for help on using the repository browser.