source: trunk/src/PBSQuery.py @ 263

Last change on this file since 263 was 263, checked in by bas, 13 years ago

added message events to PBSQuery

  • Property svn:keywords set to Id
File size: 12.4 KB
Line 
1#
2# Authors: Roy Dragseth (roy.dragseth@cc.uit.no)
3#          Bas van der Vlies (basv@sara.nl)
4#
5# SVN INFO:
6#       $Id: PBSQuery.py 263 2010-11-04 15:23:07Z bas $
7#
8"""
9Usage: from PBSQuery import PBSQuery
10
11This class gets the info from the pbs_server via the pbs.py module
12for the several batch objects. All get..() functions return an dictionary
13with id as key and batch object as value
14
15There are four batch objects:
16 - server
17 - queue
18 - job
19 - node
20
21Each object can be handled as an dictionary and has several member
22functions. The second parameter is an python list and can be used if you
23are only interested in certain resources, see example
24
25There are the following functions for PBSQuery:
26  job -
27        getjob(job_id, attributes=<default is all>)
28        getjobs(attributes=<default is all>)
29 
30  node -
31        getnode(node_id, attributes=<default is all>)
32        getnodes(attributes=<default is all>)
33
34  queue -
35        getqueue(queue_id, attributes=<default is all>)
36        getqueues(attributes=<default is all>)
37
38  server -
39        get_serverinfo(attributes=<default is all>)
40
41Here is an example how to use the module:
42        from PBSQuery import PBSQuery
43        p = PBSQuery()
44        nodes = p.getnodes()
45        for name,node in nodes.items():
46            print name
47            if node.is_free():
48               print node, node['state']
49
50        l = [ 'state', 'np' ]
51        nodes = p.getnodes(l)
52        for name,node in nodes.items():
53               print node, node['state']
54
55The parameter 'attributes' is an python list of resources that
56you are interested in, eg: only show state of nodes
57        l = list()
58        l.append('state')
59        nodes = p.getnodes(l)
60"""
61import pbs
62import UserDict
63import string
64import sys
65import re
66import types
67
68class PBSError(Exception):
69        def __init__(self, msg=''):
70                self.msg = msg
71                Exception.__init__(self, msg)
72               
73        def __repr__(self):
74                return self.msg
75
76        __str__ = __repr__
77
78
79class PBSQuery:
80
81        # a[key] = value, key and value are data type string
82        #
83        OLD_DATA_STRUCTURE = False
84
85        def __init__(self, server=None):
86                if not server:
87                        self.server = pbs.pbs_default()
88                else:
89                        self.server = server
90
91        def _connect(self):
92                """Connect to the PBS/Torque server"""
93                self.con = pbs.pbs_connect(self.server)
94                if self.con < 0:
95                        str = "Could not make a connection with %s\n" %(self.server)
96                        raise PBSError(str)
97
98        def _disconnect(self):
99                """Close the PBS/Torque connection"""
100                pbs.pbs_disconnect(self.con)
101                self.attribs = 'NULL'
102
103        def _list_2_attrib(self, list):
104                """Convert a python list to an attrib list suitable for pbs"""
105                self.attribs = pbs.new_attrl( len(list) )
106                i = 0 
107                for attrib in list:
108                        # So we can user Resource
109                        attrib = attrib.split('.')
110                        self.attribs[i].name = attrib[0]
111                        i = i + 1
112
113        def _pbsstr_2_list(self, str, delimiter):
114                """Convert a string to a python list and use delimiter as spit char"""
115                l = sting.splitfields(str, delimiter)
116                if len(l) > 1:
117                        return l
118
119        def _list_2_dict(self, l, class_func):
120                """
121                Convert a pbsstat function list to a class dictionary, The
122                data structure depends on the function new_data_structure().
123               
124                Default data structure is:
125                        class[key] = value, Where key and value are of type string
126
127                Future release, can be set by new_data_structure():
128                        - class[key] = value where value can be:
129                          1. a list of values of type string
130                          2. a dictionary with as list of values of type string. If
131                             values contain a '=' character
132
133                  eg:
134                            print node['np']
135                                >> [ '2' ]
136
137                                print node['status']['arch']
138                                >> [ 'x86_64' ]
139                """
140                self.d = {}
141                for item in l:
142                        new = class_func()
143
144                        self.d[item.name] = new
145                       
146                        new.name = item.name
147
148                        for a in item.attribs:
149
150                                if self.OLD_DATA_STRUCTURE:
151
152                                        if a.resource:
153                                                key = '%s.%s' %(a.name, a.resource)
154                                        else:
155                                                key = '%s' %(a.name)
156
157                                        new[key] = a.value
158
159                                else:
160                                        values = string.split(a.value, ',') 
161                                        sub_dict = string.split(a.value, '=')
162
163
164                                        # We must creat sub dicts, only for specified
165                                        # key values
166                                        #
167                                        if a.name in ['status', 'Variable_List']:
168
169                                                for v in values:
170
171                                                        tmp_l = v.split('=')
172
173                                                        ## Support for multiple EVENT mesages in format [:key=value]+
174                                                        #  format eg: message=EVENT:sample.time=1288864220.003:cputotals.user=0
175                                                        #
176                                                        if tmp_l[0] in ['message']:
177
178                                                                tmp_d  = dict()
179                                                                new['event'] = class_func(tmp_d)
180
181                                                                message_list = v.split(':')
182                                                                for event_type in message_list[1:]:
183                                                                        tmp_l = event_type.split('=')
184                                                                        new['event'][ tmp_l[0] ] = tmp_l[1:]
185
186                                                                ## continue with next status value
187                                                                #
188                                                                continue
189
190                                                                       
191                                                        ## Check if we already added the key
192                                                        #
193                                                        if new.has_key(a.name):
194                                                                new[a.name][ tmp_l[0] ] = tmp_l[1:]
195
196                                                        else:
197                                                                tmp_d  = dict()
198                                                                tmp_d[ tmp_l[0] ] = tmp_l[1:]
199                                                                new[a.name] = class_func(tmp_d) 
200
201                                        else: 
202                                                ## Check if it is a resource type variable, eg:
203                                                #  - Resource_List.(nodes, walltime, ..)
204                                                #
205                                                if a.resource:
206
207                                                        if new.has_key(a.name):
208                                                                new[a.name][a.resource] = values
209
210                                                        else:
211                                                                tmp_d = dict()
212                                                                tmp_d[a.resource] = values
213                                                                new[a.name] = class_func(tmp_d) 
214                                                else:
215                                                        # Simple value
216                                                        #
217                                                        new[a.name] = values
218
219                self._free(l)
220               
221        def _free(self, memory):
222                """
223                freeing up used memmory
224
225                """
226                pbs.pbs_statfree(memory)
227
228        def _statserver(self, attrib_list=None):
229                """Get the server config from the pbs server"""
230                if attrib_list:
231                        self._list_2_attrib(attrib_list)
232                else:
233                        self.attribs = 'NULL' 
234                       
235                self._connect()
236                serverinfo = pbs.pbs_statserver(self.con, self.attribs, 'NULL')
237                self._disconnect() 
238               
239                self._list_2_dict(serverinfo, server)
240
241        def get_serverinfo(self, attrib_list=None):
242                self._statserver(attrib_list)
243                return self.d
244
245        def _statqueue(self, queue_name='', attrib_list=None):
246                """Get the queue config from the pbs server"""
247                if attrib_list:
248                        self._list_2_attrib(attrib_list)
249                else:
250                        self.attribs = 'NULL' 
251                       
252                self._connect()
253                queues = pbs.pbs_statque(self.con, queue_name, self.attribs, 'NULL')
254                self._disconnect() 
255               
256                self._list_2_dict(queues, queue)
257
258        def getqueue(self, name, attrib_list=None):
259                self._statqueue(name, attrib_list)
260                try:
261                        return self.d[name]
262                except KeyError, detail:
263                        return self.d
264       
265        def getqueues(self, attrib_list=None):
266                self._statqueue('', attrib_list)
267                return self.d
268
269        def _statnode(self, select='', attrib_list=None, property=None):
270                """Get the node config from the pbs server"""
271                if attrib_list:
272                        self._list_2_attrib(attrib_list)
273                else:
274                        self.attribs = 'NULL' 
275                       
276                if property:
277                        select = ':%s' %(property)
278
279                self._connect()
280                nodes = pbs.pbs_statnode(self.con, select, self.attribs, 'NULL')
281                self._disconnect() 
282               
283                self._list_2_dict(nodes, node)
284
285        def getnode(self, name, attrib_list=None):
286                self._statnode(name, attrib_list)
287                try:
288                        return self.d[name]
289                except KeyError, detail:
290                        return self.d
291       
292        def getnodes(self, attrib_list=None):
293                self._statnode('', attrib_list)
294                return self.d
295
296        def getnodes_with_property(self, property, attrib_list=None):
297                self._statnode('', attrib_list, property)
298                return self.d
299
300        def _statjob(self, job_name='', attrib_list=None):
301                """Get the job config from the pbs server"""
302                if attrib_list:
303                        self._list_2_attrib(attrib_list)
304                else:
305                        self.attribs = 'NULL' 
306                       
307                self._connect()
308                jobs = pbs.pbs_statjob(self.con, job_name, self.attribs, 'NULL')
309                self._disconnect() 
310               
311                self._list_2_dict(jobs, job)
312
313        def getjob(self, name, attrib_list=None):
314                # To make sure we use the full name of a job; Changes a name
315                # like 1234567 into 1234567.server.name
316                name = name.split('.')[0] + "." + self.get_server_name()
317
318                self._statjob(name, attrib_list)
319                try:
320                        return self.d[name]
321                except KeyError, detail:
322                        return self.d
323       
324        def getjobs(self, attrib_list=None):
325                self._statjob('', attrib_list)
326                return self.d
327
328        def get_server_name(self):
329                return self.server
330
331        def new_data_structure(self): 
332                """
333                Use the new data structure. Is now the default
334                """
335                self.OLD_DATA_STRUCTURE = False
336
337        def old_data_structure(self): 
338                """
339                Use the old data structure. This function is obselete and
340                will be removed in a future release
341                """
342                self.OLD_DATA_STRUCTURE = True
343
344class _PBSobject(UserDict.UserDict):
345        TRUE  = 1
346        FALSE = 0
347
348        def __init__(self, dictin = None):
349                UserDict.UserDict.__init__(self)
350                self.name = None
351
352                if dictin:
353                        if dictin.has_key('name'):
354                                self.name = dictin['name']
355                                del dictin['name']
356                        self.data = dictin
357
358        def get_value(self, key):
359                if self.has_key(key):
360                        return self[key]
361                else:
362                        return None
363
364        def __repr__(self):
365                return repr(self.data)
366
367        def __str__(self):
368                return str(self.data)
369
370        def __getattr__(self, name):
371                """
372                override the class attribute get method. Return the value
373                from the Userdict
374                """
375                try:
376                        return self.data[name]
377                except KeyError:
378                        error = 'Attribute key error: %s' %(name)
379                        raise PBSError(error)
380
381        ## Disabled for this moment, BvdV 16 July 2010
382        #
383        #def __setattr__(self, name, value):
384        #       """
385        #       override the class attribute set method only when the UserDict
386        #       has set its class attribute
387        #       """
388        #       if self.__dict__.has_key('data'):
389        #               self.data[name] = value
390        #       else:
391        #               self.__dict__[name] = value
392
393        def __iter__(self):
394                return iter(self.data.keys())
395
396        def uniq(self, list):
397                """Filter out unique items of a list"""
398                uniq_items = {}
399                for item in list:
400                        uniq_items[item] = 1
401                return uniq_items.keys()
402
403        def return_value(self, key):
404                """Function that returns a value independent of new or old data structure"""
405                if isinstance(self[key], types.ListType):
406                        return self[key][0]
407                else:
408                        return self[key] 
409
410class job(_PBSobject):
411        """PBS job class""" 
412        def is_running(self):
413
414                value = self.return_value('job_state')
415                if value == 'Q':
416                        return self.TRUE
417                else:
418                        return self.FALSE
419
420        def get_nodes(self, unique=None):
421                """
422                Returns a list of the nodes which run this job
423                format:
424                  * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
425                  * split on '+' and if uniq is set split on '/'
426                """
427                nodes = self.get_value('exec_host')
428               
429                if isinstance(nodes, str):
430                        if nodes:
431                                nodelist = string.split(nodes,'+')
432                                if not unique:
433                                        return nodelist
434                                else:
435                                        l = list()
436
437                                        for n in nodelist:
438                                                t = string.split(n,'/')
439                                                if t[0] not in l:
440                                                        l.append(t[0])
441
442                                        return l
443
444                        else:
445                                return list()
446                else:
447                                l = list()
448                                for n in nodes:
449
450                                        nlist = string.split(n,'+')
451
452                                        if unique:
453                                                for entry in nlist:
454
455                                                        t = string.split(entry,'/')
456                                                        if t[0] not in l:
457                                                                l.append(t[0])
458                                        else:
459                                                l += nlist
460
461                                return l
462               
463
464class node(_PBSobject):
465        """PBS node class"""
466       
467        def is_free(self):
468                """Check if node is free"""
469
470                value = self.return_value('state')
471                if value == 'free':
472                        return self.TRUE
473                else: 
474                        return self.FALSE
475
476        def has_job(self):
477                """Does the node run a job"""
478                try:
479                        a = self['jobs']
480                        return self.TRUE
481                except KeyError, detail:
482                        return self.FALSE
483       
484        def get_jobs(self, unique=None):
485                """Returns a list of the currently running job-id('s) on the node"""
486
487                jobs = self.get_value('jobs')
488                if jobs:       
489                        if isinstance(jobs, str):
490                                jlist = re.compile('[^\\ /]\\d+[^/.]').findall( jobs )
491                       
492                                if not unique:
493                                        return jlist
494                                else:
495                                        return self.uniq(jlist)
496
497                        else:
498                                job_re = re.compile('^(?:\d+/)?(.+)')
499                                l = list()
500
501                                if unique:
502                                                for j in jobs:
503                                                        jobstr = job_re.findall(j.strip())[0]
504                                                if jobstr not in l: 
505                                                        l.append(jobstr)           
506
507                                                return l
508                                else:
509                                                return jobs
510
511                return list()
512
513
514class queue(_PBSobject):
515        """PBS queue class"""
516        def is_enabled(self):
517
518                value = self.return_value('enabled')
519                if value == 'True':
520                        return self.TRUE
521                else:
522                        return self.FALSE
523
524        def is_execution(self):
525
526                value = self.return_value('queue_type')
527                if value == 'Execution':
528                        return self.TRUE
529                else:
530                        return self.FALSE
531
532class server(_PBSobject):
533        """PBS server class"""
534
535        def get_version(self):
536                return self.get_value('pbs_version')
537
538def main():
539        p = PBSQuery() 
540        serverinfo = p.get_serverinfo()
541        for server in serverinfo.keys():
542                print server, ' version: ', serverinfo[server].get_version()
543        for resource in serverinfo[server].keys():
544                print '\t ', resource, ' = ', serverinfo[server][resource]
545
546        queues = p.getqueues()
547        for queue in queues.keys():
548                print queue
549                if queues[queue].is_execution():
550                        print '\t ', queues[queue]
551                if queues[queue].has_key('acl_groups'):
552                        print '\t acl_groups: yes'
553                else:
554                        print '\t acl_groups: no'
555
556        jobs = p.getjobs()
557        for name,job in jobs.items():
558                if job.is_running():
559                        print job
560
561        l = ['state']
562        nodes = p.getnodes(l)
563        for name,node in nodes.items():
564                if node.is_free(): 
565                        print node
566
567if __name__ == "__main__":
568        main()
Note: See TracBrowser for help on using the repository browser.