source: trunk/plugin/togap.py @ 61

Last change on this file since 61 was 61, checked in by bastiaans, 19 years ago

plugin/togap.py:

  • Added initial setup for transmission of jobinfo
  • Not sure wether a extra direct connection to data server is needed or not. Will mean transmitting the same data twice by different means -> not logical.


Will contemplate on this..

File size: 5.1 KB
Line 
1#!/usr/bin/env python
2
3# Specify debugging level here;
4#
5DEBUG_LEVEL = 10
6
7# If set to 1, in addition to multicasting with gmetric,
8# also transmit jobinfo data to a Toga server for archival
9#
10ARCHIVE_MODE = 0
11
12# Where is the toga server at
13#
14#TOGA_SERVER = 'monitor2.irc.sara.nl:9048'
15
16# Wether or not to run as a daemon in background
17#
18DAEMONIZE = 0
19
20# Allows to specify alternate config
21#
22#GMOND_CONF = '/etc/gmondconf'
23
24from PBSQuery import PBSQuery
25import sys
26import time
27
28class DataProcessor:
29
30        binary = '/usr/bin/gmetric'
31
32        def __init__( self, binary=None ):
33
34                if binary:
35                        self.binary = binary
36
37        def multicastGmetric( self, metricname, metricval, tmax ):
38
39                cmd = binary
40
41                try:
42                        cmd = cmd + ' -c' + GMOND_CONF
43                except NameError:
44                        debug_msg( 8, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' )
45
46                cmd = cmd + ' -n' + metricname + ' -v' + metricval + ' -t' + tmax
47
48                print cmd
49                #os.system( cmd )
50
51        def togaSubmitJob( self, jobid, jobattrs ):
52
53                pass
54
55class PBSDataGatherer:
56
57        jobs = { }
58
59        def __init__( self ):
60
61                self.pq = PBSQuery()
62                self.jobs = { }
63                self.dp = DataProcessor()
64
65        def getAttr( self, attrs, name ):
66
67                if attrs.has_key( name ):
68                        return attrs[name]
69                else:
70                        return ''
71
72        def jobDataChanged( self, jobs, job_id, attrs ):
73
74                if jobs.has_key( job_id ):
75                        oldData = jobs[ job_id ]       
76                else:
77                        return 1
78
79                for name, val in attrs.items():
80
81                        if oldData.has_key( name ):
82
83                                if oldData[ name ] != attrs[ name ]:
84
85                                        return 1
86
87                        else:
88                                return 1
89
90                return 0
91
92        def getJobData( self ):
93
94                jobs = self.jobs[:]
95
96                joblist = self.pq.getjobs()
97
98                jobs_processed = [ ]
99
100                for name, attrs in joblist.items():
101
102                        job_id = name.split( '.' )[0]
103
104                        jobs_processed.append( job_id )
105
106                        name = self.getAttr( attrs, 'Job_Name' )
107                        queue = self.getAttr( attrs, 'queue' )
108                        owner = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
109                        requested_time = self.getAttr( attrs, 'Resource_List.walltime' )
110                        requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
111                        mynoderequest = self.getAttr( attrs, 'Resource_List.nodes' )
112                        if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
113                                ppn = mynoderequest.split( ':' )[1].split( 'ppn=' )[1]
114                        else:
115                                ppn = ''
116                        status = self.getAttr( attrs, 'job_state' )
117                        start_timestamp = self.getAttr( attrs, 'mtime' )
118                        stop_timestamp = ''
119
120                        myAttrs = { }
121                        myAttrs['name'] = name
122                        myAttrs['queue'] = queue
123                        myAttrs['owner'] = owner
124                        myAttrs['requested_time'] = requested_time
125                        myAttrs['requested_memory'] = requested_memory
126                        myAttrs['ppn'] = ppn
127                        myAttrs['status'] = status
128                        myAttrs['start_timestamp'] = start_timestamp
129                        myAttrs['stop_timestamp'] = stop_timestamp
130
131                        if self.jobDataChanged( jobs, job_id, myAttrs ):
132                                jobs[ job_id ] = myAttrs
133
134                                self.printJob( jobs, job_id )
135
136                                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
137
138                for id, attrs in jobs.items():
139
140                        # This job was there in the last run, and not anymore
141                        # it must have finished
142
143                        if id not in jobs_processed and attrs['stop_timestamp'] == '':
144
145                                jobs[ id ]['status'] = 'F'
146                                jobs[ id ]['stop_timestamp'] = time.time()
147                                debug_msg( 10, printTime() + ' job %s finished' %(id) )
148                                self.printJob( jobs, id )
149
150                # Now let's spread the knowledge
151                #
152                for jobid, jobattrs in jobs.items():
153
154                        if ARCHIVE_MODE:
155
156                                if self.jobDataChanged( self.jobs, jobid, jobattrs ):
157
158                                        self.dp.togaSubmitJob( jobid, jobattrs )
159
160                        self.dp.multicastGmetric( jobid, jobattrs )
161                                       
162                self.jobs = jobs
163
164        def printJobs( self, jobs ):
165       
166                for name, attrs in self.jobs.items():
167
168                        print 'job %s' %(name)
169
170                        for name, val in attrs.items():
171
172                                print '\t%s = %s' %( name, val )
173
174        def printJob( self, jobs, job_id ):
175
176                print 'job %s' %(job_id)
177
178                for name, val in self.jobs[ job_id ].items():
179
180                        print '\t%s = %s' %( name, val )
181
182        def daemon( self ):
183                "Run as daemon forever"
184
185                self.DAEMON = 1
186
187                # Fork the first child
188                #
189                pid = os.fork()
190                if pid > 0:
191                        sys.exit(0)  # end parrent
192
193                # creates a session and sets the process group ID
194                #
195                os.setsid()
196
197                # Fork the second child
198                #
199                pid = os.fork()
200                if pid > 0:
201                        sys.exit(0)  # end parrent
202
203                # Go to the root directory and set the umask
204                #
205                os.chdir('/')
206                os.umask(0)
207
208                sys.stdin.close()
209                sys.stdout.close()
210                sys.stderr.close()
211
212                os.open('/dev/null', 0)
213                os.dup(0)
214                os.dup(0)
215
216                self.run()
217
218        def run( self ):
219                "Main thread"
220
221                while ( 1 ):
222               
223                        self.getJobData()
224                        time.sleep( 1 ) 
225
226def printTime( ):
227
228        return time.strftime("%a, %d %b %Y %H:%M:%S")
229
230def debug_msg( level, msg ):
231
232        if (DEBUG_LEVEL >= level):
233                        sys.stderr.write( msg + '\n' )
234
235def main():
236
237        gather = PBSDataGatherer()
238        if DAEMONIZE:
239                gather.daemon()
240        else:
241                gather.run()
242
243if __name__ == '__main__':
244        main()
Note: See TracBrowser for help on using the repository browser.