- Timestamp:
- 04/01/05 16:50:54 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r31 r32 63 63 "Parse Ganglia's XML" 64 64 65 metrics = [ ] 65 def __init__( self ): 66 self.metrics = [ ] 67 self.clusters = { } 66 68 67 69 def startElement( self, name, attrs ): … … 69 71 70 72 if name == 'GANGLIA_XML': 71 self.XMLSource = attrs.get('SOURCE',"") 72 self.gangliaVersion = attrs.get('VERSION',"") 73 74 self.XMLSource = attrs.get( 'SOURCE', "" ) 75 self.gangliaVersion = attrs.get( 'VERSION', "" ) 76 73 77 debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) ) 74 78 75 79 elif name == 'GRID': 76 self.gridName = attrs.get('NAME',"") 77 self.time = attrs.get('LOCALTIME',"") 80 81 self.gridName = attrs.get( 'NAME', "" ) 82 self.time = attrs.get( 'LOCALTIME', "" ) 83 78 84 debug_msg( 10, '`-Grid found: %s' %( self.gridName ) ) 79 85 80 86 elif name == 'CLUSTER': 81 self.clusterName = attrs.get('NAME',"") 82 self.time = attrs.get('LOCALTIME',"") 83 self.rrd = RRDHandler( self.clusterName ) 87 88 self.clusterName = attrs.get( 'NAME', "" ) 89 self.time = attrs.get( 'LOCALTIME', "" ) 90 91 self.clusters[ self.clusterName ] = RRDHandler( self.clusterName ) 92 84 93 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 85 94 86 95 elif name == 'HOST' and self.clusterName in ARCHIVE_SOURCES: 87 self.hostName = attrs.get('NAME',"") 88 self.host Ip = attrs.get('IP',"")89 self.host Reported = attrs.get('REPORTED',"")90 # Reset the metrics list for each host91 self.metrics = [ ] 96 97 self.hostName = attrs.get( 'NAME', "" ) 98 self.hostIp = attrs.get( 'IP', "" ) 99 self.hostReported = attrs.get( 'REPORTED', "" ) 100 92 101 debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) ) 93 102 94 103 elif name == 'METRIC' and self.clusterName in ARCHIVE_SOURCES: 95 myMetric = { } 96 myMetric['name'] = attrs.get('NAME',"") 97 myMetric['val'] = attrs.get('VAL',"") 98 myMetric['time'] = self.hostReported 99 myMetric['type'] = attrs.get('TYPE',"") 100 101 self.metrics.append( myMetric ) 104 105 type = attrs.get( 'TYPE', "" ) 106 107 if type not in UNSUPPORTED_ARCHIVE_TYPES: 108 109 myMetric = { } 110 myMetric['name'] = attrs.get( 'NAME', "" ) 111 myMetric['val'] = attrs.get( 'VAL', "" ) 112 myMetric['time'] = self.hostReported 113 114 self.clusters[ self.clusterName ].memMetric( self.hostname, myMetric ) 115 102 116 debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 103 104 return105 106 def endElement( self, name ):107 #if name == 'GANGLIA_XML':108 109 #if name == 'GRID':110 111 #if name == 'CLUSTER':112 113 if name == 'HOST' and self.clusterName in ARCHIVE_SOURCES:114 115 # Determine time here, so all use same time in this run116 mytime = self.rrd.makeTimeSerial()117 correct_serial = self.rrd.checkNewRrdPeriod( self.hostName, mytime )118 119 #debug_msg( 8, 'time %s: Storing metrics for %s' %(mytime, self.hostName) )120 print 'Storing metrics for %s:' %(self.hostName),121 self.storeMetrics( self.hostName, correct_serial )122 123 #if name == 'METRIC':124 117 125 118 def storeMetrics( self, hostname, timeserial ): … … 129 122 130 123 self.rrd.createCheck( hostname, metric, timeserial ) 131 print ' [%s.%s]' %(metric['name'], metric['time']),132 124 self.rrd.update( hostname, metric, timeserial ) 133 125 debug_msg( 9, 'stored metric %s for %s: %s' %( hostname, metric['name'], metric['val'] ) ) 134 126 #sys.exit(1) 135 print136 137 127 138 128 class GangliaXMLGatherer: … … 156 146 157 147 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): 148 158 149 af, socktype, proto, canonname, sa = res 150 159 151 try: 152 160 153 self.s = socket.socket( af, socktype, proto ) 154 161 155 except socket.error, msg: 156 162 157 self.s = None 163 158 continue 159 164 160 try: 161 165 162 self.s.connect( sa ) 163 166 164 except socket.error, msg: 165 167 166 self.s.close() 168 167 self.s = None 169 168 continue 169 170 170 break 171 171 172 172 if self.s is None: 173 173 174 print 'Could not open socket' 174 175 sys.exit(1) … … 186 187 # 187 188 pid = os.fork() 189 188 190 if pid > 0: 189 sys.exit(0) # end parrent 191 192 sys.exit(0) # end parent 190 193 191 194 # creates a session and sets the process group ID … … 196 199 # 197 200 pid = os.fork() 201 198 202 if pid > 0: 199 sys.exit(0) # end parrent 203 204 sys.exit(0) # end parent 200 205 201 206 # Go to the root directory and set the umask … … 221 226 "Main thread" 222 227 223 while 228 while( 1 ): 224 229 225 230 debug_msg( 7, self.printTime() + ' - mainthread() - xmlthread() started' ) … … 268 273 269 274 def __init__( self, config ): 275 270 276 self.config = config 271 277 self.parseValues() 272 278 273 def parseValues( self):279 def parseValues( self ): 274 280 "Parse certain values from gmetad.conf" 275 281 … … 291 297 292 298 for letter in word: 299 293 300 if letter not in string.digits: 301 294 302 valid_interval = 0 295 303 296 304 if valid_interval and len(word) > 0: 305 297 306 source['interval'] = word 298 307 debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) ) … … 300 309 # No interval found, use Ganglia's default 301 310 if not source.has_key( 'interval' ): 311 302 312 source['interval'] = 15 303 313 debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) ) … … 306 316 307 317 def getInterval( self, source_name ): 318 308 319 for source in self.sources: 320 309 321 if source['name'] == source_name: 322 310 323 return source['interval'] 324 311 325 return None 312 326 313 327 class RRDHandler: 328 329 myMetrics = { } 314 330 315 331 def __init__( self, cluster ): … … 317 333 self.gmetad_conf = GangliaConfigParser( GMETAD_CONF ) 318 334 335 def getClusterName( self ): 336 return self.cluster 337 338 def memMetric( self, host, metric ): 339 340 for m in self.myMetrics[ host ]: 341 342 if m['time'] == metric['time']: 343 344 # Allready have this metric, abort 345 return 1 346 347 if not self.myMetrics.has_key( host ): 348 349 self.myMetrics[ host ] = { } 350 351 if not self.myMetrics[ host ].has_key( metric['name'] ): 352 353 self.myMetrics[ host ][ metric['name'] ] = [ ] 354 355 self.myMetrics[ host ][ metric['name'] ].append( metric ) 356 357 def makeUpdateString( self, host, metric ): 358 359 update_string = '' 360 361 for m in self.myMetrics[ host ][ metric['name'] ]: 362 363 update_string = update_string + ' %s:%s' %( metric['time'], metric['val'] ) 364 365 return update_string 366 319 367 def makeTimeSerial( self ): 368 "Generate a time serial. Seconds since epoch" 320 369 321 370 # YYYYMMDDhhmmss: 20050321143411 … … 328 377 329 378 def makeRrdPath( self, host, metric=None, timeserial=None ): 379 """ 380 Make a RRD location/path and filename 381 If a metric or timeserial are supplied the complete locations 382 will be made, else just the host directory 383 """ 330 384 331 385 if not timeserial: … … 341 395 342 396 def getLastRrdTimeSerial( self, host ): 397 """ 398 Find the last timeserial (directory) for this host 399 This is determined once every host 400 """ 343 401 344 402 rrd_dir, rrd_file = self.makeRrdPath( host ) … … 347 405 348 406 if os.path.exists( rrd_dir ): 407 349 408 for root, dirs, files in os.walk( rrd_dir ): 350 409 … … 368 427 369 428 def checkNewRrdPeriod( self, host, current_timeserial ): 429 """ 430 Check if current timeserial belongs to recent time period 431 or should become a new period (and file). 432 433 Returns the serial of the correct time period 434 """ 370 435 371 436 last_timeserial = int( self.getLastRrdTimeSerial( host ) ) … … 426 491 427 492 try: 493 428 494 rrdtool.update( str(rrd_file), str(update_string) ) 495 429 496 except rrdtool.error, detail: 497 430 498 debug_msg( 0, 'EXCEPTION! While trying to update rrd:' ) 431 499 debug_msg( 0, '\trrd %s with %s' %( str(rrd_file), update_string ) ) 432 500 debug_msg( 0, str(detail) ) 501 433 502 sys.exit( 1 ) 434 503
Note: See TracChangeset
for help on using the changeset viewer.