source: trunk/src/PBSQuery.py @ 279

Last change on this file since 279 was 279, checked in by bas, 12 years ago

fix a bug in getjob could not hamdle short jobname

  • Property svn:keywords set to Id
File size: 16.6 KB
Line 
1#
2# Authors: Roy Dragseth (roy.dragseth@cc.uit.no)
3#          Bas van der Vlies (basv@sara.nl)
4#
5# SVN INFO:
6#   $Id: PBSQuery.py 279 2012-04-27 07:56:31Z bas $
7#
8"""
9Usage: from PBSQuery import PBSQuery
10
11This class gets the info from the pbs_server via the pbs.py module
12for the several batch objects. All get..() functions return an dictionary
13with id as key and batch object as value
14
15There are four batch objects:
16 - server
17 - queue
18 - job
19 - node
20
21Each object can be handled as an dictionary and has several member
22functions. The second parameter is an python list and can be used if you
23are only interested in certain resources, see example
24
25There are the following functions for PBSQuery:
26  job -
27    getjob(job_id, attributes=<default is all>)
28    getjobs(attributes=<default is all>)
29 
30  node -
31    getnode(node_id, attributes=<default is all>)
32    getnodes(attributes=<default is all>)
33
34  queue -
35    getqueue(queue_id, attributes=<default is all>)
36    getqueues(attributes=<default is all>)
37
38  server -
39    get_serverinfo(attributes=<default is all>)
40
41Here is an example how to use the module:
42    from PBSQuery import PBSQuery
43    p = PBSQuery()
44    nodes = p.getnodes()
45    for name,node in nodes.items():
46        print name
47        if node.is_free():
48           print node, node['state']
49
50    l = [ 'state', 'np' ]
51    nodes = p.getnodes(l)
52    for name,node in nodes.items():
53           print node, node['state']
54
55The parameter 'attributes' is an python list of resources that
56you are interested in, eg: only show state of nodes
57        l = list()
58    l.append('state')
59    nodes = p.getnodes(l)
60"""
61import pbs
62import UserDict
63import string
64import sys
65import re
66import types
67
68class PBSError(Exception):
69    def __init__(self, msg=''):
70        self.msg = msg
71        Exception.__init__(self, msg)
72       
73    def __repr__(self):
74        return self.msg
75
76    __str__ = __repr__
77
78
79class PBSQuery:
80
81    # a[key] = value, key and value are data type string
82    #
83    OLD_DATA_STRUCTURE = False
84
85    def __init__(self, server=None):
86        if not server:
87            self.server = pbs.pbs_default()
88        else:
89            self.server = server
90
91        self._connect()
92        ## this is needed for getjob a jobid is made off:
93        #    sequence_number.server (is not self.server)
94        #
95        self.job_server_id = list(self.get_serverinfo())[0] 
96        self._disconnect()
97
98
99    def _connect(self):
100        """Connect to the PBS/Torque server"""
101        self.con = pbs.pbs_connect(self.server)
102        if self.con < 0:
103            str = "Could not make a connection with %s\n" %(self.server)
104            raise PBSError(str)
105
106    def _disconnect(self):
107        """Close the PBS/Torque connection"""
108        pbs.pbs_disconnect(self.con)
109        self.attribs = 'NULL'
110
111    def _list_2_attrib(self, list):
112        """Convert a python list to an attrib list suitable for pbs"""
113        self.attribs = pbs.new_attrl( len(list) )
114        i = 0 
115        for attrib in list:
116            # So we can user Resource
117            attrib = attrib.split('.')
118            self.attribs[i].name = attrib[0]
119            i = i + 1
120
121    def _pbsstr_2_list(self, str, delimiter):
122        """Convert a string to a python list and use delimiter as spit char"""
123        l = sting.splitfields(str, delimiter)
124        if len(l) > 1:
125            return l
126
127    def _list_2_dict(self, l, class_func):
128        """
129        Convert a pbsstat function list to a class dictionary, The
130        data structure depends on the function new_data_structure().
131       
132        Default data structure is:
133            class[key] = value, Where key and value are of type string
134
135        Future release, can be set by new_data_structure():
136            - class[key] = value where value can be:
137              1. a list of values of type string
138              2. a dictionary with as list of values of type string. If
139                 values contain a '=' character
140
141          eg:
142                print node['np']
143                >> [ '2' ]
144
145                print node['status']['arch']
146                >> [ 'x86_64' ]
147        """
148        self.d = {}
149        for item in l:
150            new = class_func()
151
152            self.d[item.name] = new
153           
154            new.name = item.name
155
156            for a in item.attribs:
157
158                if self.OLD_DATA_STRUCTURE:
159
160                    if a.resource:
161                        key = '%s.%s' %(a.name, a.resource)
162                    else:
163                        key = '%s' %(a.name)
164
165                    new[key] = a.value
166
167                else:
168                    values = string.split(a.value, ',') 
169                    sub_dict = string.split(a.value, '=')
170
171
172                    # We must creat sub dicts, only for specified
173                    # key values
174                    #
175                    if a.name in ['status', 'Variable_List']:
176
177                        for v in values:
178
179                            tmp_l = v.split('=')
180
181                            ## Support for multiple EVENT mesages in format [key=value:]+
182                            #  format eg: message=EVENT:sample.time=1288864220.003:cputotals.user=0
183                            #             message=ERROR <text>
184                            #
185                            if tmp_l[0] in ['message']:
186
187                                if tmp_l[1].startswith('EVENT:'):
188
189                                    tmp_d  = dict()
190                                    new['event'] = class_func(tmp_d)
191
192                                    message_list = v.split(':')
193                                    for event_type in message_list[1:]:
194                                        tmp_l = event_type.split('=')
195                                        new['event'][ tmp_l[0] ] = tmp_l[1:]
196
197                                else:
198                                    ## ERROR message
199                                    #
200                                    new['error'] = tmp_l [1:]
201
202                                ## continue with next status value
203                                #
204                                continue
205
206                                   
207                            ## Check if we already added the key
208                            #
209                            if new.has_key(a.name):
210                                new[a.name][ tmp_l[0] ] = tmp_l[1:]
211
212                            else:
213                                tmp_d  = dict()
214                                tmp_d[ tmp_l[0] ] = tmp_l[1:]
215                                new[a.name] = class_func(tmp_d) 
216
217                    else: 
218                        ## Check if it is a resource type variable, eg:
219                        #  - Resource_List.(nodes, walltime, ..)
220                        #
221                        if a.resource:
222
223                            if new.has_key(a.name):
224                                new[a.name][a.resource] = values
225
226                            else:
227                                tmp_d = dict()
228                                tmp_d[a.resource] = values
229                                new[a.name] = class_func(tmp_d) 
230                        else:
231                            # Simple value
232                            #
233                            new[a.name] = values
234
235        self._free(l)
236           
237    def _free(self, memory):
238        """
239        freeing up used memmory
240
241        """
242        pbs.pbs_statfree(memory)
243
244    def _statserver(self, attrib_list=None):
245        """Get the server config from the pbs server"""
246        if attrib_list:
247            self._list_2_attrib(attrib_list)
248        else:
249            self.attribs = 'NULL' 
250           
251        self._connect()
252        serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL')
253        self._disconnect() 
254       
255        self._list_2_dict(serverinfo, server)
256
257    def get_serverinfo(self, attrib_list=None):
258        self._statserver(attrib_list)
259        return self.d
260
261    def _statqueue(self, queue_name='', attrib_list=None):
262        """Get the queue config from the pbs server"""
263        if attrib_list:
264            self._list_2_attrib(attrib_list)
265        else:
266            self.attribs = 'NULL' 
267           
268        self._connect()
269        queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL')
270        self._disconnect() 
271       
272        self._list_2_dict(queues, queue)
273
274    def getqueue(self, name, attrib_list=None):
275        self._statqueue(name, attrib_list)
276        try:
277            return self.d[name]
278        except KeyError, detail:
279            return self.d
280       
281    def getqueues(self, attrib_list=None):
282        self._statqueue('', attrib_list)
283        return self.d
284
285    def _statnode(self, select='', attrib_list=None, property=None):
286        """Get the node config from the pbs server"""
287        if attrib_list:
288            self._list_2_attrib(attrib_list)
289        else:
290            self.attribs = 'NULL' 
291           
292        if property:
293            select = ':%s' %(property)
294
295        self._connect()
296        nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL')
297        self._disconnect() 
298       
299        self._list_2_dict(nodes, node)
300
301    def getnode(self, name, attrib_list=None):
302        self._statnode(name, attrib_list)
303        try:
304            return self.d[name]
305        except KeyError, detail:
306            return self.d
307       
308    def getnodes(self, attrib_list=None):
309        self._statnode('', attrib_list)
310        return self.d
311
312    def getnodes_with_property(self, property, attrib_list=None):
313        self._statnode('', attrib_list, property)
314        return self.d
315
316    def _statjob(self, job_name='', attrib_list=None):
317        """Get the job config from the pbs server"""
318        if attrib_list:
319            self._list_2_attrib(attrib_list)
320        else:
321            self.attribs = 'NULL' 
322           
323        self._connect()
324        jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL')
325        self._disconnect() 
326       
327        self._list_2_dict(jobs, job)
328
329    def getjob(self, name, attrib_list=None):
330        ## To make sure we use the full name of a job; Changes a name
331        # like 1234567 into 1234567.job_server_id
332        #
333        if len(name.split('.')) == 1 :
334            name = name.split('.')[0] + "." + self.job_server_id
335
336        self._statjob(name, attrib_list)
337        try:
338            return self.d[name]
339        except KeyError, detail:
340            return self.d
341       
342    def getjobs(self, attrib_list=None):
343        self._statjob('', attrib_list)
344        return self.d
345
346    def get_server_name(self):
347        return self.server
348
349    def new_data_structure(self): 
350        """
351        Use the new data structure. Is now the default
352        """
353        self.OLD_DATA_STRUCTURE = False
354
355    def old_data_structure(self): 
356        """
357        Use the old data structure. This function is obselete and
358        will be removed in a future release
359        """
360        self.OLD_DATA_STRUCTURE = True
361
362class _PBSobject(UserDict.UserDict):
363    TRUE  = 1
364    FALSE = 0
365
366    def __init__(self, dictin = None):
367        UserDict.UserDict.__init__(self)
368        self.name = None
369
370        if dictin:
371            if dictin.has_key('name'):
372                self.name = dictin['name']
373                del dictin['name']
374            self.data = dictin
375
376    def get_value(self, key):
377        if self.has_key(key):
378            return self[key]
379        else:
380            return None
381
382    def __repr__(self):
383        return repr(self.data)
384
385    def __str__(self):
386        return str(self.data)
387
388    def __getattr__(self, name):
389        """
390        override the class attribute get method. Return the value
391        from the Userdict
392        """
393        try:
394            return self.data[name]
395        except KeyError:
396            error = 'Attribute key error: %s' %(name)
397            raise PBSError(error)
398
399    ## Disabled for this moment, BvdV 16 July 2010
400    #
401    #def __setattr__(self, name, value):
402    #   """
403    #   override the class attribute set method only when the UserDict
404    #   has set its class attribute
405    #   """
406    #   if self.__dict__.has_key('data'):
407    #       self.data[name] = value
408    #   else:
409    #       self.__dict__[name] = value
410
411    def __iter__(self):
412        return iter(self.data.keys())
413
414    def uniq(self, list):
415        """Filter out unique items of a list"""
416        uniq_items = {}
417        for item in list:
418            uniq_items[item] = 1
419        return uniq_items.keys()
420
421    def return_value(self, key):
422        """Function that returns a value independent of new or old data structure"""
423        if isinstance(self[key], types.ListType):
424            return self[key][0]
425        else:
426            return self[key] 
427
428class job(_PBSobject):
429    """PBS job class""" 
430    def is_running(self):
431
432        value = self.return_value('job_state')
433        if value == 'Q':
434            return self.TRUE
435        else:
436            return self.FALSE
437
438    def get_nodes(self, unique=None):
439        """
440        Returns a list of the nodes which run this job
441        format:
442          * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
443          * split on '+' and if uniq is set split on '/'
444        """
445        nodes = self.get_value('exec_host')
446       
447        if isinstance(nodes, str):
448            if nodes:
449                nodelist = string.split(nodes,'+')
450                if not unique:
451                    return nodelist
452                else:
453                    l = list()
454
455                    for n in nodelist:
456                        t = string.split(n,'/')
457                        if t[0] not in l:
458                            l.append(t[0])
459
460                    return l
461
462            else:
463                return list()
464        else:
465                l = list()
466                for n in nodes:
467
468                    nlist = string.split(n,'+')
469
470                    if unique:
471                        for entry in nlist:
472
473                            t = string.split(entry,'/')
474                            if t[0] not in l:
475                                l.append(t[0])
476                    else:
477                        l += nlist
478
479                return l
480       
481
482class node(_PBSobject):
483    """PBS node class"""
484   
485    def is_free(self):
486        """Check if node is free"""
487
488        value = self.return_value('state')
489        if value == 'free':
490            return self.TRUE
491        else: 
492            return self.FALSE
493
494    def has_job(self):
495        """Does the node run a job"""
496        try:
497            a = self['jobs']
498            return self.TRUE
499        except KeyError, detail:
500            return self.FALSE
501   
502    def get_jobs(self, unique=None):
503        """Returns a list of the currently running job-id('s) on the node"""
504
505        jobs = self.get_value('jobs')
506        if jobs:   
507            if isinstance(jobs, str):
508                jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs )
509           
510                if not unique:
511                    return jlist
512                else:
513                    return self.uniq(jlist)
514
515            else:
516                job_re = re.compile('^(?:\d+/)?(.+)')
517                l = list()
518
519                if unique:
520                        for j in jobs:
521                            jobstr = job_re.findall(j.strip())[0]
522                            if jobstr not in l: 
523                                l.append(jobstr)           
524
525                        return l
526                else:
527                        return jobs
528
529        return list()
530
531
532class queue(_PBSobject):
533    """PBS queue class"""
534    def is_enabled(self):
535
536        value = self.return_value('enabled')
537        if value == 'True':
538            return self.TRUE
539        else:
540            return self.FALSE
541
542    def is_execution(self):
543
544        value = self.return_value('queue_type')
545        if value == 'Execution':
546            return self.TRUE
547        else:
548            return self.FALSE
549
550class server(_PBSobject):
551    """PBS server class"""
552
553    def get_version(self):
554        return self.get_value('pbs_version')
555
556def main():
557    p = PBSQuery() 
558    serverinfo = p.get_serverinfo()
559    for server in serverinfo.keys():
560        print server, ' version: ', serverinfo[server].get_version()
561    for resource in serverinfo[server].keys():
562        print '\t ', resource, ' = ', serverinfo[server][resource]
563
564    queues = p.getqueues()
565    for queue in queues.keys():
566        print queue
567        if queues[queue].is_execution():
568            print '\t ', queues[queue]
569        if queues[queue].has_key('acl_groups'):
570            print '\t acl_groups: yes'
571        else:
572            print '\t acl_groups: no'
573
574    jobs = p.getjobs()
575    for name,job in jobs.items():
576        if job.is_running():
577            print job
578
579    l = ['state']
580    nodes = p.getnodes(l)
581    for name,node in nodes.items():
582        if node.is_free(): 
583            print node
584
585if __name__ == "__main__":
586    main()
Note: See TracBrowser for help on using the repository browser.