source: trunk/src/PBSQuery.py @ 339

Last change on this file since 339 was 339, checked in by bas, 9 years ago

applied PBSQuery patch to handle cpuid range for torque5, see #47

  • Property svn:keywords set to Id
File size: 19.5 KB
Line 
1#
2# Authors: Roy Dragseth (roy.dragseth@cc.uit.no)
3#          Bas van der Vlies (basv@sara.nl)
4#
5# SVN INFO:
6#   $Id: PBSQuery.py 339 2015-03-10 16:55:29Z bas $
7#
8"""
9Usage: from PBSQuery import PBSQuery
10
11This class gets the info from the pbs_server via the pbs.py module
12for the several batch objects. All get..() functions return an dictionary
13with id as key and batch object as value
14
15There are four batch objects:
16 - server
17 - queue
18 - job
19 - node
20
21Each object can be handled as an dictionary and has several member
22functions. The second parameter is an python list and can be used if you
23are only interested in certain resources, see example
24
25There are the following functions for PBSQuery:
26  job -
27    getjob(job_id, attributes=<default is all>)
28    getjobs(attributes=<default is all>)
29
30  node -
31    getnode(node_id, attributes=<default is all>)
32    getnodes(attributes=<default is all>)
33
34  queue -
35    getqueue(queue_id, attributes=<default is all>)
36    getqueues(attributes=<default is all>)
37
38  server -
39    get_serverinfo(attributes=<default is all>)
40
41Here is an example how to use the module:
42    from PBSQuery import PBSQuery
43    p = PBSQuery()
44    nodes = p.getnodes()
45    for name,node in nodes.items():
46        print name
47        if node.is_free():
48           print node, node['state']
49
50    l = [ 'state', 'np' ]
51    nodes = p.getnodes(l)
52    for name,node in nodes.items():
53           print node, node['state']
54
55The parameter 'attributes' is an python list of resources that
56you are interested in, eg: only show state of nodes
57        l = list()
58    l.append('state')
59    nodes = p.getnodes(l)
60"""
61import pbs
62import UserDict
63import string
64import sys
65import re
66import types
67
68
69REG_SUBRANGE = re.compile(r'^\d+(-\d+)?$')
70"""
71>>> b='rectime=1424696750,macaddr=40:a8:f0:2f:17:f4,cpuclock=Fixed,varattr=,jobs=419[1].master(cput=236745,energy_used=0,mem=6562224kb,vmem=7391872kb,walltime=22647,session_id=15941) 446[1].master(cput=7385,energy_used=0,mem=202936kb,vmem=368184kb,walltime=7391,session_id=30940) 446[2].master(cput=7385,energy_used=0,mem=204016kb,vmem=368184kb,walltime=7387,session_id=31168) 446[3].master(cput=7384,energy_used=0,mem=202380kb,vmem=368184kb,walltime=7387,session_id=31344) 446[4].master(cput=7384,energy_used=0,mem=204108kb,vmem=368184kb,walltime=7386,session_id=31536) 446[5].master(cput=7384,energy_used=0,mem=203940kb,vmem=368184kb,walltime=7386,session_id=31718) 446[6].master(cput=7383,energy_used=0,mem=188720kb,vmem=352608kb,walltime=7386,session_id=31904),state=free,size=456341748kb:459945088kb,netload=587288451179,gres=,loadave=18.07,ncpus=24,physmem=65850220kb,availmem=77961112kb,totmem=86821736kb,idletime=13933,nusers=1,nsessions=7,sessions=31904 31718 31536 31344 31168 30940 15941,uname=Linux node24 3.10.0 #1 SMP Wed Feb 4 08:16:54 CET 2015 x86_64,opsys=linux'
72>>> c=','.join([x[1] for x in re.compile(r'((:?[^,(]+(?:\(.*?\))?))(?:,|$)').findall(b)])
73>>> c == b
74True
75
76"""
77REG_SPLIT_COMMA_BRACE = re.compile(r'((:?[^,(]+(?:\(.*?\))?))(?:,|$)')
78"""
79>>> d='jobs=419[1].master(cput=236745,energy_used=0,mem=6562224kb,vmem=7391872kb,walltime=22647,session_id=15941) 446[1].master(cput=7385,energy_used=0,mem=202936kb,vmem=368184kb,walltime=7391,session_id=30940) 446[2].master(cput=7385,energy_used=0,mem=204016kb,vmem=368184kb,walltime=7387,session_id=31168) 446[3].master(cput=7384,energy_used=0,mem=202380kb,vmem=368184kb,walltime=7387,session_id=31344) 446[4].master(cput=7384,energy_used=0,mem=204108kb,vmem=368184kb,walltime=7386,session_id=31536) 446[5].master(cput=7384,energy_used=0,mem=203940kb,vmem=368184kb,walltime=7386,session_id=31718) 446[6].master(cput=7383,energy_used=0,mem=188720kb,vmem=352608kb,walltime=7386,session_id=31904)'
80>>> e='='.join([x[1] for x in re.compile(r'((:?[^=(]+(?:\(.*?\))?))(?:=|$)').findall(d)])
81>>> d == e
82True
83"""
84REG_SPLIT_EQUAL_BRACE = re.compile(r'((:?[^=(]+(?:\(.*?\))?))(?:=|$)')
85
86JOB_RE = re.compile('(?:^|,)(?:((?:[\d,-]+)?\d+)/)?(.+)')
87
88
89def convert_range(rangetxt):
90    """
91    Convert range string into list of id strings: eg.g '3,5,7-9' -> ['3','5','7','8','9']
92    """
93    ids=[]
94    for subrange in [r.split('-') for r in rangetxt.split(',')]:
95        start = int(subrange[0])
96        if(len(subrange) == 2 ):
97            end = int(subrange[1]) + 1
98        else:
99            end = start + 1
100        ids.extend([str(i) for i in range(start,end)])
101    return ids
102
103
104class PBSError(Exception):
105    def __init__(self, msg=''):
106        self.msg = msg
107        Exception.__init__(self, msg)
108
109    def __repr__(self):
110        return self.msg
111
112    __str__ = __repr__
113
114
115class PBSQuery:
116
117    # a[key] = value, key and value are data type string
118    #
119    OLD_DATA_STRUCTURE = False
120
121    def __init__(self, server=None):
122        if not server:
123            self.server = pbs.pbs_default()
124        else:
125            self.server = server
126
127        self._connect()
128        ## this is needed for getjob a jobid is made off:
129        #    sequence_number.server (is not self.server)
130        #
131        self.job_server_id = list(self.get_serverinfo())[0]
132        self._disconnect()
133
134
135    def _connect(self):
136        """Connect to the PBS/Torque server"""
137        self.con = pbs.pbs_connect(self.server)
138        if self.con < 0:
139            str = "Could not make a connection with %s\n" %(self.server)
140            raise PBSError(str)
141
142    def _disconnect(self):
143        """Close the PBS/Torque connection"""
144        pbs.pbs_disconnect(self.con)
145        self.attribs = 'NULL'
146
147    def _list_2_attrib(self, list):
148        """Convert a python list to an attrib list suitable for pbs"""
149        self.attribs = pbs.new_attrl( len(list) )
150        i = 0
151        for attrib in list:
152            # So we can user Resource
153            attrib = attrib.split('.')
154            self.attribs[i].name = attrib[0]
155            i = i + 1
156
157    def _pbsstr_2_list(self, str, delimiter):
158        """Convert a string to a python list and use delimiter as spit char"""
159        l = sting.splitfields(str, delimiter)
160        if len(l) > 1:
161            return l
162
163    def _list_2_dict(self, l, class_func):
164        """
165        Convert a pbsstat function list to a class dictionary, The
166        data structure depends on the function new_data_structure().
167
168        Default data structure is:
169            class[key] = value, Where key and value are of type string
170
171        Future release, can be set by new_data_structure():
172            - class[key] = value where value can be:
173              1. a list of values of type string
174              2. a dictionary with as list of values of type string. If
175                 values contain a '=' character
176
177          eg:
178                print node['np']
179                >> [ '2' ]
180
181                print node['status']['arch']
182                >> [ 'x86_64' ]
183        """
184        self.d = {}
185        for item in l:
186            new = class_func()
187
188            self.d[item.name] = new
189
190            new.name = item.name
191
192            for a in item.attribs:
193
194                if self.OLD_DATA_STRUCTURE:
195
196                    if a.resource:
197                        key = '%s.%s' %(a.name, a.resource)
198                    else:
199                        key = '%s' %(a.name)
200
201                    new[key] = a.value
202
203                else:
204                    # Don't split , between ()
205                    values = [x[1] for x in REG_SPLIT_COMMA_BRACE.findall(a.value)]
206
207                    # We must creat sub dicts, only for specified
208                    # key values
209                    #
210                    if a.name in ['status', 'Variable_List']:
211
212                        for v in values:
213
214                            # Don't split between ()
215                            tmp_l = [x[1] for x in REG_SPLIT_EQUAL_BRACE.findall(v)]
216
217                            ## Support for multiple EVENT mesages in format [key=value:]+
218                            #  format eg: message=EVENT:sample.time=1288864220.003,EVENT:kernel=upgrade,cputotals.user=0
219                            #             message=ERROR <text>
220                            #
221                            if tmp_l[0] in ['message']:
222
223                                if tmp_l[1].startswith('EVENT:'):
224
225                                    tmp_d  = dict()
226                                    new['event'] = class_func(tmp_d)
227
228                                    message_list = v.split(':')
229                                    for event_type in message_list[1:]:
230                                        tmp_l = event_type.split('=')
231                                        new['event'][ tmp_l[0] ] = tmp_l[1:]
232
233                                else:
234
235                                    ## ERROR message
236                                    #
237                                    new['error'] = tmp_l [1:]
238
239                            elif tmp_l[0].startswith('EVENT:'):
240
241                                  message_list = v.split(':')
242                                  for event_type in message_list[1:]:
243                                      tmp_l = event_type.split('=')
244                                      new['event'][ tmp_l[0] ] = tmp_l[1:]
245
246                            else:
247                                  ## Check if we already added the key
248                                  #
249                                  if new.has_key(a.name):
250
251                                      new[a.name][ tmp_l[0] ] = tmp_l[1:]
252
253                                  else:
254
255                                      tmp_d  = dict()
256                                      tmp_d[ tmp_l[0] ] = tmp_l[1:]
257                                      new[a.name] = class_func(tmp_d)
258
259                    else:
260
261                        ## Check if it is a resource type variable, eg:
262                        #  - Resource_List.(nodes, walltime, ..)
263                        #
264                        if a.resource:
265
266                            if new.has_key(a.name):
267                                new[a.name][a.resource] = values
268
269                            else:
270                                tmp_d = dict()
271                                tmp_d[a.resource] = values
272                                new[a.name] = class_func(tmp_d)
273                        else:
274                            # Simple value
275                            #
276                            new[a.name] = values
277
278        self._free(l)
279
280    def _free(self, memory):
281        """
282        freeing up used memmory
283
284        """
285        pbs.pbs_statfree(memory)
286
287    def _statserver(self, attrib_list=None):
288        """Get the server config from the pbs server"""
289        if attrib_list:
290            self._list_2_attrib(attrib_list)
291        else:
292            self.attribs = 'NULL'
293
294        self._connect()
295        serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL')
296        self._disconnect()
297
298        self._list_2_dict(serverinfo, server)
299
300    def get_serverinfo(self, attrib_list=None):
301        self._statserver(attrib_list)
302        return self.d
303
304    def _statqueue(self, queue_name='', attrib_list=None):
305        """Get the queue config from the pbs server"""
306        if attrib_list:
307            self._list_2_attrib(attrib_list)
308        else:
309            self.attribs = 'NULL'
310
311        self._connect()
312        queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL')
313        self._disconnect()
314
315        self._list_2_dict(queues, queue)
316
317    def getqueue(self, name, attrib_list=None):
318        self._statqueue(name, attrib_list)
319        try:
320            return self.d[name]
321        except KeyError, detail:
322            return self.d
323
324    def getqueues(self, attrib_list=None):
325        self._statqueue('', attrib_list)
326        return self.d
327
328    def _statnode(self, select='', attrib_list=None, property=None):
329        """Get the node config from the pbs server"""
330        if attrib_list:
331            self._list_2_attrib(attrib_list)
332        else:
333            self.attribs = 'NULL'
334
335        if property:
336            select = ':%s' %(property)
337
338        self._connect()
339        nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL')
340        self._disconnect()
341
342        self._list_2_dict(nodes, node)
343
344    def getnode(self, name, attrib_list=None):
345        self._statnode(name, attrib_list)
346        try:
347            return self.d[name]
348        except KeyError, detail:
349            return self.d
350
351    def getnodes(self, attrib_list=None):
352        self._statnode('', attrib_list)
353        return self.d
354
355    def getnodes_with_property(self, property, attrib_list=None):
356        self._statnode('', attrib_list, property)
357        return self.d
358
359    def _statjob(self, job_name='', attrib_list=None):
360        """Get the job config from the pbs server"""
361        if attrib_list:
362            self._list_2_attrib(attrib_list)
363        else:
364            self.attribs = 'NULL'
365
366        self._connect()
367        jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL')
368        self._disconnect()
369
370        self._list_2_dict(jobs, job)
371
372    def getjob(self, name, attrib_list=None):
373        ## To make sure we use the full name of a job; Changes a name
374        # like 1234567 into 1234567.job_server_id
375        #
376        if len(name.split('.')) == 1 :
377            name = name.split('.')[0] + "." + self.job_server_id
378
379        self._statjob(name, attrib_list)
380        try:
381            return self.d[name]
382        except KeyError, detail:
383            return self.d
384
385    def getjobs(self, attrib_list=None):
386        self._statjob('', attrib_list)
387        return self.d
388
389    def get_server_name(self):
390        return self.server
391
392    def new_data_structure(self):
393        """
394        Use the new data structure. Is now the default
395        """
396        self.OLD_DATA_STRUCTURE = False
397
398    def old_data_structure(self):
399        """
400        Use the old data structure. This function is obselete and
401        will be removed in a future release
402        """
403        self.OLD_DATA_STRUCTURE = True
404
405class _PBSobject(UserDict.UserDict):
406    TRUE  = 1
407    FALSE = 0
408
409    def __init__(self, dictin = None):
410        UserDict.UserDict.__init__(self)
411        self.name = None
412
413        if dictin:
414            if dictin.has_key('name'):
415                self.name = dictin['name']
416                del dictin['name']
417            self.data = dictin
418
419    def get_value(self, key):
420        if self.has_key(key):
421            return self[key]
422        else:
423            return None
424
425    def __repr__(self):
426        return repr(self.data)
427
428    def __str__(self):
429        return str(self.data)
430
431    def __getattr__(self, name):
432        """
433        override the class attribute get method. Return the value
434        from the Userdict
435        """
436        try:
437            return self.data[name]
438        except KeyError:
439            error = 'Attribute key error: %s' %(name)
440            raise PBSError(error)
441
442    def __iter__(self):
443        return iter(self.data.keys())
444
445    def __nonzero__(self):
446        if self.data:
447            return True
448        else:
449            return False
450
451    def uniq(self, list):
452        """Filter out unique items of a list"""
453        uniq_items = {}
454        for item in list:
455            uniq_items[item] = 1
456        return uniq_items.keys()
457
458    def return_value(self, key):
459        """Function that returns a value independent of new or old data structure"""
460        if isinstance(self[key], types.ListType):
461            return self[key][0]
462        else:
463            return self[key]
464
465class job(_PBSobject):
466    """PBS job class"""
467    def is_running(self):
468
469        value = self.return_value('job_state')
470        if value == 'Q':
471            return self.TRUE
472        else:
473            return self.FALSE
474
475    def get_nodes(self, unique=None):
476        """
477        Returns a list of the nodes which run this job
478        format:
479          * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
480          * split on '+' and if uniq is set split on '/'
481        """
482        nodes = self.get_value('exec_host')
483        if not nodes:
484            return list()
485
486        if isinstance(nodes, str):
487            nodelist = string.split(nodes,'+')
488        else:
489            nodelist = []
490            for n in nodes:
491                if REG_SUBRANGE.search(n):
492                    # This is a range split by _list_2_dict in a list
493                    # E.g. exec_host node1/4,5,8-9 is after _list_dict ['node1/4', '5', '8-9']
494                    # Re-join them with the last node
495                    nodelist[-1] += ',%s' % n
496                else:
497                    nodelist.extend(n.split('+'))
498
499        res=[]
500        for n in nodelist:
501            t = string.split(n,'/')
502
503            if not unique:
504                res.extend(["%s/%s" % (t[0],i) for i in convert_range(t[1])])
505            else:
506                if t[0] not in res:
507                    res.append(t[0])
508
509        return res
510
511
512class node(_PBSobject):
513    """PBS node class"""
514
515    def is_free(self):
516        """Check if node is free"""
517
518        value = self.return_value('state')
519        if value == 'free':
520            return self.TRUE
521        else:
522            return self.FALSE
523
524    def has_job(self):
525        """Does the node run a job"""
526        try:
527            a = self['jobs']
528            return self.TRUE
529        except KeyError, detail:
530            return self.FALSE
531
532    def get_jobs(self, unique=None):
533        """Returns a list of the currently running job-id('s) on the node"""
534
535        jobs = self.get_value('jobs')
536        if not jobs:
537            return list()
538
539        if isinstance(jobs, str):
540            jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs )
541
542            if not unique:
543                return jlist
544            else:
545                return self.uniq(jlist)
546
547        else:
548            # Support id ranges before job id
549            joblist = []
550            # Jobs might be splitted in ranges, but here _list_2_dict does
551            # 1,3,7-9/jobid -> ['1','3','7-9/jobid']
552            # Process in reverse order
553            for j in jobs[::-1]:
554                if REG_SUBRANGE.search(j):
555                    joblist[-1] = '%s,%s' % (j, joblist[-1])
556                else:
557                    joblist.append(j)
558           
559            # extend with nodes
560            l = []
561           
562            for j in joblist[::-1]:
563                r=JOB_RE.search(j)
564                # 1st match is range, second part is jobid
565                jobstr=r.groups()[1]
566                if unique:
567                    if jobstr not in l:
568                        l.append(jobstr)
569                else:
570                    l.extend(["%s/%s"%(i,jobstr) for i in convert_range(r.groups()[0])])
571                   
572            return l
573
574
575class queue(_PBSobject):
576    """PBS queue class"""
577    def is_enabled(self):
578
579        value = self.return_value('enabled')
580        if value == 'True':
581            return self.TRUE
582        else:
583            return self.FALSE
584
585    def is_execution(self):
586
587        value = self.return_value('queue_type')
588        if value == 'Execution':
589            return self.TRUE
590        else:
591            return self.FALSE
592
593class server(_PBSobject):
594    """PBS server class"""
595
596    def get_version(self):
597        return self.get_value('pbs_version')
598
599def main():
600    p = PBSQuery() 
601    serverinfo = p.get_serverinfo()
602    for server in serverinfo.keys():
603        print server, ' version: ', serverinfo[server].get_version()
604    for resource in serverinfo[server].keys():
605        print '\t ', resource, ' = ', serverinfo[server][resource]
606
607    queues = p.getqueues()
608    for queue in queues.keys():
609        print queue
610        if queues[queue].is_execution():
611            print '\t ', queues[queue]
612        if queues[queue].has_key('acl_groups'):
613            print '\t acl_groups: yes'
614        else:
615            print '\t acl_groups: no'
616
617    jobs = p.getjobs()
618    for name,job in jobs.items():
619        if job.is_running():
620            print job
621
622    l = ['state']
623    nodes = p.getnodes(l)
624    for name,node in nodes.items():
625        if node.is_free(): 
626            print node
627
628if __name__ == "__main__":
629    main()
Note: See TracBrowser for help on using the repository browser.