source: trunk/src/PBSQuery.py @ 285

Last change on this file since 285 was 285, checked in by bas, 11 years ago

Fixed an error in status line for EVENT parseing

  • Property svn:keywords set to Id
File size: 16.8 KB
Line 
1#
2# Authors: Roy Dragseth (roy.dragseth@cc.uit.no)
3#          Bas van der Vlies (basv@sara.nl)
4#
5# SVN INFO:
6#   $Id: PBSQuery.py 285 2012-12-20 10:56:42Z bas $
7#
8"""
9Usage: from PBSQuery import PBSQuery
10
11This class gets the info from the pbs_server via the pbs.py module
12for the several batch objects. All get..() functions return an dictionary
13with id as key and batch object as value
14
15There are four batch objects:
16 - server
17 - queue
18 - job
19 - node
20
21Each object can be handled as an dictionary and has several member
22functions. The second parameter is an python list and can be used if you
23are only interested in certain resources, see example
24
25There are the following functions for PBSQuery:
26  job -
27    getjob(job_id, attributes=<default is all>)
28    getjobs(attributes=<default is all>)
29 
30  node -
31    getnode(node_id, attributes=<default is all>)
32    getnodes(attributes=<default is all>)
33
34  queue -
35    getqueue(queue_id, attributes=<default is all>)
36    getqueues(attributes=<default is all>)
37
38  server -
39    get_serverinfo(attributes=<default is all>)
40
41Here is an example how to use the module:
42    from PBSQuery import PBSQuery
43    p = PBSQuery()
44    nodes = p.getnodes()
45    for name,node in nodes.items():
46        print name
47        if node.is_free():
48           print node, node['state']
49
50    l = [ 'state', 'np' ]
51    nodes = p.getnodes(l)
52    for name,node in nodes.items():
53           print node, node['state']
54
55The parameter 'attributes' is an python list of resources that
56you are interested in, eg: only show state of nodes
57        l = list()
58    l.append('state')
59    nodes = p.getnodes(l)
60"""
61import pbs
62import UserDict
63import string
64import sys
65import re
66import types
67
68class PBSError(Exception):
69    def __init__(self, msg=''):
70        self.msg = msg
71        Exception.__init__(self, msg)
72       
73    def __repr__(self):
74        return self.msg
75
76    __str__ = __repr__
77
78
79class PBSQuery:
80
81    # a[key] = value, key and value are data type string
82    #
83    OLD_DATA_STRUCTURE = False
84
85    def __init__(self, server=None):
86        if not server:
87            self.server = pbs.pbs_default()
88        else:
89            self.server = server
90
91        self._connect()
92        ## this is needed for getjob a jobid is made off:
93        #    sequence_number.server (is not self.server)
94        #
95        self.job_server_id = list(self.get_serverinfo())[0] 
96        self._disconnect()
97
98
99    def _connect(self):
100        """Connect to the PBS/Torque server"""
101        self.con = pbs.pbs_connect(self.server)
102        if self.con < 0:
103            str = "Could not make a connection with %s\n" %(self.server)
104            raise PBSError(str)
105
106    def _disconnect(self):
107        """Close the PBS/Torque connection"""
108        pbs.pbs_disconnect(self.con)
109        self.attribs = 'NULL'
110
111    def _list_2_attrib(self, list):
112        """Convert a python list to an attrib list suitable for pbs"""
113        self.attribs = pbs.new_attrl( len(list) )
114        i = 0 
115        for attrib in list:
116            # So we can user Resource
117            attrib = attrib.split('.')
118            self.attribs[i].name = attrib[0]
119            i = i + 1
120
121    def _pbsstr_2_list(self, str, delimiter):
122        """Convert a string to a python list and use delimiter as spit char"""
123        l = sting.splitfields(str, delimiter)
124        if len(l) > 1:
125            return l
126
127    def _list_2_dict(self, l, class_func):
128        """
129        Convert a pbsstat function list to a class dictionary, The
130        data structure depends on the function new_data_structure().
131       
132        Default data structure is:
133            class[key] = value, Where key and value are of type string
134
135        Future release, can be set by new_data_structure():
136            - class[key] = value where value can be:
137              1. a list of values of type string
138              2. a dictionary with as list of values of type string. If
139                 values contain a '=' character
140
141          eg:
142                print node['np']
143                >> [ '2' ]
144
145                print node['status']['arch']
146                >> [ 'x86_64' ]
147        """
148        self.d = {}
149        for item in l:
150            new = class_func()
151
152            self.d[item.name] = new
153           
154            new.name = item.name
155
156            for a in item.attribs:
157
158                if self.OLD_DATA_STRUCTURE:
159
160                    if a.resource:
161                        key = '%s.%s' %(a.name, a.resource)
162                    else:
163                        key = '%s' %(a.name)
164
165                    new[key] = a.value
166
167                else:
168                    values = string.split(a.value, ',') 
169                    sub_dict = string.split(a.value, '=')
170
171
172                    # We must creat sub dicts, only for specified
173                    # key values
174                    #
175                    if a.name in ['status', 'Variable_List']:
176
177                        for v in values:
178
179                            tmp_l = v.split('=')
180
181                            ## Support for multiple EVENT mesages in format [key=value:]+
182                            #  format eg: message=EVENT:sample.time=1288864220.003,EVENT:kernel=upgrade,cputotals.user=0
183                            #             message=ERROR <text>
184                            #
185                            if tmp_l[0] in ['message']:
186
187                                if tmp_l[1].startswith('EVENT:'):
188
189                                    tmp_d  = dict()
190                                    new['event'] = class_func(tmp_d)
191
192                                    message_list = v.split(':')
193                                    for event_type in message_list[1:]:
194                                        tmp_l = event_type.split('=')
195                                        new['event'][ tmp_l[0] ] = tmp_l[1:]
196
197                                else:
198
199                                    ## ERROR message
200                                    #
201                                    new['error'] = tmp_l [1:]
202
203                            elif tmp_l[0].startswith('EVENT:'): 
204
205                                  message_list = v.split(':')
206                                  for event_type in message_list[1:]:
207                                      tmp_l = event_type.split('=')
208                                      new['event'][ tmp_l[0] ] = tmp_l[1:]
209
210                            else:
211                                  ## Check if we already added the key
212                                  #
213                                  if new.has_key(a.name):
214
215                                      new[a.name][ tmp_l[0] ] = tmp_l[1:] 
216
217                                  else:
218
219                                      tmp_d  = dict()
220                                      tmp_d[ tmp_l[0] ] = tmp_l[1:]
221                                      new[a.name] = class_func(tmp_d) 
222
223                    else: 
224
225                        ## Check if it is a resource type variable, eg:
226                        #  - Resource_List.(nodes, walltime, ..)
227                        #
228                        if a.resource:
229
230                            if new.has_key(a.name):
231                                new[a.name][a.resource] = values
232
233                            else:
234                                tmp_d = dict()
235                                tmp_d[a.resource] = values
236                                new[a.name] = class_func(tmp_d) 
237                        else:
238                            # Simple value
239                            #
240                            new[a.name] = values
241
242        self._free(l)
243           
244    def _free(self, memory):
245        """
246        freeing up used memmory
247
248        """
249        pbs.pbs_statfree(memory)
250
251    def _statserver(self, attrib_list=None):
252        """Get the server config from the pbs server"""
253        if attrib_list:
254            self._list_2_attrib(attrib_list)
255        else:
256            self.attribs = 'NULL' 
257           
258        self._connect()
259        serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL')
260        self._disconnect() 
261       
262        self._list_2_dict(serverinfo, server)
263
264    def get_serverinfo(self, attrib_list=None):
265        self._statserver(attrib_list)
266        return self.d
267
268    def _statqueue(self, queue_name='', attrib_list=None):
269        """Get the queue config from the pbs server"""
270        if attrib_list:
271            self._list_2_attrib(attrib_list)
272        else:
273            self.attribs = 'NULL' 
274           
275        self._connect()
276        queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL')
277        self._disconnect() 
278       
279        self._list_2_dict(queues, queue)
280
281    def getqueue(self, name, attrib_list=None):
282        self._statqueue(name, attrib_list)
283        try:
284            return self.d[name]
285        except KeyError, detail:
286            return self.d
287       
288    def getqueues(self, attrib_list=None):
289        self._statqueue('', attrib_list)
290        return self.d
291
292    def _statnode(self, select='', attrib_list=None, property=None):
293        """Get the node config from the pbs server"""
294        if attrib_list:
295            self._list_2_attrib(attrib_list)
296        else:
297            self.attribs = 'NULL' 
298           
299        if property:
300            select = ':%s' %(property)
301
302        self._connect()
303        nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL')
304        self._disconnect() 
305       
306        self._list_2_dict(nodes, node)
307
308    def getnode(self, name, attrib_list=None):
309        self._statnode(name, attrib_list)
310        try:
311            return self.d[name]
312        except KeyError, detail:
313            return self.d
314       
315    def getnodes(self, attrib_list=None):
316        self._statnode('', attrib_list)
317        return self.d
318
319    def getnodes_with_property(self, property, attrib_list=None):
320        self._statnode('', attrib_list, property)
321        return self.d
322
323    def _statjob(self, job_name='', attrib_list=None):
324        """Get the job config from the pbs server"""
325        if attrib_list:
326            self._list_2_attrib(attrib_list)
327        else:
328            self.attribs = 'NULL' 
329           
330        self._connect()
331        jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL')
332        self._disconnect() 
333       
334        self._list_2_dict(jobs, job)
335
336    def getjob(self, name, attrib_list=None):
337        ## To make sure we use the full name of a job; Changes a name
338        # like 1234567 into 1234567.job_server_id
339        #
340        if len(name.split('.')) == 1 :
341            name = name.split('.')[0] + "." + self.job_server_id
342
343        self._statjob(name, attrib_list)
344        try:
345            return self.d[name]
346        except KeyError, detail:
347            return self.d
348       
349    def getjobs(self, attrib_list=None):
350        self._statjob('', attrib_list)
351        return self.d
352
353    def get_server_name(self):
354        return self.server
355
356    def new_data_structure(self): 
357        """
358        Use the new data structure. Is now the default
359        """
360        self.OLD_DATA_STRUCTURE = False
361
362    def old_data_structure(self): 
363        """
364        Use the old data structure. This function is obselete and
365        will be removed in a future release
366        """
367        self.OLD_DATA_STRUCTURE = True
368
369class _PBSobject(UserDict.UserDict):
370    TRUE  = 1
371    FALSE = 0
372
373    def __init__(self, dictin = None):
374        UserDict.UserDict.__init__(self)
375        self.name = None
376
377        if dictin:
378            if dictin.has_key('name'):
379                self.name = dictin['name']
380                del dictin['name']
381            self.data = dictin
382
383    def get_value(self, key):
384        if self.has_key(key):
385            return self[key]
386        else:
387            return None
388
389    def __repr__(self):
390        return repr(self.data)
391
392    def __str__(self):
393        return str(self.data)
394
395    def __getattr__(self, name):
396        """
397        override the class attribute get method. Return the value
398        from the Userdict
399        """
400        try:
401            return self.data[name]
402        except KeyError:
403            error = 'Attribute key error: %s' %(name)
404            raise PBSError(error)
405
406    ## Disabled for this moment, BvdV 16 July 2010
407    #
408    #def __setattr__(self, name, value):
409    #   """
410    #   override the class attribute set method only when the UserDict
411    #   has set its class attribute
412    #   """
413    #   if self.__dict__.has_key('data'):
414    #       self.data[name] = value
415    #   else:
416    #       self.__dict__[name] = value
417
418    def __iter__(self):
419        return iter(self.data.keys())
420
421    def uniq(self, list):
422        """Filter out unique items of a list"""
423        uniq_items = {}
424        for item in list:
425            uniq_items[item] = 1
426        return uniq_items.keys()
427
428    def return_value(self, key):
429        """Function that returns a value independent of new or old data structure"""
430        if isinstance(self[key], types.ListType):
431            return self[key][0]
432        else:
433            return self[key] 
434
435class job(_PBSobject):
436    """PBS job class""" 
437    def is_running(self):
438
439        value = self.return_value('job_state')
440        if value == 'Q':
441            return self.TRUE
442        else:
443            return self.FALSE
444
445    def get_nodes(self, unique=None):
446        """
447        Returns a list of the nodes which run this job
448        format:
449          * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
450          * split on '+' and if uniq is set split on '/'
451        """
452        nodes = self.get_value('exec_host')
453       
454        if isinstance(nodes, str):
455            if nodes:
456                nodelist = string.split(nodes,'+')
457                if not unique:
458                    return nodelist
459                else:
460                    l = list()
461
462                    for n in nodelist:
463                        t = string.split(n,'/')
464                        if t[0] not in l:
465                            l.append(t[0])
466
467                    return l
468
469            else:
470                return list()
471        else:
472                l = list()
473                for n in nodes:
474
475                    nlist = string.split(n,'+')
476
477                    if unique:
478                        for entry in nlist:
479
480                            t = string.split(entry,'/')
481                            if t[0] not in l:
482                                l.append(t[0])
483                    else:
484                        l += nlist
485
486                return l
487       
488
489class node(_PBSobject):
490    """PBS node class"""
491   
492    def is_free(self):
493        """Check if node is free"""
494
495        value = self.return_value('state')
496        if value == 'free':
497            return self.TRUE
498        else: 
499            return self.FALSE
500
501    def has_job(self):
502        """Does the node run a job"""
503        try:
504            a = self['jobs']
505            return self.TRUE
506        except KeyError, detail:
507            return self.FALSE
508   
509    def get_jobs(self, unique=None):
510        """Returns a list of the currently running job-id('s) on the node"""
511
512        jobs = self.get_value('jobs')
513        if jobs:   
514            if isinstance(jobs, str):
515                jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs )
516           
517                if not unique:
518                    return jlist
519                else:
520                    return self.uniq(jlist)
521
522            else:
523                job_re = re.compile('^(?:\d+/)?(.+)')
524                l = list()
525
526                if unique:
527                        for j in jobs:
528                            jobstr = job_re.findall(j.strip())[0]
529                            if jobstr not in l: 
530                                l.append(jobstr)           
531
532                        return l
533                else:
534                        return jobs
535
536        return list()
537
538
539class queue(_PBSobject):
540    """PBS queue class"""
541    def is_enabled(self):
542
543        value = self.return_value('enabled')
544        if value == 'True':
545            return self.TRUE
546        else:
547            return self.FALSE
548
549    def is_execution(self):
550
551        value = self.return_value('queue_type')
552        if value == 'Execution':
553            return self.TRUE
554        else:
555            return self.FALSE
556
557class server(_PBSobject):
558    """PBS server class"""
559
560    def get_version(self):
561        return self.get_value('pbs_version')
562
563def main():
564    p = PBSQuery() 
565    serverinfo = p.get_serverinfo()
566    for server in serverinfo.keys():
567        print server, ' version: ', serverinfo[server].get_version()
568    for resource in serverinfo[server].keys():
569        print '\t ', resource, ' = ', serverinfo[server][resource]
570
571    queues = p.getqueues()
572    for queue in queues.keys():
573        print queue
574        if queues[queue].is_execution():
575            print '\t ', queues[queue]
576        if queues[queue].has_key('acl_groups'):
577            print '\t acl_groups: yes'
578        else:
579            print '\t acl_groups: no'
580
581    jobs = p.getjobs()
582    for name,job in jobs.items():
583        if job.is_running():
584            print job
585
586    l = ['state']
587    nodes = p.getnodes(l)
588    for name,node in nodes.items():
589        if node.is_free(): 
590            print node
591
592if __name__ == "__main__":
593    main()
Note: See TracBrowser for help on using the repository browser.