Changeset 65 for trunk/plugin
- Timestamp:
- 04/14/05 12:10:51 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/plugin/togap.py
r64 r65 19 19 import sys 20 20 import time 21 import os 22 import string 21 23 22 24 class DataProcessor: … … 29 31 self.binary = binary 30 32 31 def multicastGmetric( self, metricname, metricval, tmax ): 32 33 cmd = binary 33 self.dmax = TORQUE_POLL_INTERVAL 34 35 #incompatible = self.checkGmetricVersion 36 incompatible = 0 37 38 if incompatible: 39 debug_msg( 0, 'Gmetric version not compatible, pls upgrade to at least 3.0.1' ) 40 sys.exit( 1 ) 41 42 def checkGmetricVersion( self ): 43 44 for line in os.popen( self.binary + ' --version' ).readlines(): 45 46 line = line.split( ' ' ) 47 48 if len( line ) == 2 and line.find( 'gmetric' ) != -1: 49 50 gmetric_version = line[1] 51 52 version_major = int( gemtric_version.split( '.' )[0] ) 53 version_minor = int( gemtric_version.split( '.' )[1] ) 54 version_patch = int( gemtric_version.split( '.' )[2] ) 55 56 incompatible = 0 57 58 if version_major < 3: 59 60 incompatible = 1 61 62 elif version_major == 3: 63 64 if version_minor == 0: 65 66 if version_patch < 1: 67 68 incompatbiel = 1 69 70 return incompatible 71 72 def multicastGmetric( self, metricname, metricval, tmax='15' ): 73 74 cmd = self.binary 34 75 35 76 try: … … 38 79 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (ommitting)' ) 39 80 40 cmd = cmd + ' -n' + metricname + ' -v ' + metricval + ' -t' + tmax81 cmd = cmd + ' -n' + metricname + ' -v"' + metricval + '" -t' + tmax + ' -d' + str( self.dmax ) 41 82 42 83 print cmd … … 80 121 return 0 81 122 82 def getJobData( self ): 83 84 jobs = self.jobs[:] 123 def getJobData( self, known_jobs ): 124 125 if len( known_jobs ) > 0: 126 jobs = known_jobs 127 else: 128 jobs = { } 85 129 86 130 joblist = self.pq.getjobs() … … 106 150 status = self.getAttr( attrs, 'job_state' ) 107 151 start_timestamp = self.getAttr( attrs, 'mtime' ) 108 stop_timestamp = ''152 #stop_timestamp = '' 109 153 110 154 myAttrs = { } … … 117 161 myAttrs['status'] = status 118 162 myAttrs['start_timestamp'] = start_timestamp 119 myAttrs['stop_timestamp'] = stop_timestamp163 #myAttrs['stop_timestamp'] = stop_timestamp 120 164 121 165 if self.jobDataChanged( jobs, job_id, myAttrs ): … … 126 170 debug_msg( 10, printTime() + ' job %s state changed' %(job_id) ) 127 171 128 for id, attrs in jobs.items(): 129 130 # This job was there in the last run, and not anymore 131 # it must have finished 132 133 if id not in jobs_processed and attrs['stop_timestamp'] == '': 134 135 jobs[ id ]['status'] = 'F' 136 jobs[ id ]['stop_timestamp'] = time.time() 137 debug_msg( 10, printTime() + ' job %s finished' %(id) ) 138 self.printJob( jobs, id ) 172 #for id, attrs in jobs.items(): 173 174 # # This job was there in the last run, and not anymore 175 # # it must have finished 176 177 # if id not in jobs_processed and attrs['stop_timestamp'] == '': 178 179 # jobs[ id ]['status'] = 'F' 180 # jobs[ id ]['stop_timestamp'] = time.time() 181 # debug_msg( 10, printTime() + ' job %s finished' %(id) ) 182 # self.printJob( jobs, id ) 183 184 return jobs 185 186 def submitJobData( self, jobs ): 187 """Submit job info list""" 188 189 time_now = time.time() 190 191 self.dp.multicastGmetric( 'TOGA-HEARTBEAT', str( time_now ) ) 139 192 140 193 # Now let's spread the knowledge … … 142 195 for jobid, jobattrs in jobs.items(): 143 196 144 if ARCHIVE_MODE: 145 146 if self.jobDataChanged( self.jobs, jobid, jobattrs ): 147 148 self.dp.togaSubmitJob( jobid, jobattrs ) 149 150 self.dp.multicastGmetric( jobid, jobattrs ) 151 152 self.jobs = jobs 197 gmetric_val = self.compileGmetricVal( jobid, jobattrs ) 198 199 for val in gmetric_val: 200 self.dp.multicastGmetric( 'TOGA-JOB-' + jobid, val ) 201 202 def compileGmetricVal( self, jobid, jobattrs ): 203 """Create a val string for gmetric of jobinfo""" 204 205 name_str = 'name=' + jobattrs['name'] 206 queue_str = 'queue=' + jobattrs['queue'] 207 owner_str = 'owner=' + jobattrs['owner'] 208 rtime_str = 'rtime=' + jobattrs['requested_time'] 209 rmem_str = 'rmem=' + jobattrs['requested_memory'] 210 ppn_str = 'ppn=' + jobattrs['ppn'] 211 status_str = 'status=' + jobattrs['status'] 212 stime_str = 'stime=' + jobattrs['start_timestamp'] 213 214 appendList = [ name_str, queue_str, owner_str, rtime_str, rmem_str, ppn_str, status_str, stime_str ] 215 216 return self.makeAppendLists( appendList ) 217 218 def makeAppendLists( self, append_list ): 219 220 app_lists = [ ] 221 222 mystr = None 223 224 for val in append_list: 225 226 if not mystr: 227 mystr = val 228 else: 229 if not self.checkValAppendMaxSize( mystr, val ): 230 mystr = mystr + ' ' + val 231 else: 232 # Too big, new appenlist 233 app_lists.append( mystr ) 234 mystr = val 235 236 app_lists.append( mystr ) 237 238 return app_lists 239 240 def checkValAppendMaxSize( self, val, text ): 241 """Check if val + text size is not above 1400 (max msg size)""" 242 243 if len( val + text ) > 1400: 244 return 1 245 else: 246 return 0 153 247 154 248 def printJobs( self, jobs ): 155 249 """Print a jobinfo overview""" 250 156 251 for name, attrs in self.jobs.items(): 157 252 … … 163 258 164 259 def printJob( self, jobs, job_id ): 260 """Print job with job_id from jobs""" 165 261 166 262 print 'job %s' %(job_id) 167 263 168 for name, val in self.jobs[ job_id ].items():264 for name, val in jobs[ job_id ].items(): 169 265 170 266 print '\t%s = %s' %( name, val ) 171 267 172 268 def daemon( self ): 173 "Run as daemon forever" 174 175 self.DAEMON = 1 269 """Run as daemon forever""" 176 270 177 271 # Fork the first child … … 207 301 208 302 def run( self ): 209 " Main thread"303 """Main thread""" 210 304 211 305 while ( 1 ): 212 306 213 self.getJobData() 307 self.jobs = self.getJobData( self.jobs ) 308 self.submitJobData( self.jobs ) 214 309 time.sleep( TORQUE_POLL_INTERVAL ) 215 310 216 311 def printTime( ): 312 """Print current time/date in human readable format for log/debug""" 217 313 218 314 return time.strftime("%a, %d %b %Y %H:%M:%S") 219 315 220 316 def debug_msg( level, msg ): 317 """Print msg if at or above current debug level""" 221 318 222 319 if (DEBUG_LEVEL >= level): … … 224 321 225 322 def main(): 323 """Application start""" 226 324 227 325 gather = PBSDataGatherer() … … 231 329 gather.run() 232 330 331 # w00t someone started me 332 # 233 333 if __name__ == '__main__': 234 334 main()
Note: See TracChangeset
for help on using the changeset viewer.