source: trunk/src/PBSacct.py @ 329

Last change on this file since 329 was 286, checked in by bas, 11 years ago

PBSacct framework for accounting records, beta
list some changes

#!/usr/bin/env python
#
3"""
4 Author: Bas van der Vlies
5 Date  : 1 June 2012
6 Desc. : Standard python module to parse accounting files for TORQUE
7
8 SVN Info:
9    $Id: PBSacct.py 286 2012-12-20 11:01:00Z bas $
10    $URL: trunk/src/PBSacct.py $
11
12Torque Info:
13 abort   Job has been aborted by the server
14  C   checkpoint  Job has been checkpointed and held
15  D   delete  Job has been deleted
16  E   exit    Job has exited (either successfully or unsuccessfully)
17  Q   queue   Job has been submitted/queued
18  R   rerun   Attempt to rerun the job has been made
19  S   start   Attempt to start the job has been made (if the job fails to properly start, it may have multiple job start records)
20  T   restart     Attempt to restart the job (from checkpoint) has been made (if the job fails to properly start, it may have multiple job start records)
21
22 ctime   Time job was created
23 etime   Time job became eligible to run
24 qtime   Time job was queued
25 start   Time job started to run
26
27  05/30/2012 23:59:46;E;6155805.batch1.irc.sara.nl;user=rvosmeer group=rvosmeer jo
28  bname=grid_ef_GROUP18 queue=serial ctime=1338411242 qtime=1338411242 etime=13384
29  11242 start=1338414827 owner=rvosmeer@login4.irc.sara.nl exec_host=gb-r3n8/7+gb-r3n8/6+gb-r3n8/5+gb-r3n8/4+gb-r3n8/3+gb-r3n8/2+gb-r3n8/1+gb-r3n8/0 Resource_List.arch=x86_64 Resource_List.ncpus=1 Resource_List.neednodes=1:cores8:ppn=8 Resource_List.nodect=1 Resource_List.nodes=1:cores8:ppn=8 Resource_List.walltime=00:20:00 session=22781 end=1338415186 Exit_status=0 resources_used.cput=00:05:49 resources_used.mem=238052kb resources_used.vmem=351124kb resources_used.walltime=00:06:00
30
31"""
import re
import sys

def get_nodes(nodes, unique=None):
    """
    Returns a list of the nodes which run this job
    format:
    * exec_host: gb-r10n14/5+gb-r10n14/4+gb-r10n14/3+gb-r10n14/2+gb-r10n14/1+gb-r10n14/0
    * split on '+' and if unique is set only keep the node name before the '/'
    """

    if nodes:
        nodelist = nodes.split('+')
        if not unique:
            return nodelist
        else:
            l = list()

            for n in nodelist:
                t = n.split('/')
                if t[0] not in l:
                    l.append(t[0])

            return l

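# Usage sketch (hypothetical exec_host value):
#
#   >>> get_nodes('gb-r3n8/1+gb-r3n8/0+gb-r4n2/0')
#   ['gb-r3n8/1', 'gb-r3n8/0', 'gb-r4n2/0']
#   >>> get_nodes('gb-r3n8/1+gb-r3n8/0+gb-r4n2/0', unique=True)
#   ['gb-r3n8', 'gb-r4n2']
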
def get_racks( host_l ):
    """
    Returns the unique rack numbers for a list of host names,
    eg: gb-r3n8 --> rack '3'
    """

    NODE_EXPR = "gb-r(?P<racknr>[0-9]+)n(?P<nodenr>[0-9]+)"

    racks = list()
    for h in host_l:
        parts = re.search( r'%s' % NODE_EXPR, h, re.VERBOSE )

        try:
            racknr = parts.group( 'racknr' )
            if not racknr in racks:
                racks.append(racknr)

        except AttributeError:
            # host name did not match NODE_EXPR
            pass

    return racks
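# Usage sketch (hypothetical host names):
#
#   >>> get_racks(['gb-r3n8', 'gb-r3n2', 'gb-r10n14'])
#   ['3', '10']
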
def parse_key_value( d, state, line):
    """
    user=rvosmeer group=rvosmeer jobname=FRAME_398_SPq.adf.job ...
    a value can also contain a '=' character
    """
    key_value_pairs = line.split()

    for entry in key_value_pairs:
        key_value_l = entry.split('=')
        k = key_value_l[0].strip()

        # rejoin on '=' so values that themselves contain '=' stay intact
        v = '='.join(key_value_l[1:]).strip()
        d[state][ k ] = v

        if k in ['exec_host']:
            d[state]['host_list'] = get_nodes( v, True)
            d[state]['rack_list'] = get_racks( d[state]['host_list'] )

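# Usage sketch (hypothetical record fragment); note the embedded '=' in
# the neednodes value is preserved:
#
#   >>> d = { 'exit' : dict() }
#   >>> parse_key_value(d, 'exit', 'queue=serial Resource_List.neednodes=1:cores8:ppn=8')
#   >>> d['exit']['Resource_List.neednodes']
#   '1:cores8:ppn=8'
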
def read_file(file, d):

    fd = open(file, 'r')

    while True:
        line = fd.readline()

        if not line:
            break

        l = line.split(';')

        jobid = l[2].strip()
        if jobid not in d:
            d[ jobid ] = dict()

        if l[1] == 'S':

            # a job can have more than one start record if it failed to
            # start properly, so count the retries
            try:
                d[ jobid ]['start']['retry_count'] += 1
                parse_key_value( d[ jobid ], 'start', ' '.join(l[3:]))

            except KeyError:
                d[ jobid ]['start'] = dict()
                d[ jobid ]['start']['retry_count'] = 0
                parse_key_value( d[ jobid ], 'start', ' '.join(l[3:]))

        elif l[1] == 'C':
            d[ jobid ]['checkpoint'] = dict()
            parse_key_value( d[ jobid ], 'checkpoint', ' '.join(l[3:]))

        elif l[1] == 'D':
            d[ jobid ]['deleted'] = dict()
            parse_key_value( d[ jobid ], 'deleted', ' '.join(l[3:]))

        elif l[1] == 'E':
            d[ jobid ]['exit'] = dict()
            parse_key_value( d[ jobid ], 'exit', ' '.join(l[3:]))

        elif l[1] == 'Q':
            d[ jobid ]['queued'] = dict()
            parse_key_value( d[ jobid ], 'queued', ' '.join(l[3:]))

        elif l[1] == 'R':
            d[ jobid ]['rerun'] = dict()
            parse_key_value( d[ jobid ], 'rerun', ' '.join(l[3:]))

        elif l[1] == 'T':
            d[ jobid ]['restart'] = dict()
            parse_key_value( d[ jobid ], 'restart', ' '.join(l[3:]))

    fd.close()

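# Usage sketch (hypothetical accounting file name):
#
#   >>> jobs = dict()
#   >>> read_file('20120530', jobs)
#   >>> jobs['6155805.batch1.irc.sara.nl']['exit']['Exit_status']
#   '0'
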
serial_jobs = 0
parallel_jobs = 0
express_jobs = 0
serial_jobs_on_parallel_node = 0
total_jobs = 0

jobs = dict()
parallel_job_len = dict()

for f in sys.argv[1:]:

    read_file(f, jobs)

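# Invocation sketch (hypothetical accounting file names):
#
#   ./PBSacct.py 20120530 20120531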

#for j in jobs.keys():
#
#    if 'restart' in jobs[j]:
#        print j, jobs[j]['restart']

for j in jobs.keys():

    if 'exit' in jobs[j]:

        record = jobs[j]['exit']

        total_jobs += 1

        if record['queue'] in [ 'serial' ]:
            serial_jobs += 1

            # racks 2 - 13 hold the parallel nodes
            if record['rack_list'][0] in [ '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13' ]:
                serial_jobs_on_parallel_node += 1

        elif record['queue'] in [ 'parallel' ]:
            parallel_jobs += 1

            # histogram: how many parallel jobs ran on this number of hosts
            number_of_hosts = len(record['host_list'])
            try:
                parallel_job_len[ number_of_hosts ]['number'] += 1
                #parallel_job_len[ number_of_hosts ]['walltime'].append(record['Resource_List.walltime'])

            except KeyError:
                parallel_job_len[ number_of_hosts ] = dict()
                parallel_job_len[ number_of_hosts ]['number'] = 1
                #parallel_job_len[ number_of_hosts ]['walltime'] = list()
                #parallel_job_len[ number_of_hosts ]['walltime'].append(record['Resource_List.walltime'])

        elif record['queue'] in [ 'express' ]:
            express_jobs += 1

print 'total_jobs :', total_jobs
print 'serial_jobs :', serial_jobs
print 'parallel_jobs :', parallel_jobs
print 'express_jobs :', express_jobs
print 'serial_jobs_on_parallel_node :', serial_jobs_on_parallel_node

for p in parallel_job_len:
    print p, parallel_job_len[p]