source: trunk/src/PBSacct.py @ 284

Last change on this file since 284 was 284, checked in by bas, 12 years ago

added rough example framework for PBSacct.py

  • Property svn:executable set to *
  • Property svn:keywords set to Id URL
File size: 6.2 KB
Line 
1#!/usr/bin/env python
2#
3"""
4 Author: Bas van der Vlies
5 Date  : 1 June 2012
6 Desc. : Standard python module to parse accounting files for TORQUE
7
8 SVN Info:
9    $Id: PBSacct.py 284 2012-06-01 14:00:09Z bas $
10    $URL: trunk/src/PBSacct.py $
11
12Torque Info:
13  A   abort   Job has been aborted by the server
14  C   checkpoint  Job has been checkpointed and held
15  D   delete  Job has been deleted
16  E   exit    Job has exited (either successfully or unsuccessfully)
17  Q   queue   Job has been submitted/queued
18  R   rerun   Attempt to rerun the job has been made
19  S   start   Attempt to start the job has been made (if the job fails to properly start, it may have multiple job start records)
20  T   restart     Attempt to restart the job (from checkpoint) has been made (if the job fails to properly start, it may have multiple job start records)
21
22 ctime   Time job was created
23 etime   Time job became eligible to run
24 qtime   Time job was queued
25 start   Time job started to run
26
27  05/30/2012 23:59:46;E;6155805.batch1.irc.sara.nl;user=rvosmeer group=rvosmeer jo
28  bname=grid_ef_GROUP18 queue=serial ctime=1338411242 qtime=1338411242 etime=13384
29  11242 start=1338414827 owner=rvosmeer@login4.irc.sara.nl exec_host=gb-r3n8/7+gb-r3n8/6+gb-r3n8/5+gb-r3n8/4+gb-r3n8/3+gb-r3n8/2+gb-r3n8/1+gb-r3n8/0 Resource_List.arch=x86_64 Resource_List.ncpus=1 Resource_List.neednodes=1:cores8:ppn=8 Resource_List.nodect=1 Resource_List.nodes=1:cores8:ppn=8 Resource_List.walltime=00:20:00 session=22781 end=1338415186 Exit_status=0 resources_used.cput=00:05:49 resources_used.mem=238052kb resources_used.vmem=351124kb resources_used.walltime=00:06:00
30
31"""
32import os
33import string
34import re
35import sys
36
def get_nodes(nodes, unique=None):
    """
    Return a list of the nodes which run this job.

    nodes  -- exec_host string, e.g.:
              gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+...+gb-r10n14/0
    unique -- when true, strip the '/<cpu>' suffix and return each
              hostname only once (first-seen order is preserved)

    Returns None when nodes is empty/None (original behavior kept).
    """
    if not nodes:
        return None

    # Entries are separated by '+'.  Use the str method instead of the
    # deprecated string.split() function (removed in Python 3).
    nodelist = nodes.split('+')
    if not unique:
        return nodelist

    hosts = []
    for entry in nodelist:
        # 'gb-r10n14/5' -> hostname part before the '/'
        host = entry.split('/')[0]
        if host not in hosts:
            hosts.append(host)

    return hosts
58
def get_racks( host_l ):
    """
    Return the list of rack numbers (as strings) for the given hostnames.

    host_l -- list of hostnames of the form 'gb-r<rack>n<node>'

    Hostnames that do not match the naming scheme are skipped.  Rack
    numbers are unique and kept in first-seen order.
    """
    node_re = re.compile(r'gb-r(?P<racknr>[0-9]+)n(?P<nodenr>[0-9]+)')

    racks = []
    for host in host_l:
        match = node_re.search(host)
        # Skip non-matching hostnames explicitly instead of relying on
        # the AttributeError a None match object used to raise.
        if not match:
            continue

        racknr = match.group('racknr')
        if racknr not in racks:
            racks.append(racknr)

    # BUGFIX: the original returned from *inside* the loop, so only the
    # first hostname was ever examined.
    return racks
77           
78
79
80
def parse_key_value( d, state, line):
    """
    Parse a whitespace separated list of key=value pairs into d[state].

    d     -- per-job dictionary; d[state] must already be a dictionary
    state -- record type key ('start', 'exit', 'queued', ...)
    line  -- e.g. 'user=rvosmeer group=rvosmeer jobname=FRAME_398_SPq.adf.job ...'

    A value can also contain '=' characters (e.g. neednodes=1:ppn=8),
    so only the first '=' separates the key from the value.

    For the exec_host entry the derived unique host list and rack list
    are stored under 'host_list' and 'rack_list' as well.
    """
    key_value_pairs = line.split()

    for entry in key_value_pairs:
        # BUGFIX: split only on the first '='.  The old code split on
        # every '=' and rejoined without it, silently corrupting values
        # such as '1:cores8:ppn=8' into '1:cores8:ppn8'.
        key_value_l = entry.split('=', 1)
        k = key_value_l[0].strip()
        v = ''.join(key_value_l[1:]).strip()
        d[state][ k ] = v

        if k in ['exec_host']:
            d[state]['host_list'] = get_nodes( v, True)
            d[state]['rack_list'] = get_racks( d[state]['host_list'] )
97
98
def read_file(file, d):
    """
    Parse a TORQUE accounting file and merge its records into d.

    file -- path of the accounting file
    d    -- dictionary keyed on jobid; filled with one sub-dictionary
            per record type seen for that job

    Accounting records have the form:
        <timestamp>;<record type>;<jobid>;key=value key=value ...

    Raises IOError when the file cannot be opened.
    """
    # record type letter -> key used in the per-job dictionary
    record_types = {
        'C': 'checkpoint',
        'D': 'deleted',
        'E': 'exit',
        'Q': 'queued',
        'R': 'rerun',
        'T': 'restart',
    }

    fd = open(file, 'r')
    try:
        for line in fd:
            l = line.split(';')

            # Skip blank or malformed lines instead of crashing on l[2]
            # (the old readline() loop raised IndexError on them).
            if len(l) < 4:
                continue

            jobid = l[2].strip()
            if jobid not in d:
                d[ jobid ] = dict()

            attrs = ' '.join(l[3:])

            if l[1] == 'S':
                # A job can have multiple start records when it fails to
                # start properly; count the retries.
                try:
                    d[ jobid ]['start']['retry_count'] += 1
                except KeyError:
                    d[ jobid ]['start'] = dict()
                    d[ jobid ]['start']['retry_count'] = 0

                parse_key_value( d[ jobid ], 'start', attrs)

            elif l[1] in record_types:
                state = record_types[ l[1] ]
                # BUGFIX: the old 'C' branch stored l[3:] (a list) in
                # d[jobid]['checkpoint'], which made the subsequent
                # parse_key_value() call fail with a TypeError; always
                # start from an empty dictionary like the other records.
                d[ jobid ][ state ] = dict()
                parse_key_value( d[ jobid ], state, attrs)
    finally:
        # the original leaked the file descriptor
        fd.close()
151
152
153
# --- main script (Python 2) ------------------------------------------------
# Parse the accounting files given on the command line and print per-queue
# job statistics.

# summary counters for the statistics printed at the end
serial_jobs = 0
parallel_jobs = 0
express_jobs = 0
serial_jobs_on_parallel_node = 0
total_jobs = 0

# jobid -> { record type -> attribute dict }, filled by read_file()
jobs = dict()
# number of hosts -> { 'number': count of parallel jobs of that size }
parallel_job_len = dict()

for f in sys.argv[1:] :

    read_file(f, jobs)


# dump every job that has a restart ('T') record
for j in jobs.keys():

    if jobs[j].has_key('restart'):
        print j, jobs[j]['restart']

# NOTE(review): this unconditional exit makes everything below dead code --
# presumably a debugging leftover; remove it to actually get the statistics.
sys.exit(0)
# statistics are taken from the 'E' (exit) record of each job
for j in jobs.keys():

    if jobs[j].has_key('exit'):

        record = jobs[j]['exit']

        total_jobs += 1

        if record['queue'] in [ 'serial']:
            serial_jobs += 1 

            # racks 2..13 apparently hold the parallel nodes (cf. the
            # counter name) -- TODO confirm against the cluster layout
            if record['rack_list'][0] in [ '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13' ]:
                serial_jobs_on_parallel_node += 1

        elif record['queue'] in [ 'parallel']:
            parallel_jobs += 1
            # histogram of parallel job sizes, keyed on number of hosts
            try:
                number_of_hosts = len(record['host_list']) 
                parallel_job_len[ number_of_hosts ]['number'] += 1
                #parallel_job_len[ number_of_hosts ] ['walltime'].append(record['Resource_List.walltime'])

            except KeyError:
                # first job of this size seen
                parallel_job_len[ number_of_hosts ] = dict()
                parallel_job_len[ number_of_hosts ] ['number'] = 1
                #parallel_job_len[ number_of_hosts ] ['walltime'] = list()
                #parallel_job_len[ number_of_hosts ] ['walltime'].append(record['Resource_List.walltime'])

        elif record['queue'] in [ 'express']:
            express_jobs += 1



print 'total_jobs :', total_jobs
print 'serial_jobs :', serial_jobs
print 'parallel_jobs :', parallel_jobs
print 'express_jobs :', express_jobs
print 'serial_jobs_on_parallel_node :', serial_jobs_on_parallel_node

# dump the parallel-job size histogram
for p in parallel_job_len:
    print p, parallel_job_len[p]
Note: See TracBrowser for help on using the repository browser.