source: trunk/src/PBSQuery.py @ 368

Last change on this file since 368 was 368, checked in by bas, 6 years ago

sara_nodes also displays NHC offline notes,

  • Property svn:keywords set to Id
File size: 19.6 KB
Line 
1#
2# Authors: Roy Dragseth (roy.dragseth@cc.uit.no)
3#          Bas van der Vlies (basv@sara.nl)
4#
5# SVN INFO:
6#   $Id: PBSQuery.py 368 2018-10-22 10:16:12Z bas $
7#
8"""
9Usage: from PBSQuery import PBSQuery
10
11This class gets the info from the pbs_server via the pbs.py module
12for the several batch objects. All get..() functions return an dictionary
13with id as key and batch object as value
14
15There are four batch objects:
16 - server
17 - queue
18 - job
19 - node
20
21Each object can be handled as an dictionary and has several member
22functions. The second parameter is an python list and can be used if you
23are only interested in certain resources, see example
24
25There are the following functions for PBSQuery:
26  job -
27    getjob(job_id, attributes=<default is all>)
28    getjobs(attributes=<default is all>)
29
30  node -
31    getnode(node_id, attributes=<default is all>)
32    getnodes(attributes=<default is all>)
33
34  queue -
35    getqueue(queue_id, attributes=<default is all>)
36    getqueues(attributes=<default is all>)
37
38  server -
39    get_serverinfo(attributes=<default is all>)
40
41Here is an example how to use the module:
42    from PBSQuery import PBSQuery
43    p = PBSQuery()
44    nodes = p.getnodes()
45    for name,node in nodes.items():
46        print name
47        if node.is_free():
48           print node, node['state']
49
50    l = [ 'state', 'np' ]
51    nodes = p.getnodes(l)
52    for name,node in nodes.items():
53           print node, node['state']
54
55The parameter 'attributes' is an python list of resources that
56you are interested in, eg: only show state of nodes
57        l = list()
58    l.append('state')
59    nodes = p.getnodes(l)
60"""
61import pbs
62import UserDict
63import string
64import sys
65import re
66import types
67
68
69REG_SUBRANGE = re.compile(r'^\d+(-\d+)?$')
70"""
71>>> b='rectime=1424696750,macaddr=40:a8:f0:2f:17:f4,cpuclock=Fixed,varattr=,jobs=419[1].master(cput=236745,energy_used=0,mem=6562224kb,vmem=7391872kb,walltime=22647,session_id=15941) 446[1].master(cput=7385,energy_used=0,mem=202936kb,vmem=368184kb,walltime=7391,session_id=30940) 446[2].master(cput=7385,energy_used=0,mem=204016kb,vmem=368184kb,walltime=7387,session_id=31168) 446[3].master(cput=7384,energy_used=0,mem=202380kb,vmem=368184kb,walltime=7387,session_id=31344) 446[4].master(cput=7384,energy_used=0,mem=204108kb,vmem=368184kb,walltime=7386,session_id=31536) 446[5].master(cput=7384,energy_used=0,mem=203940kb,vmem=368184kb,walltime=7386,session_id=31718) 446[6].master(cput=7383,energy_used=0,mem=188720kb,vmem=352608kb,walltime=7386,session_id=31904),state=free,size=456341748kb:459945088kb,netload=587288451179,gres=,loadave=18.07,ncpus=24,physmem=65850220kb,availmem=77961112kb,totmem=86821736kb,idletime=13933,nusers=1,nsessions=7,sessions=31904 31718 31536 31344 31168 30940 15941,uname=Linux node24 3.10.0 #1 SMP Wed Feb 4 08:16:54 CET 2015 x86_64,opsys=linux'
72>>> c=','.join([x[1] for x in re.compile(r'((:?[^,(]+(?:\(.*?\))?))(?:,|$)').findall(b)])
73>>> c == b
74True
75
76"""
77REG_SPLIT_COMMA_BRACE = re.compile(r'((:?[^,(]+(?:\(.*?\))?))(?:,|$)')
78"""
79>>> d='jobs=419[1].master(cput=236745,energy_used=0,mem=6562224kb,vmem=7391872kb,walltime=22647,session_id=15941) 446[1].master(cput=7385,energy_used=0,mem=202936kb,vmem=368184kb,walltime=7391,session_id=30940) 446[2].master(cput=7385,energy_used=0,mem=204016kb,vmem=368184kb,walltime=7387,session_id=31168) 446[3].master(cput=7384,energy_used=0,mem=202380kb,vmem=368184kb,walltime=7387,session_id=31344) 446[4].master(cput=7384,energy_used=0,mem=204108kb,vmem=368184kb,walltime=7386,session_id=31536) 446[5].master(cput=7384,energy_used=0,mem=203940kb,vmem=368184kb,walltime=7386,session_id=31718) 446[6].master(cput=7383,energy_used=0,mem=188720kb,vmem=352608kb,walltime=7386,session_id=31904)'
80>>> e='='.join([x[1] for x in re.compile(r'((:?[^=(]+(?:\(.*?\))?))(?:=|$)').findall(d)])
81>>> d == e
82True
83"""
84REG_SPLIT_EQUAL_BRACE = re.compile(r'((:?[^=(]+(?:\(.*?\))?))(?:=|$)')
85
86JOB_RE = re.compile('(?:^|,)(?:((?:[\d,-]+)?\d+)/)?(.+)')
87
88
89def convert_range(rangetxt):
90    """
91    Convert range string into list of id strings: eg.g '3,5,7-9' -> ['3','5','7','8','9']
92    """
93    ids=[]
94    for subrange in [r.split('-') for r in rangetxt.split(',')]:
95        start = int(subrange[0])
96        if(len(subrange) == 2 ):
97            end = int(subrange[1]) + 1
98        else:
99            end = start + 1
100        ids.extend([str(i) for i in range(start,end)])
101    return ids
102
103
104class PBSError(Exception):
105    def __init__(self, msg=''):
106        self.msg = msg
107        Exception.__init__(self, msg)
108
109    def __repr__(self):
110        return self.msg
111
112    __str__ = __repr__
113
114
115class PBSQuery:
116
117    # a[key] = value, key and value are data type string
118    #
119    OLD_DATA_STRUCTURE = False
120
121    def __init__(self, server=None):
122        if not server:
123            self.server = pbs.pbs_default()
124        else:
125            self.server = server
126
127        self._connect()
128        ## this is needed for getjob a jobid is made off:
129        #    sequence_number.server (is not self.server)
130        #
131        self.job_server_id = list(self.get_serverinfo())[0]
132        self._disconnect()
133
134
135    def _connect(self):
136        """Connect to the PBS/Torque server"""
137        self.con = pbs.pbs_connect(self.server)
138        if self.con < 0:
139            str = "Could not make a connection with %s\n" %(self.server)
140            raise PBSError(str)
141
142    def _disconnect(self):
143        """Close the PBS/Torque connection"""
144        pbs.pbs_disconnect(self.con)
145        self.attribs = 'NULL'
146
147    def _list_2_attrib(self, list):
148        """Convert a python list to an attrib list suitable for pbs"""
149        self.attribs = pbs.new_attrl( len(list) )
150        i = 0
151        for attrib in list:
152            # So we can user Resource
153            attrib = attrib.split('.')
154            self.attribs[i].name = attrib[0]
155            i = i + 1
156
157    def _pbsstr_2_list(self, str, delimiter):
158        """Convert a string to a python list and use delimiter as spit char"""
159        l = sting.splitfields(str, delimiter)
160        if len(l) > 1:
161            return l
162
163    def _list_2_dict(self, l, class_func):
164        """
165        Convert a pbsstat function list to a class dictionary, The
166        data structure depends on the function new_data_structure().
167
168        Default data structure is:
169            class[key] = value, Where key and value are of type string
170
171        Future release, can be set by new_data_structure():
172            - class[key] = value where value can be:
173              1. a list of values of type string
174              2. a dictionary with as list of values of type string. If
175                 values contain a '=' character
176
177          eg:
178                print node['np']
179                >> [ '2' ]
180
181                print node['status']['arch']
182                >> [ 'x86_64' ]
183        """
184        self.d = {}
185        for item in l:
186            new = class_func()
187
188            self.d[item.name] = new
189
190            new.name = item.name
191
192            for a in item.attribs:
193
194                if self.OLD_DATA_STRUCTURE:
195
196                    if a.resource:
197                        key = '%s.%s' %(a.name, a.resource)
198                    else:
199                        key = '%s' %(a.name)
200
201                    new[key] = a.value
202                else:
203                    # Don't split , between ()
204                    values = [x[1] for x in REG_SPLIT_COMMA_BRACE.findall(a.value)]
205                    if len(values) == 1:
206                        values = [ a.value ]
207
208                    # We must creat sub dicts, only for specified
209                    # key values
210                    #
211                    if a.name in ['status', 'Variable_List']:
212
213                        for v in values:
214
215                            # Don't split between ()
216                            tmp_l = [x[1] for x in REG_SPLIT_EQUAL_BRACE.findall(v)]
217
218                            ## Support for multiple EVENT mesages in format [key=value:]+
219                            #  format eg: message=EVENT:sample.time=1288864220.003,EVENT:kernel=upgrade,cputotals.user=0
220                            #             message=ERROR <text>
221                            #
222                            if tmp_l[0] in ['message']:
223
224                                if tmp_l[1].startswith('EVENT:'):
225
226                                    tmp_d  = dict()
227                                    new['event'] = class_func(tmp_d)
228
229                                    message_list = v.split(':')
230                                    for event_type in message_list[1:]:
231                                        tmp_l = event_type.split('=')
232                                        new['event'][ tmp_l[0] ] = tmp_l[1:]
233
234                                else:
235
236                                    ## ERROR message
237                                    #
238                                    new['error'] = tmp_l [1:]
239
240                            elif tmp_l[0].startswith('EVENT:'):
241
242                                  message_list = v.split(':')
243                                  for event_type in message_list[1:]:
244                                      tmp_l = event_type.split('=')
245                                      new['event'][ tmp_l[0] ] = tmp_l[1:]
246
247                            else:
248                                  ## Check if we already added the key
249                                  #
250                                  if new.has_key(a.name):
251
252                                      new[a.name][ tmp_l[0] ] = tmp_l[1:]
253
254                                  else:
255
256                                      tmp_d  = dict()
257                                      tmp_d[ tmp_l[0] ] = tmp_l[1:]
258                                      new[a.name] = class_func(tmp_d)
259
260                    else:
261
262                        ## Check if it is a resource type variable, eg:
263                        #  - Resource_List.(nodes, walltime, ..)
264                        #
265                        if a.resource:
266
267                            if new.has_key(a.name):
268                                new[a.name][a.resource] = values
269
270                            else:
271                                tmp_d = dict()
272                                tmp_d[a.resource] = values
273                                new[a.name] = class_func(tmp_d)
274                        else:
275                            # Simple value
276                            #
277                            new[a.name] = values
278
279        self._free(l)
280
281    def _free(self, memory):
282        """
283        freeing up used memmory
284
285        """
286        pbs.pbs_statfree(memory)
287
288    def _statserver(self, attrib_list=None):
289        """Get the server config from the pbs server"""
290        if attrib_list:
291            self._list_2_attrib(attrib_list)
292        else:
293            self.attribs = 'NULL'
294
295        self._connect()
296        serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL')
297        self._disconnect()
298
299        self._list_2_dict(serverinfo, server)
300
301    def get_serverinfo(self, attrib_list=None):
302        self._statserver(attrib_list)
303        return self.d
304
305    def _statqueue(self, queue_name='', attrib_list=None):
306        """Get the queue config from the pbs server"""
307        if attrib_list:
308            self._list_2_attrib(attrib_list)
309        else:
310            self.attribs = 'NULL'
311
312        self._connect()
313        queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL')
314        self._disconnect()
315
316        self._list_2_dict(queues, queue)
317
318    def getqueue(self, name, attrib_list=None):
319        self._statqueue(name, attrib_list)
320        try:
321            return self.d[name]
322        except KeyError, detail:
323            return self.d
324
325    def getqueues(self, attrib_list=None):
326        self._statqueue('', attrib_list)
327        return self.d
328
329    def _statnode(self, select='', attrib_list=None, property=None):
330        """Get the node config from the pbs server"""
331        if attrib_list:
332            self._list_2_attrib(attrib_list)
333        else:
334            self.attribs = 'NULL'
335
336        if property:
337            select = ':%s' %(property)
338
339        self._connect()
340        nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL')
341        self._disconnect()
342
343        self._list_2_dict(nodes, node)
344
345    def getnode(self, name, attrib_list=None):
346        self._statnode(name, attrib_list)
347        try:
348            return self.d[name]
349        except KeyError, detail:
350            return self.d
351
352    def getnodes(self, attrib_list=None):
353        self._statnode('', attrib_list)
354        return self.d
355
356    def getnodes_with_property(self, property, attrib_list=None):
357        self._statnode('', attrib_list, property)
358        return self.d
359
360    def _statjob(self, job_name='', attrib_list=None):
361        """Get the job config from the pbs server"""
362        if attrib_list:
363            self._list_2_attrib(attrib_list)
364        else:
365            self.attribs = 'NULL'
366
367        self._connect()
368        jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL')
369        self._disconnect()
370
371        self._list_2_dict(jobs, job)
372
373    def getjob(self, name, attrib_list=None):
374        ## To make sure we use the full name of a job; Changes a name
375        # like 1234567 into 1234567.job_server_id
376        #
377        if len(name.split('.')) == 1 :
378            name = name.split('.')[0] + "." + self.job_server_id
379
380        self._statjob(name, attrib_list)
381        try:
382            return self.d[name]
383        except KeyError, detail:
384            return self.d
385
386    def getjobs(self, attrib_list=None):
387        self._statjob('', attrib_list)
388        return self.d
389
390    def get_server_name(self):
391        return self.server
392
393    def new_data_structure(self):
394        """
395        Use the new data structure. Is now the default
396        """
397        self.OLD_DATA_STRUCTURE = False
398
399    def old_data_structure(self):
400        """
401        Use the old data structure. This function is obselete and
402        will be removed in a future release
403        """
404        self.OLD_DATA_STRUCTURE = True
405
406class _PBSobject(UserDict.UserDict):
407    TRUE  = 1
408    FALSE = 0
409
410    def __init__(self, dictin = None):
411        UserDict.UserDict.__init__(self)
412        self.name = None
413
414        if dictin:
415            if dictin.has_key('name'):
416                self.name = dictin['name']
417                del dictin['name']
418            self.data = dictin
419
420    def get_value(self, key):
421        if self.has_key(key):
422            return self[key]
423        else:
424            return None
425
426    def __repr__(self):
427        return repr(self.data)
428
429    def __str__(self):
430        return str(self.data)
431
432    def __getattr__(self, name):
433        """
434        override the class attribute get method. Return the value
435        from the Userdict
436        """
437        try:
438            return self.data[name]
439        except KeyError:
440            error = 'Attribute key error: %s' %(name)
441            raise PBSError(error)
442
443    def __iter__(self):
444        return iter(self.data.keys())
445
446    def __nonzero__(self):
447        if self.data:
448            return True
449        else:
450            return False
451
452    def uniq(self, list):
453        """Filter out unique items of a list"""
454        uniq_items = {}
455        for item in list:
456            uniq_items[item] = 1
457        return uniq_items.keys()
458
459    def return_value(self, key):
460        """Function that returns a value independent of new or old data structure"""
461        if isinstance(self[key], types.ListType):
462            return self[key][0]
463        else:
464            return self[key]
465
466class job(_PBSobject):
467    """PBS job class"""
468    def is_running(self):
469
470        value = self.return_value('job_state')
471        if value == 'Q':
472            return self.TRUE
473        else:
474            return self.FALSE
475
476    def get_nodes(self, unique=None):
477        """
478        Returns a list of the nodes which run this job
479        format:
480          * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
481          * split on '+' and if uniq is set split on '/'
482        """
483        nodes = self.get_value('exec_host')
484        if not nodes:
485            return list()
486
487        if isinstance(nodes, str):
488            nodelist = string.split(nodes,'+')
489        else:
490            nodelist = []
491            for n in nodes:
492                if REG_SUBRANGE.search(n):
493                    # This is a range split by _list_2_dict in a list
494                    # E.g. exec_host node1/4,5,8-9 is after _list_dict ['node1/4', '5', '8-9']
495                    # Re-join them with the last node
496                    nodelist[-1] += ',%s' % n
497                else:
498                    nodelist.extend(n.split('+'))
499
500        res=[]
501        for n in nodelist:
502            t = string.split(n,'/')
503
504            if not unique:
505                res.extend(["%s/%s" % (t[0],i) for i in convert_range(t[1])])
506            else:
507                if t[0] not in res:
508                    res.append(t[0])
509
510        return res
511
512
513class node(_PBSobject):
514    """PBS node class"""
515
516    def is_free(self):
517        """Check if node is free"""
518
519        value = self.return_value('state')
520        if value == 'free':
521            return self.TRUE
522        else:
523            return self.FALSE
524
525    def has_job(self):
526        """Does the node run a job"""
527        try:
528            a = self['jobs']
529            return self.TRUE
530        except KeyError, detail:
531            return self.FALSE
532
533    def get_jobs(self, unique=None):
534        """Returns a list of the currently running job-id('s) on the node"""
535
536        jobs = self.get_value('jobs')
537        if not jobs:
538            return list()
539
540        if isinstance(jobs, str):
541            jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs )
542
543            if not unique:
544                return jlist
545            else:
546                return self.uniq(jlist)
547
548        else:
549            # Support id ranges before job id
550            joblist = []
551            # Jobs might be splitted in ranges, but here _list_2_dict does
552            # 1,3,7-9/jobid -> ['1','3','7-9/jobid']
553            # Process in reverse order
554            for j in jobs[::-1]:
555                if REG_SUBRANGE.search(j):
556                    joblist[-1] = '%s,%s' % (j, joblist[-1])
557                else:
558                    joblist.append(j)
559           
560            # extend with nodes
561            l = []
562           
563            for j in joblist[::-1]:
564                r=JOB_RE.search(j)
565                # 1st match is range, second part is jobid
566                jobstr=r.groups()[1]
567                if unique:
568                    if jobstr not in l:
569                        l.append(jobstr)
570                else:
571                    l.extend(["%s/%s"%(i,jobstr) for i in convert_range(r.groups()[0])])
572                   
573            return l
574
575
576class queue(_PBSobject):
577    """PBS queue class"""
578    def is_enabled(self):
579
580        value = self.return_value('enabled')
581        if value == 'True':
582            return self.TRUE
583        else:
584            return self.FALSE
585
586    def is_execution(self):
587
588        value = self.return_value('queue_type')
589        if value == 'Execution':
590            return self.TRUE
591        else:
592            return self.FALSE
593
594class server(_PBSobject):
595    """PBS server class"""
596
597    def get_version(self):
598        return self.get_value('pbs_version')
599
600def main():
601    p = PBSQuery() 
602    serverinfo = p.get_serverinfo()
603    for server in serverinfo.keys():
604        print server, ' version: ', serverinfo[server].get_version()
605    for resource in serverinfo[server].keys():
606        print '\t ', resource, ' = ', serverinfo[server][resource]
607
608    queues = p.getqueues()
609    for queue in queues.keys():
610        print queue
611        if queues[queue].is_execution():
612            print '\t ', queues[queue]
613        if queues[queue].has_key('acl_groups'):
614            print '\t acl_groups: yes'
615        else:
616            print '\t acl_groups: no'
617
618    jobs = p.getjobs()
619    for name,job in jobs.items():
620        if job.is_running():
621            print job
622
623    l = ['state']
624    nodes = p.getnodes(l)
625    for name,node in nodes.items():
626        if node.is_free(): 
627            print node
628
629if __name__ == "__main__":
630    main()
Note: See TracBrowser for help on using the repository browser.