source: trunk/src/PBSQuery.py @ 329

Last change on this file since 329 was 289, checked in by bas, 11 years ago

implemented nonzero for PBS dictionary, closes #33

  • Property svn:keywords set to Id
File size: 16.9 KB
RevLine 
[72]1#
2# Authors: Roy Dragseth (roy.dragseth@cc.uit.no)
3#          Bas van der Vlies (basv@sara.nl)
4#
5# SVN INFO:
[275]6#   $Id: PBSQuery.py 289 2013-02-26 09:22:59Z bas $
[72]7#
8"""
9Usage: from PBSQuery import PBSQuery
10
11This class gets the info from the pbs_server via the pbs.py module
[75]12for the several batch objects. All get..() functions return an dictionary
13with id as key and batch object as value
14
15There are four batch objects:
[72]16 - server
17 - queue
18 - job
19 - node
20
21Each object can be handled as an dictionary and has several member
[75]22functions. The second parameter is an python list and can be used if you
23are only interested in certain resources, see example
[72]24
25There are the following functions for PBSQuery:
26  job -
[275]27    getjob(job_id, attributes=<default is all>)
28    getjobs(attributes=<default is all>)
[72]29 
30  node -
[275]31    getnode(node_id, attributes=<default is all>)
32    getnodes(attributes=<default is all>)
[72]33
34  queue -
[275]35    getqueue(queue_id, attributes=<default is all>)
36    getqueues(attributes=<default is all>)
[72]37
38  server -
[275]39    get_serverinfo(attributes=<default is all>)
[72]40
41Here is an example how to use the module:
[275]42    from PBSQuery import PBSQuery
43    p = PBSQuery()
44    nodes = p.getnodes()
45    for name,node in nodes.items():
46        print name
47        if node.is_free():
48           print node, node['state']
[72]49
[275]50    l = [ 'state', 'np' ]
51    nodes = p.getnodes(l)
52    for name,node in nodes.items():
53           print node, node['state']
[75]54
[84]55The parameter 'attributes' is an python list of resources that
56you are interested in, eg: only show state of nodes
57        l = list()
[275]58    l.append('state')
59    nodes = p.getnodes(l)
[72]60"""
61import pbs
[139]62import UserDict
[143]63import string
[139]64import sys
[167]65import re
[195]66import types
[72]67
[99]68class PBSError(Exception):
[275]69    def __init__(self, msg=''):
70        self.msg = msg
71        Exception.__init__(self, msg)
72       
73    def __repr__(self):
74        return self.msg
[72]75
[275]76    __str__ = __repr__
[99]77
[139]78
[72]79class PBSQuery:
[188]80
[275]81    # a[key] = value, key and value are data type string
82    #
83    OLD_DATA_STRUCTURE = False
[188]84
[275]85    def __init__(self, server=None):
86        if not server:
87            self.server = pbs.pbs_default()
88        else:
89            self.server = server
[72]90
[279]91        self._connect()
92        ## this is needed for getjob a jobid is made off:
93        #    sequence_number.server (is not self.server)
94        #
95        self.job_server_id = list(self.get_serverinfo())[0] 
96        self._disconnect()
97
98
[275]99    def _connect(self):
100        """Connect to the PBS/Torque server"""
101        self.con = pbs.pbs_connect(self.server)
102        if self.con < 0:
103            str = "Could not make a connection with %s\n" %(self.server)
104            raise PBSError(str)
[79]105
[275]106    def _disconnect(self):
107        """Close the PBS/Torque connection"""
108        pbs.pbs_disconnect(self.con)
109        self.attribs = 'NULL'
[72]110
[275]111    def _list_2_attrib(self, list):
112        """Convert a python list to an attrib list suitable for pbs"""
113        self.attribs = pbs.new_attrl( len(list) )
114        i = 0 
115        for attrib in list:
116            # So we can user Resource
117            attrib = attrib.split('.')
118            self.attribs[i].name = attrib[0]
119            i = i + 1
[72]120
[275]121    def _pbsstr_2_list(self, str, delimiter):
122        """Convert a string to a python list and use delimiter as spit char"""
123        l = sting.splitfields(str, delimiter)
124        if len(l) > 1:
125            return l
[72]126
[275]127    def _list_2_dict(self, l, class_func):
128        """
129        Convert a pbsstat function list to a class dictionary, The
130        data structure depends on the function new_data_structure().
131       
132        Default data structure is:
133            class[key] = value, Where key and value are of type string
[188]134
[275]135        Future release, can be set by new_data_structure():
136            - class[key] = value where value can be:
137              1. a list of values of type string
138              2. a dictionary with as list of values of type string. If
139                 values contain a '=' character
[188]140
[275]141          eg:
142                print node['np']
143                >> [ '2' ]
[188]144
[275]145                print node['status']['arch']
146                >> [ 'x86_64' ]
147        """
148        self.d = {}
149        for item in l:
150            new = class_func()
[188]151
[275]152            self.d[item.name] = new
153           
154            new.name = item.name
[188]155
[275]156            for a in item.attribs:
[188]157
[275]158                if self.OLD_DATA_STRUCTURE:
[188]159
[275]160                    if a.resource:
161                        key = '%s.%s' %(a.name, a.resource)
162                    else:
163                        key = '%s' %(a.name)
[205]164
[275]165                    new[key] = a.value
[205]166
[275]167                else:
168                    values = string.split(a.value, ',') 
169                    sub_dict = string.split(a.value, '=')
[205]170
171
[275]172                    # We must creat sub dicts, only for specified
173                    # key values
174                    #
175                    if a.name in ['status', 'Variable_List']:
[188]176
[275]177                        for v in values:
[188]178
[275]179                            tmp_l = v.split('=')
[188]180
[275]181                            ## Support for multiple EVENT mesages in format [key=value:]+
[285]182                            #  format eg: message=EVENT:sample.time=1288864220.003,EVENT:kernel=upgrade,cputotals.user=0
[275]183                            #             message=ERROR <text>
184                            #
185                            if tmp_l[0] in ['message']:
[263]186
[275]187                                if tmp_l[1].startswith('EVENT:'):
[263]188
[275]189                                    tmp_d  = dict()
190                                    new['event'] = class_func(tmp_d)
[263]191
[275]192                                    message_list = v.split(':')
193                                    for event_type in message_list[1:]:
194                                        tmp_l = event_type.split('=')
195                                        new['event'][ tmp_l[0] ] = tmp_l[1:]
[264]196
[275]197                                else:
[285]198
[275]199                                    ## ERROR message
200                                    #
201                                    new['error'] = tmp_l [1:]
[264]202
[285]203                            elif tmp_l[0].startswith('EVENT:'): 
[263]204
[285]205                                  message_list = v.split(':')
206                                  for event_type in message_list[1:]:
207                                      tmp_l = event_type.split('=')
208                                      new['event'][ tmp_l[0] ] = tmp_l[1:]
[188]209
[275]210                            else:
[285]211                                  ## Check if we already added the key
212                                  #
213                                  if new.has_key(a.name):
[205]214
[285]215                                      new[a.name][ tmp_l[0] ] = tmp_l[1:] 
216
217                                  else:
218
219                                      tmp_d  = dict()
220                                      tmp_d[ tmp_l[0] ] = tmp_l[1:]
221                                      new[a.name] = class_func(tmp_d) 
222
[275]223                    else: 
[285]224
[275]225                        ## Check if it is a resource type variable, eg:
226                        #  - Resource_List.(nodes, walltime, ..)
227                        #
228                        if a.resource:
[205]229
[275]230                            if new.has_key(a.name):
231                                new[a.name][a.resource] = values
[205]232
[275]233                            else:
234                                tmp_d = dict()
235                                tmp_d[a.resource] = values
236                                new[a.name] = class_func(tmp_d) 
237                        else:
238                            # Simple value
239                            #
240                            new[a.name] = values
[205]241
[275]242        self._free(l)
243           
244    def _free(self, memory):
245        """
246        freeing up used memmory
[188]247
[275]248        """
249        pbs.pbs_statfree(memory)
[72]250
[275]251    def _statserver(self, attrib_list=None):
252        """Get the server config from the pbs server"""
253        if attrib_list:
254            self._list_2_attrib(attrib_list)
255        else:
256            self.attribs = 'NULL' 
257           
258        self._connect()
259        serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL')
260        self._disconnect() 
261       
262        self._list_2_dict(serverinfo, server)
[72]263
[275]264    def get_serverinfo(self, attrib_list=None):
265        self._statserver(attrib_list)
266        return self.d
[72]267
[275]268    def _statqueue(self, queue_name='', attrib_list=None):
269        """Get the queue config from the pbs server"""
270        if attrib_list:
271            self._list_2_attrib(attrib_list)
272        else:
273            self.attribs = 'NULL' 
274           
275        self._connect()
276        queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL')
277        self._disconnect() 
278       
279        self._list_2_dict(queues, queue)
[79]280
[275]281    def getqueue(self, name, attrib_list=None):
282        self._statqueue(name, attrib_list)
283        try:
284            return self.d[name]
285        except KeyError, detail:
286            return self.d
[72]287       
[275]288    def getqueues(self, attrib_list=None):
289        self._statqueue('', attrib_list)
290        return self.d
[72]291
[275]292    def _statnode(self, select='', attrib_list=None, property=None):
293        """Get the node config from the pbs server"""
294        if attrib_list:
295            self._list_2_attrib(attrib_list)
296        else:
297            self.attribs = 'NULL' 
298           
299        if property:
300            select = ':%s' %(property)
[164]301
[275]302        self._connect()
303        nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL')
304        self._disconnect() 
305       
306        self._list_2_dict(nodes, node)
[79]307
[275]308    def getnode(self, name, attrib_list=None):
309        self._statnode(name, attrib_list)
310        try:
311            return self.d[name]
312        except KeyError, detail:
313            return self.d
[72]314       
[275]315    def getnodes(self, attrib_list=None):
316        self._statnode('', attrib_list)
317        return self.d
[72]318
[275]319    def getnodes_with_property(self, property, attrib_list=None):
320        self._statnode('', attrib_list, property)
321        return self.d
[164]322
[275]323    def _statjob(self, job_name='', attrib_list=None):
324        """Get the job config from the pbs server"""
325        if attrib_list:
326            self._list_2_attrib(attrib_list)
327        else:
328            self.attribs = 'NULL' 
329           
330        self._connect()
331        jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL')
332        self._disconnect() 
333       
334        self._list_2_dict(jobs, job)
[72]335
[275]336    def getjob(self, name, attrib_list=None):
[279]337        ## To make sure we use the full name of a job; Changes a name
338        # like 1234567 into 1234567.job_server_id
339        #
340        if len(name.split('.')) == 1 :
341            name = name.split('.')[0] + "." + self.job_server_id
[175]342
[275]343        self._statjob(name, attrib_list)
344        try:
345            return self.d[name]
346        except KeyError, detail:
347            return self.d
[72]348       
[275]349    def getjobs(self, attrib_list=None):
350        self._statjob('', attrib_list)
351        return self.d
[72]352
[275]353    def get_server_name(self):
354        return self.server
[169]355
[275]356    def new_data_structure(self): 
357        """
358        Use the new data structure. Is now the default
359        """
360        self.OLD_DATA_STRUCTURE = False
[188]361
[275]362    def old_data_structure(self): 
363        """
364        Use the old data structure. This function is obselete and
365        will be removed in a future release
366        """
367        self.OLD_DATA_STRUCTURE = True
[227]368
[139]369class _PBSobject(UserDict.UserDict):
[275]370    TRUE  = 1
371    FALSE = 0
[72]372
[275]373    def __init__(self, dictin = None):
374        UserDict.UserDict.__init__(self)
375        self.name = None
[76]376
[275]377        if dictin:
378            if dictin.has_key('name'):
379                self.name = dictin['name']
380                del dictin['name']
381            self.data = dictin
[205]382
[275]383    def get_value(self, key):
384        if self.has_key(key):
385            return self[key]
386        else:
387            return None
[72]388
[275]389    def __repr__(self):
390        return repr(self.data)
[188]391
[275]392    def __str__(self):
393        return str(self.data)
[188]394
[275]395    def __getattr__(self, name):
396        """
397        override the class attribute get method. Return the value
398        from the Userdict
399        """
400        try:
401            return self.data[name]
402        except KeyError:
403            error = 'Attribute key error: %s' %(name)
404            raise PBSError(error)
[188]405
[275]406    ## Disabled for this moment, BvdV 16 July 2010
407    #
408    #def __setattr__(self, name, value):
409    #   """
410    #   override the class attribute set method only when the UserDict
411    #   has set its class attribute
412    #   """
413    #   if self.__dict__.has_key('data'):
414    #       self.data[name] = value
415    #   else:
416    #       self.__dict__[name] = value
[243]417
[275]418    def __iter__(self):
419        return iter(self.data.keys())
[190]420
[289]421    def __nonzero__(self):
422        if self.data:
423            return True
424        else:
425            return False
426
[275]427    def uniq(self, list):
428        """Filter out unique items of a list"""
429        uniq_items = {}
430        for item in list:
431            uniq_items[item] = 1
432        return uniq_items.keys()
[171]433
[275]434    def return_value(self, key):
435        """Function that returns a value independent of new or old data structure"""
436        if isinstance(self[key], types.ListType):
437            return self[key][0]
438        else:
439            return self[key] 
[195]440
[72]441class job(_PBSobject):
[275]442    """PBS job class""" 
443    def is_running(self):
[195]444
[275]445        value = self.return_value('job_state')
446        if value == 'Q':
447            return self.TRUE
448        else:
449            return self.FALSE
[72]450
[275]451    def get_nodes(self, unique=None):
452        """
453        Returns a list of the nodes which run this job
454        format:
455          * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
456          * split on '+' and if uniq is set split on '/'
457        """
458        nodes = self.get_value('exec_host')
459       
460        if isinstance(nodes, str):
461            if nodes:
462                nodelist = string.split(nodes,'+')
463                if not unique:
464                    return nodelist
465                else:
466                    l = list()
[246]467
[275]468                    for n in nodelist:
469                        t = string.split(n,'/')
470                        if t[0] not in l:
471                            l.append(t[0])
[246]472
[275]473                    return l
[246]474
[275]475            else:
476                return list()
477        else:
478                l = list()
479                for n in nodes:
[143]480
[275]481                    nlist = string.split(n,'+')
[143]482
[275]483                    if unique:
484                        for entry in nlist:
[246]485
[275]486                            t = string.split(entry,'/')
487                            if t[0] not in l:
488                                l.append(t[0])
489                    else:
490                        l += nlist
[246]491
[275]492                return l
493       
[246]494
[72]495class node(_PBSobject):
[275]496    """PBS node class"""
497   
498    def is_free(self):
499        """Check if node is free"""
[195]500
[275]501        value = self.return_value('state')
502        if value == 'free':
503            return self.TRUE
504        else: 
505            return self.FALSE
[72]506
[275]507    def has_job(self):
508        """Does the node run a job"""
509        try:
510            a = self['jobs']
511            return self.TRUE
512        except KeyError, detail:
513            return self.FALSE
514   
515    def get_jobs(self, unique=None):
516        """Returns a list of the currently running job-id('s) on the node"""
[240]517
[275]518        jobs = self.get_value('jobs')
519        if jobs:   
520            if isinstance(jobs, str):
521                jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs )
522           
523                if not unique:
524                    return jlist
525                else:
526                    return self.uniq(jlist)
[245]527
[275]528            else:
529                job_re = re.compile('^(?:\d+/)?(.+)')
530                l = list()
[240]531
[275]532                if unique:
533                        for j in jobs:
534                            jobstr = job_re.findall(j.strip())[0]
535                            if jobstr not in l: 
536                                l.append(jobstr)           
[240]537
[275]538                        return l
539                else:
540                        return jobs
[245]541
[275]542        return list()
[72]543
[166]544
[72]545class queue(_PBSobject):
[275]546    """PBS queue class"""
547    def is_enabled(self):
[195]548
[275]549        value = self.return_value('enabled')
550        if value == 'True':
551            return self.TRUE
552        else:
553            return self.FALSE
[72]554
[275]555    def is_execution(self):
[195]556
[275]557        value = self.return_value('queue_type')
558        if value == 'Execution':
559            return self.TRUE
560        else:
561            return self.FALSE
[72]562
563class server(_PBSobject):
[275]564    """PBS server class"""
[72]565
[275]566    def get_version(self):
567        return self.get_value('pbs_version')
[143]568
[72]569def main():
[275]570    p = PBSQuery() 
571    serverinfo = p.get_serverinfo()
572    for server in serverinfo.keys():
573        print server, ' version: ', serverinfo[server].get_version()
574    for resource in serverinfo[server].keys():
575        print '\t ', resource, ' = ', serverinfo[server][resource]
[72]576
[275]577    queues = p.getqueues()
578    for queue in queues.keys():
579        print queue
580        if queues[queue].is_execution():
581            print '\t ', queues[queue]
582        if queues[queue].has_key('acl_groups'):
583            print '\t acl_groups: yes'
584        else:
585            print '\t acl_groups: no'
[72]586
[275]587    jobs = p.getjobs()
588    for name,job in jobs.items():
589        if job.is_running():
590            print job
[72]591
[275]592    l = ['state']
593    nodes = p.getnodes(l)
594    for name,node in nodes.items():
595        if node.is_free(): 
596            print node
[72]597
598if __name__ == "__main__":
[275]599    main()
Note: See TracBrowser for help on using the repository browser.