Index: branches/1.2/jobarchived/jobarchived.py
===================================================================
--- branches/1.2/jobarchived/jobarchived.py (revision 948)
+++ branches/1.2/jobarchived/jobarchived.py (revision 949)
@@ -23,4 +23,72 @@
import getopt, syslog, ConfigParser, sys
+from collections import deque
+import time
+from pprint import pprint
+#import yappi
+
+#yappi.start()
+
+try:
+ from resource import getrusage, RUSAGE_SELF
+except ImportError:
+ RUSAGE_SELF = 0
+ def getrusage(who=0):
+ return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.0
+
+p_stats = None
+p_start_time = None
+
+def profiler(frame, event, arg):
+    """
+    Profile hook, installed by profile_on() through sys.setprofile() and
+    threading.setprofile().
+
+    Keeps a stack of (wall_time, cpu_time, function) entries on the current
+    thread object (attribute p_stack). A 'call' event pushes an entry, a
+    'return' event pops it and adds the elapsed wall clock and cpu time to
+    p_stats[function]. All other events are ignored.
+
+    function = (function_name, filename, first_lineno)
+
+    returns profiler
+    """
+    if event not in ('call','return'): return profiler
+    #### gather stats ####
+    now = time.time()
+    rusage = getrusage(RUSAGE_SELF)
+    t_cpu = rusage[0] + rusage[1] # user time + system time
+    code = frame.f_code
+    fun = (code.co_name, code.co_filename, code.co_firstlineno)
+    #### get stack with functions entry stats ####
+    ct = threading.currentThread()
+    try:
+        p_stack = ct.p_stack
+    except AttributeError:
+        ct.p_stack = deque()
+        p_stack = ct.p_stack
+    #### handle call and return ####
+    if event == 'call':
+        p_stack.append((now, t_cpu, fun))
+    elif event == 'return':
+        try:
+            t,t_cpu_prev,f = p_stack.pop()
+        except IndexError: # TODO investigate
+            # presumably a return from a frame that was entered before the
+            # hook was installed -- confirm
+            t,t_cpu_prev,f = p_start_time, 0.0, None
+        if f is not None and f != fun:
+            # Entry belongs to another function: its timestamps say nothing
+            # about this call, so skip the sample. This used to be an
+            # 'assert', which would raise into the code being profiled
+            # (and is compiled away under -O).
+            return profiler
+        call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0))
+        p_stats[fun] = (call_cnt+1, t_sum+now-t, t_cpu_sum+t_cpu-t_cpu_prev)
+    return profiler
+
+
+def profile_on():
+    """
+    Start profiling: reset the collected statistics, remember the start
+    time and install profiler() as profile hook.
+    """
+    global p_stats, p_start_time
+    p_stats, p_start_time = {}, time.time()
+
+    # same order as before: threading module first, then the sys hook
+    for install_hook in (threading.setprofile, sys.setprofile):
+        install_hook(profiler)
+
+    debug_msg( 1, 'profile_on(): profiling..' )
+
+def profile_off():
+    """
+    Stop profiling: remove the profile hook again.
+    p_stats is not touched, so get_profile_stats() still returns the
+    collected statistics afterwards.
+    """
+    threading.setprofile(None)
+    sys.setprofile(None)
+    debug_msg( 1, 'profile_off(): profiling ended..' )
+
+def get_profile_stats():
+    """
+    Log that the statistics are being dumped and return p_stats.
+
+    p_stats maps
+        (function_name, filename, lineno) -> (call_cnt, real_time, cpu_time)
+    """
+    debug_msg( 1, 'get_profile_stats(): dumping stats..' )
+    collected = p_stats
+    return collected
VERSION='__VERSION__'
@@ -920,6 +988,9 @@
for j in timedout_jobs:
- del self.jobAttrs[ j ]
- del self.jobAttrsSaved[ j ]
+ # Remove each entry on its own: with both del's in one try block, a job
+ # missing from jobAttrs would leave its jobAttrsSaved entry behind.
+ # Assumes both are plain dicts -- confirm.
+ self.jobAttrs.pop( j, None )
+ self.jobAttrsSaved.pop( j, None )
debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
@@ -932,6 +1003,4 @@
"""
- global ARCHIVE_DATASOURCES
-
jobinfo = { }
@@ -973,4 +1042,5 @@
self.jobs_processed.append( job_id )
+
def endDocument( self ):
@@ -1132,16 +1202,18 @@
"""Parse Ganglia's XML"""
- def __init__( self, config, datastore ):
+ def __init__( self, config, datastore, cluster ):
"""Setup initial variables and gather info on existing rrd archive"""
self.config = config
- self.clusters = { }
+ self.clusterName = cluster
self.ds = datastore
-
- debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' )
- self.gatherClusters()
- debug_msg( 1, 'Housekeeping: RRD check complete.' )
-
- def gatherClusters( self ):
+ self.rrd_handler = None
+ self.cluster_start = False
+
+ debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName )
+ self.gatherCluster()
+ debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName )
+
+ def gatherCluster( self ):
"""Find all existing clusters in archive dir"""
@@ -1156,107 +1228,106 @@
dirlist = os.listdir( archive_dir )
- for cfgcluster in ARCHIVE_DATASOURCES:
-
- if cfgcluster not in dirlist:
-
- # Autocreate a directory for this cluster
- # assume it is new
- #
- cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
-
- os.mkdir( cluster_dir )
-
- dirlist.append( cfgcluster )
-
- for item in dirlist:
-
- clustername = item
-
- if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
-
- self.clusters[ clustername ] = RRDHandler( self.config, clustername )
-
- debug_msg( 9, 'Found cluster dir: %s' %( clustername ) )
-
- debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" cluster dirs" )
+ if self.clusterName not in dirlist:
+
+     # Autocreate a directory for this cluster
+     # assume it is new
+     # (cfgcluster was the variable of the removed loop: use clusterName)
+     cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.clusterName )
+
+     os.mkdir( cluster_dir )
+
+     dirlist.append( self.clusterName )
+
+ for d in dirlist:
+
+     if not self.rrd_handler and d == self.clusterName:
+
+         self.rrd_handler = RRDHandler( self.config, d )
+
+         debug_msg( 9, 'Found cluster dir: %s' %( d ) )
def startElement( self, name, attrs ):
"""Memorize appropriate data from xml start tags"""
- global ARCHIVE_DATASOURCES
-
if name == 'GANGLIA_XML':
- self.XMLSource = str( attrs.get( 'SOURCE', "" ) )
- self.gangliaVersion = str( attrs.get( 'VERSION', "" ) )
+ self.XMLSource = attrs.get( 'SOURCE', "" )
+ self.gangliaVersion = attrs.get( 'VERSION', "" )
debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
- elif name == 'GRID':
-
- self.gridName = str( attrs.get( 'NAME', "" ) )
- self.time = str( attrs.get( 'LOCALTIME', "" ) )
+ return 0
+
+ if name == 'GRID':
+
+ self.gridName = attrs.get( 'NAME', "" )
+ self.time = attrs.get( 'LOCALTIME', "" )
debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
- elif name == 'CLUSTER':
-
- self.clusterName = str( attrs.get( 'NAME', "" ) )
- self.time = str( attrs.get( 'LOCALTIME', "" ) )
-
- if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
-
- self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
-
- debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
-
- elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:
-
- self.hostName = str( attrs.get( 'NAME', "" ) )
- self.hostIp = str( attrs.get( 'IP', "" ) )
- self.hostReported = str( attrs.get( 'REPORTED', "" ) )
+ return 0
+
+ if name == 'CLUSTER':
+
+ xmlClusterName = attrs.get( 'NAME', "" )
+ self.time = attrs.get( 'LOCALTIME', "" )
+
+ if self.clusterName == xmlClusterName:
+
+ debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
+
+ self.cluster_start = True
+
+ if not self.rrd_handler:
+
+ self.rrd_handler = RRDHandler( self.config, self.clusterName )
+ else:
+ self.cluster_start = False
+
+ return 0
+
+ if name == 'HOST' and self.cluster_start:
+
+ self.hostName = attrs.get( 'NAME', "" )
+ self.hostIp = attrs.get( 'IP', "" )
+ self.hostReported = attrs.get( 'REPORTED', "" )
debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
- elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
-
- type = str( attrs.get( 'TYPE', "" ) )
+ return 0
+
+ if name == 'METRIC' and self.cluster_start:
+
+ #type = attrs.get( 'TYPE', "" )
+ #orig_name = attrs.get( 'NAME', "" )
- exclude_metric = False
-
- for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
-
- orig_name = str( attrs.get( 'NAME', "" ) )
-
- if string.lower( orig_name ) == string.lower( ex_metricstr ):
-
- exclude_metric = True
-
- elif re.match( ex_metricstr, orig_name ):
-
- exclude_metric = True
-
- if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
-
- myMetric = { }
- myMetric['name'] = str( attrs.get( 'NAME', "" ) )
- myMetric['val'] = str( attrs.get( 'VAL', "" ) )
- myMetric['time'] = self.hostReported
-
- self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
-
- debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
- debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
+ if attrs.get( 'TYPE', "" ) != 'string':
+
+ #myMetric = { }
+ #myMetric['name'] = attrs.get( 'NAME', "" )
+ #myMetric['val'] = attrs.get( 'VAL', "" )
+ #myMetric['time'] = self.hostReported
+
+ self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL', "" ), 'time': self.hostReported } )
+
+ #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
+ #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
+
+
+ def endElement( self, name ):
+     """Clear cluster_start when the CLUSTER element we are inside of ends"""
+
+     if name != 'CLUSTER' or not self.cluster_start:
+
+         return
+
+     self.cluster_start = False
+     debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) )
def storeMetrics( self ):
"""Store metrics of each cluster rrd handler"""
- for clustername, rrdh in self.clusters.items():
-
- ret = rrdh.storeMetrics()
-
- if ret:
- debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
- return 1
+ ret = self.rrd_handler.storeMetrics()
+
+ if ret:
+     # clustername was the variable of the removed per-cluster loop
+     debug_msg( 9, 'An error occured while storing metrics for cluster %s' %self.clusterName )
+     return 1
return 0
@@ -1329,9 +1400,14 @@
"""Setup connection to XML source"""
+ if self.update_now:
+ return 0
+
self.update_now = True
- self.slot.acquire()
+ #self.slot.acquire()
self.data = None
+
+ debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." )
for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
@@ -1369,12 +1445,17 @@
my_fp = self.s.makefile( 'r' )
- my_data = my_fp.readlines()
- my_data = string.join( my_data, '' )
-
- self.data = my_data
+ #my_data = my_fp.readlines()
+ #my_data = string.join( my_data, '' )
+
+ #self.data = my_data
+ self.data = my_fp.read()
self.LAST_UPDATE = time.time()
- self.slot.release()
+ self.disconnect()
+
+ #self.slot.release()
+
+ debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." )
self.update_now = False
@@ -1396,15 +1477,24 @@
"""Reconnect"""
- while self.update_now:
+ if self.update_now:
+ return 0
+
+ #while self.update_now:
# Must be another update in progress:
# Wait until the update is complete
#
- time.sleep( 1 )
+ # time.sleep( 1 )
if self.s:
self.disconnect()
- self.retrieveData()
+ cur_time = time.time()
+
+ if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
+
+ if not self.update_now:
+
+ self.retrieveData()
def getData( self ):
@@ -1420,5 +1510,7 @@
if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
- self.reGetData()
+ if not self.update_now:
+
+ self.reGetData()
while self.update_now:
@@ -1450,5 +1542,5 @@
"""Main class for processing XML and acting with it"""
- def __init__( self, XMLSource, DataStore ):
+ def __init__( self, XMLSource, DataStore, cluster ):
"""Setup initial XML connection and handlers"""
@@ -1456,6 +1548,8 @@
self.myXMLSource = XMLSource
self.ds = DataStore
- self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )
self.myXMLError = XMLErrorHandler()
+ self.clusterName = cluster
+ self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName )
+
def run( self ):
@@ -1476,5 +1570,5 @@
xml_thread.start()
except thread.error, msg:
- debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
+ debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)) )
#return 1
@@ -1488,5 +1582,5 @@
store_thread.start()
except thread.error, msg:
- debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
+ debug_msg( 0, 'ERROR: Unable to start store_thread for cluster %s!: %s' %(self.clusterName, str(msg)) )
#return 1
@@ -1513,22 +1607,22 @@
return 1
- debug_msg( 1, 'ganglia_store_thread(): started.' )
-
- debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
+ debug_msg( 1, 'ganglia_store_thread(): started: cluster %s' %self.clusterName )
+
+ debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss): cluster %s' %(STORE_INTERVAL, self.clusterName ) )
time.sleep( STORE_INTERVAL )
- debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
+ debug_msg( 1, 'ganglia_store_thread(): Done sleeping: cluster %s' %self.clusterName )
if store_metric_thread.isAlive():
- debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
+ debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName )
store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
if store_metric_thread.isAlive():
- debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' )
+ debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName )
else:
- debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' )
-
- debug_msg( 1, 'ganglia_store_thread(): finished.' )
+ debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished: cluster %s' %self.clusterName )
+
+ debug_msg( 1, 'ganglia_store_thread(): finished: cluster %s' %self.clusterName )
return 0
@@ -1537,13 +1631,13 @@
"""Actual metric storing thread"""
- debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
- debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
+ debug_msg( 1, 'ganglia_store_metric_thread(): started: cluster %s' %self.clusterName )
+ debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName )
ret = self.myXMLHandler.storeMetrics()
if ret > 0:
- debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
-
- debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
- debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
+ debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) )
+
+ debug_msg( 1, 'ganglia_store_metric_thread(): Done storing: cluster %s' %self.clusterName )
+ debug_msg( 1, 'ganglia_store_metric_thread(): finished: cluster %s' %self.clusterName )
return 0
@@ -1556,24 +1650,24 @@
parsethread.start()
except thread.error, msg:
- debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
+ debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: %s' %(self.clusterName, str(msg) ) )
return 1
- debug_msg( 1, 'ganglia_xml_thread(): started.' )
-
- debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
+ debug_msg( 1, 'ganglia_xml_thread(): started: cluster %s' %self.clusterName )
+
+ debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss): cluster %s' %(self.config.getLowestInterval(), self.clusterName) )
time.sleep( float( self.config.getLowestInterval() ) )
- debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
+ debug_msg( 1, 'ganglia_xml_thread(): Done sleeping: cluster %s' %self.clusterName )
if parsethread.isAlive():
- debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
+ debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ) )
parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
if parsethread.isAlive():
- debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' )
+ debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName )
else:
- debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' )
-
- debug_msg( 1, 'ganglia_xml_thread(): finished.' )
+ debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName )
+
+ debug_msg( 1, 'ganglia_xml_thread(): finished: cluster %s' %self.clusterName )
return 0
@@ -1582,17 +1676,21 @@
"""Actual parsing thread"""
- debug_msg( 1, 'ganglia_parse_thread(): started.' )
- debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
+
+ debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName )
+ debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName )
my_data = self.myXMLSource.getData()
- debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) )
+ debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data) ) )
if my_data:
- debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
+ debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName )
+
xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
- debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
-
- debug_msg( 1, 'ganglia_parse_thread(): finished.' )
+
+ debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName )
+ #yappi.print_stats()
+
+ debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName )
return 0
@@ -1679,9 +1777,8 @@
"""Class for handling RRD activity"""
-
def __init__( self, config, cluster ):
"""Setup initial variables"""
- global MODRRDTOOL
+ global MODRRDTOOL, ARCHIVE_EXCLUDE_METRICS
self.block = 0
@@ -1701,6 +1798,12 @@
global DEBUG_LEVEL
- if DEBUG_LEVEL <= 2:
+ if DEBUG_LEVEL <= 0:
self.gatherLastUpdates()
+
+ self.excludes = [ ]
+
+ for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
+
+ self.excludes.append( re.compile( ex_metricstr ) )
def gatherLastUpdates( self ):
@@ -1762,29 +1865,52 @@
#
#
- self.slot.acquire()
- if self.myMetrics.has_key( host ):
-
- if self.myMetrics[ host ].has_key( metric['name'] ):
-
- for mymetric in self.myMetrics[ host ][ metric['name'] ]:
-
- if mymetric['time'] == metric['time']:
+ #if host in self.myMetrics:
+
+ #if self.myMetrics[ host ].has_key( metric['name'] ):
+
+ # if len( self.myMetrics[ host ][ metric['name'] ] ) > 0:
+
+ # if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
+
+ # return 1
+ #for mymetric in self.myMetrics[ host ][ metric['name'] ]:
+
+ # if mymetric['time'] == metric['time']:
# Allready have this metric, abort
- self.slot.release()
- return 1
- else:
- self.myMetrics[ host ][ metric['name'] ] = [ ]
- else:
- self.myMetrics[ host ] = { }
- self.myMetrics[ host ][ metric['name'] ] = [ ]
+ # return 1
+ #else:
+ #if metric['name'] not in self.myMetrics[ host ]:
+ # self.myMetrics[ host ][ metric['name'] ] = deque()
+ #else:
+ # self.myMetrics[ host ] = { }
+ # self.myMetrics[ host ][ metric['name'] ] = deque()
# Push new metric onto stack
# atomic code; only 1 thread at a time may access the stack
+ #self.slot.acquire()
+
+ # Look up (or create) the per-host dict and the per-metric deque once
+ # and keep working on those references.
+ try:
+     host_metrics = self.myMetrics[ host ]
+ except KeyError:
+     host_metrics = self.myMetrics[ host ] = { }
+
+ try:
+     metric_values = host_metrics[ metric['name'] ]
+ except KeyError:
+     metric_values = host_metrics[ metric['name'] ] = deque()
+
+ # Only accept a value that is newer than the last one we hold;
+ # an empty deque (IndexError) means there is nothing to compare with.
+ try:
+     if metric['time'] <= metric_values[-1]['time']:
+         return 1
+ except (IndexError, KeyError):
+     pass
self.myMetrics[ host ][ metric['name'] ].append( metric )
- self.slot.release()
+ #self.slot.release()
#
#
@@ -1857,5 +1983,5 @@
"""
- debug_msg( 5, "Entering storeMetrics()")
+ debug_msg( 5, "Entering storeMetrics(): cluster %s" %self.cluster )
count_values = 0
@@ -1878,5 +2004,5 @@
count_bytes = count_bits / 8
- debug_msg( 5, "size of cluster '" + self.cluster + "': " +
+ debug_msg( 1, "size of cluster '" + self.cluster + "': " +
str( len( self.myMetrics.keys() ) ) + " hosts " +
str( count_metrics ) + " metrics " + str( count_values ) + " values " +
@@ -1887,4 +2013,11 @@
for metricname, mymetric in mymetrics.items():
+ # Decide first, then skip: a 'continue' inside the pattern loop only
+ # moved on to the next pattern, so the excluded metric was still
+ # processed, and a second matching pattern made the del raise KeyError.
+ excluded = False
+
+ for e in self.excludes:
+
+     if e.match( metricname ):
+
+         excluded = True
+         break
+
+ if excluded:
+
+     del self.myMetrics[ hostname ][ metricname ]
+     continue
+
metrics_to_store = [ ]
@@ -1894,20 +2027,22 @@
#
#
- self.slot.acquire()
-
- while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
-
- if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
-
- try:
- metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
- except IndexError, msg:
-
- # Somehow sometimes myMetrics[ hostname ][ metricname ]
- # is still len 0 when the statement is executed.
- # Just ignore indexerror's..
- pass
-
- self.slot.release()
+ #self.slot.acquire()
+
+ if metricname in self.myMetrics[ hostname ]:
+
+ while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
+
+ if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
+
+ try:
+ metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() )
+ except IndexError, msg:
+
+ # Somehow sometimes myMetrics[ hostname ][ metricname ]
+ # is still len 0 when the statement is executed.
+ # Just ignore indexerror's..
+ pass
+
+ #self.slot.release()
#
#
@@ -1941,5 +2076,5 @@
self.memLastUpdate( hostname, metricname, metrics_to_store )
- debug_msg( 5, "Leaving storeMetrics()")
+ debug_msg( 5, "Leaving storeMetrics(): cluster %s" %self.cluster )
def makeTimeSerial( self ):
@@ -2161,12 +2296,29 @@
myJobProcessor = JobXMLProcessor( myXMLSource, myDataStore )
- myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore )
+
+ myGangliaProcessors= [ ]
+
+ for archive_cluster in ARCHIVE_DATASOURCES:
+
+ myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) )
+
+ ganglia_xml_threads = [ ]
try:
job_xml_thread = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
- ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
+
+ t = 0
+
+ for ganglia_processor in myGangliaProcessors:
+
+ ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) )
+
+ t = t + 1
job_xml_thread.start()
- ganglia_xml_thread.start()
+
+ for t in ganglia_xml_threads:
+
+ t.start()
except thread.error, msg: