Changeset 949 for branches/1.2


Ignore:
Timestamp:
01/20/14 17:04:55 (10 years ago)
Author:
ramonb
Message:

1.1/jobarchived/jobarchived.py:

  • reverted change that was supposed to go in 1.2

1.2/jobarchived/jobarchived.py:

  • see #173
  • changed threading: now each cluster gets it's own ganglia xml/store threads
  • added some yappi profiling functions for debugging
  • better debug statements
  • many performance improvements:
    • now use deque collections in stead of lists for storing metrics: faster appends and pops
    • remove some typecasts
    • replaced xml readlines() with read()
    • XMLDataGatherer now truly caches and prevents unnecessary data retrieval
    • replaced some if statements with catch/excepts: is faster
    • disabled thread locking
    • excluded metrics are now ignored while storing: not while parsing, and matched with compiled regexp
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/1.2/jobarchived/jobarchived.py

    r930 r949  
    2323
    2424import getopt, syslog, ConfigParser, sys
     25from collections import deque
     26import time
     27from pprint import pprint
     28#import yappi
     29
     30#yappi.start()
     31
     32try:
     33    from resource import getrusage, RUSAGE_SELF
     34except ImportError:
     35    RUSAGE_SELF = 0
     36    def getrusage(who=0):
     37        return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.0
     38
     39p_stats = None
     40p_start_time = None
     41
     42def profiler(frame, event, arg):
     43    if event not in ('call','return'): return profiler
     44    #### gather stats ####
     45    rusage = getrusage(RUSAGE_SELF)
     46    t_cpu = rusage[0] + rusage[1] # user time + system time
     47    code = frame.f_code
     48    fun = (code.co_name, code.co_filename, code.co_firstlineno)
     49    #### get stack with functions entry stats ####
     50    ct = threading.currentThread()
     51    try:
     52        p_stack = ct.p_stack
     53    except AttributeError:
     54        ct.p_stack = deque()
     55        p_stack = ct.p_stack
     56    #### handle call and return ####
     57    if event == 'call':
     58        p_stack.append((time.time(), t_cpu, fun))
     59    elif event == 'return':
     60        try:
     61            t,t_cpu_prev,f = p_stack.pop()
     62            assert f == fun
     63        except IndexError: # TODO investigate
     64            t,t_cpu_prev,f = p_start_time, 0.0, None
     65        call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0))
     66        p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev)
     67    return profiler
     68
     69
     70def profile_on():
     71    global p_stats, p_start_time
     72    p_stats = {}
     73    p_start_time = time.time()
     74    threading.setprofile(profiler)
     75    sys.setprofile(profiler)
     76
     77    debug_msg( 1, 'profile_on(): profiling..' )
     78
     79def profile_off():
     80    threading.setprofile(None)
     81    sys.setprofile(None)
     82    debug_msg( 1, 'profile_on(): profiling ended..' )
     83
     84def get_profile_stats():
     85    """
     86    returns dict[function_tuple] -> stats_tuple
     87    where
     88      function_tuple = (function_name, filename, lineno)
     89      stats_tuple = (call_cnt, real_time, cpu_time)
     90    """
     91    debug_msg( 1, 'get_profile_stats(): dumping stats..' )
     92    return p_stats
    2593
    2694VERSION='__VERSION__'
     
    920988                for j in timedout_jobs:
    921989
    922                     del self.jobAttrs[ j ]
    923                     del self.jobAttrsSaved[ j ]
     990                    try:
     991                        del self.jobAttrs[ j ]
     992                        del self.jobAttrsSaved[ j ]
     993                    except KeyError:
     994                        pass
    924995
    925996        debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
     
    9321003        """
    9331004
    934         global ARCHIVE_DATASOURCES
    935 
    9361005        jobinfo = { }
    9371006
     
    9731042
    9741043                self.jobs_processed.append( job_id )
     1044
    9751045                   
    9761046    def endDocument( self ):
     
    11321202    """Parse Ganglia's XML"""
    11331203
    1134     def __init__( self, config, datastore ):
     1204    def __init__( self, config, datastore, cluster ):
    11351205        """Setup initial variables and gather info on existing rrd archive"""
    11361206
    11371207        self.config          = config
    1138         self.clusters        = { }
     1208        self.clusterName     = cluster
    11391209        self.ds              = datastore
    1140 
    1141         debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' )
    1142         self.gatherClusters()
    1143         debug_msg( 1, 'Housekeeping: RRD check complete.' )
    1144 
    1145     def gatherClusters( self ):
     1210        self.rrd_handler     = None
     1211        self.cluster_start   = False
     1212
     1213        debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName )
     1214        self.gatherCluster()
     1215        debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName )
     1216
     1217    def gatherCluster( self ):
    11461218        """Find all existing clusters in archive dir"""
    11471219
     
    11561228            dirlist    = os.listdir( archive_dir )
    11571229
    1158             for cfgcluster in ARCHIVE_DATASOURCES:
    1159 
    1160                 if cfgcluster not in dirlist:
    1161 
    1162                     # Autocreate a directory for this cluster
    1163                     # assume it is new
    1164                     #
    1165                     cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
    1166 
    1167                     os.mkdir( cluster_dir )
    1168 
    1169                     dirlist.append( cfgcluster )
    1170 
    1171             for item in dirlist:
    1172 
    1173                 clustername = item
    1174 
    1175                 if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
    1176 
    1177                     self.clusters[ clustername ] = RRDHandler( self.config, clustername )
    1178 
    1179                     debug_msg( 9, 'Found cluster dir: %s' %( clustername ) )
    1180 
    1181         debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" cluster dirs" )
     1230            if self.clusterName not in dirlist:
     1231
     1232                # Autocreate a directory for this cluster
     1233                # assume it is new
     1234                #
     1235                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
     1236
     1237                os.mkdir( cluster_dir )
     1238
     1239                dirlist.append( cfgcluster )
     1240
     1241            for d in dirlist:
     1242
     1243                if not self.rrd_handler and d == self.clusterName:
     1244
     1245                    self.rrd_handler = RRDHandler( self.config, d )
     1246
     1247                    debug_msg( 9, 'Found cluster dir: %s' %( d ) )
    11821248
    11831249    def startElement( self, name, attrs ):
    11841250        """Memorize appropriate data from xml start tags"""
    11851251
    1186         global ARCHIVE_DATASOURCES
    1187 
    11881252        if name == 'GANGLIA_XML':
    11891253
    1190             self.XMLSource      = str( attrs.get( 'SOURCE',  "" ) )
    1191             self.gangliaVersion = str( attrs.get( 'VERSION', "" ) )
     1254            self.XMLSource      = attrs.get( 'SOURCE',  "" )
     1255            self.gangliaVersion = attrs.get( 'VERSION', "" )
    11921256
    11931257            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
    11941258
    1195         elif name == 'GRID':
    1196 
    1197             self.gridName    = str( attrs.get( 'NAME', "" ) )
    1198             self.time        = str( attrs.get( 'LOCALTIME', "" ) )
     1259            return 0
     1260
     1261        if name == 'GRID':
     1262
     1263            self.gridName    = attrs.get( 'NAME', "" )
     1264            self.time        = attrs.get( 'LOCALTIME', "" )
    11991265
    12001266            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
    12011267
    1202         elif name == 'CLUSTER':
    1203 
    1204             self.clusterName = str( attrs.get( 'NAME',      "" ) )
    1205             self.time        = str( attrs.get( 'LOCALTIME', "" ) )
    1206 
    1207             if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
    1208 
    1209                 self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
    1210 
    1211             debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
    1212 
    1213         elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
    1214 
    1215             self.hostName     = str( attrs.get( 'NAME',     "" ) )
    1216             self.hostIp       = str( attrs.get( 'IP',       "" ) )
    1217             self.hostReported = str( attrs.get( 'REPORTED', "" ) )
     1268            return 0
     1269
     1270        if name == 'CLUSTER':
     1271
     1272            xmlClusterName   = attrs.get( 'NAME',      "" )
     1273            self.time        = attrs.get( 'LOCALTIME', "" )
     1274
     1275            if self.clusterName == xmlClusterName:
     1276
     1277                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
     1278
     1279                self.cluster_start = True
     1280
     1281                if not self.rrd_handler:
     1282
     1283                    self.rrd_handler = RRDHandler( self.config, self.clusterName )
     1284            else:
     1285                self.cluster_start = False
     1286
     1287            return 0
     1288
     1289        if name == 'HOST' and self.cluster_start:
     1290
     1291            self.hostName     = attrs.get( 'NAME',     "" )
     1292            self.hostIp       = attrs.get( 'IP',       "" )
     1293            self.hostReported = attrs.get( 'REPORTED', "" )
    12181294
    12191295            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
    12201296
    1221         elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
    1222 
    1223             type = str( attrs.get( 'TYPE', "" ) )
     1297            return 0
     1298
     1299        if name == 'METRIC' and self.cluster_start:
     1300
     1301            #type = attrs.get( 'TYPE', "" )
     1302            #orig_name = attrs.get( 'NAME', "" )
    12241303           
    1225             exclude_metric = False
    1226            
    1227             for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
    1228 
    1229                 orig_name = str( attrs.get( 'NAME', "" ) )
    1230 
    1231                 if string.lower( orig_name ) == string.lower( ex_metricstr ):
    1232                
    1233                     exclude_metric = True
    1234 
    1235                 elif re.match( ex_metricstr, orig_name ):
    1236 
    1237                     exclude_metric = True
    1238 
    1239             if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
    1240 
    1241                 myMetric         = { }
    1242                 myMetric['name'] = str( attrs.get( 'NAME', "" ) )
    1243                 myMetric['val']  = str( attrs.get( 'VAL',  "" ) )
    1244                 myMetric['time'] = self.hostReported
    1245 
    1246                 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
    1247 
    1248                 debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
    1249                 debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
     1304            if attrs.get( 'TYPE', "" ) != 'string':
     1305
     1306                #myMetric         = { }
     1307                #myMetric['name'] = attrs.get( 'NAME', "" )
     1308                #myMetric['val']  = attrs.get( 'VAL',  "" )
     1309                #myMetric['time'] = self.hostReported
     1310
     1311                self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL',  "" ), 'time': self.hostReported } )
     1312
     1313                #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
     1314                #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
     1315
     1316
     1317    def endElement( self, name ):
     1318
     1319        if name == 'CLUSTER' and self.cluster_start:
     1320
     1321            self.cluster_start = False
     1322            debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) )
    12501323
    12511324    def storeMetrics( self ):
    12521325        """Store metrics of each cluster rrd handler"""
    12531326
    1254         for clustername, rrdh in self.clusters.items():
    1255 
    1256             ret = rrdh.storeMetrics()
    1257 
    1258             if ret:
    1259                 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
    1260                 return 1
     1327        ret = self.rrd_handler.storeMetrics()
     1328
     1329        if ret:
     1330            debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
     1331            return 1
    12611332
    12621333        return 0
     
    13291400        """Setup connection to XML source"""
    13301401
     1402        if self.update_now:
     1403            return 0
     1404
    13311405        self.update_now = True
    13321406
    1333         self.slot.acquire()
     1407        #self.slot.acquire()
    13341408
    13351409        self.data       = None
     1410
     1411        debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." )
    13361412
    13371413        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
     
    13691445
    13701446            my_fp            = self.s.makefile( 'r' )
    1371             my_data          = my_fp.readlines()
    1372             my_data          = string.join( my_data, '' )
    1373 
    1374             self.data        = my_data
     1447            #my_data          = my_fp.readlines()
     1448            #my_data          = string.join( my_data, '' )
     1449
     1450            #self.data        = my_data
     1451            self.data        = my_fp.read()
    13751452
    13761453            self.LAST_UPDATE = time.time()
    13771454
    1378         self.slot.release()
     1455            self.disconnect()
     1456
     1457        #self.slot.release()
     1458
     1459        debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." )
    13791460
    13801461        self.update_now    = False
     
    13961477        """Reconnect"""
    13971478
    1398         while self.update_now:
     1479        if self.update_now:
     1480            return 0
     1481
     1482        #while self.update_now:
    13991483
    14001484            # Must be another update in progress:
    14011485            # Wait until the update is complete
    14021486            #
    1403             time.sleep( 1 )
     1487        #    time.sleep( 1 )
    14041488
    14051489        if self.s:
    14061490            self.disconnect()
    14071491
    1408         self.retrieveData()
     1492        cur_time    = time.time()
     1493
     1494        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
     1495
     1496            if not self.update_now:
     1497
     1498                self.retrieveData()
    14091499
    14101500    def getData( self ):
     
    14201510        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
    14211511
    1422             self.reGetData()
     1512            if not self.update_now:
     1513
     1514                self.reGetData()
    14231515
    14241516        while self.update_now:
     
    14501542    """Main class for processing XML and acting with it"""
    14511543
    1452     def __init__( self, XMLSource, DataStore ):
     1544    def __init__( self, XMLSource, DataStore, cluster ):
    14531545        """Setup initial XML connection and handlers"""
    14541546
     
    14561548        self.myXMLSource  = XMLSource
    14571549        self.ds           = DataStore
    1458         self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )
    14591550        self.myXMLError   = XMLErrorHandler()
     1551        self.clusterName  = cluster
     1552        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName )
     1553
    14601554
    14611555    def run( self ):
     
    14761570                    xml_thread.start()
    14771571                except thread.error, msg:
    1478                     debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
     1572                    debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)) )
    14791573                    #return 1
    14801574
     
    14881582                    store_thread.start()
    14891583                except thread.error, msg:
    1490                     debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
     1584                    debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)) )
    14911585                    #return 1
    14921586       
     
    15131607            return 1
    15141608
    1515         debug_msg( 1, 'ganglia_store_thread(): started.' )
    1516 
    1517         debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
     1609        debug_msg( 1, 'ganglia_store_thread(): started: cluster %s' %self.clusterName )
     1610
     1611        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss): cluster %s' %(STORE_INTERVAL, self.clusterName ) )
    15181612        time.sleep( STORE_INTERVAL )
    1519         debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
     1613        debug_msg( 1, 'ganglia_store_thread(): Done sleeping: cluster %s' %self.clusterName )
    15201614
    15211615        if store_metric_thread.isAlive():
    15221616
    1523             debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
     1617            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName )
    15241618            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
    15251619
    15261620            if store_metric_thread.isAlive():
    15271621
    1528                 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' )
     1622                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName )
    15291623            else:
    1530                 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' )
    1531 
    1532         debug_msg( 1, 'ganglia_store_thread(): finished.' )
     1624                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished: cluster %s' %self.clusterName )
     1625
     1626        debug_msg( 1, 'ganglia_store_thread(): finished: cluster %s' %self.clusterName )
    15331627
    15341628        return 0
     
    15371631        """Actual metric storing thread"""
    15381632
    1539         debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
    1540         debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
     1633        debug_msg( 1, 'ganglia_store_metric_thread(): started: cluster %s' %self.clusterName )
     1634        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName )
    15411635
    15421636        ret = self.myXMLHandler.storeMetrics()
    15431637        if ret > 0:
    1544             debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
    1545 
    1546         debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
    1547         debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
     1638            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) )
     1639
     1640        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing: cluster %s' %self.clusterName )
     1641        debug_msg( 1, 'ganglia_store_metric_thread(): finished: cluster %s' %self.clusterName )
    15481642       
    15491643        return 0
     
    15561650            parsethread.start()
    15571651        except thread.error, msg:
    1558             debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
     1652            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg) ) )
    15591653            return 1
    15601654
    1561         debug_msg( 1, 'ganglia_xml_thread(): started.' )
    1562 
    1563         debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
     1655        debug_msg( 1, 'ganglia_xml_thread(): started: cluster %s' %self.clusterName )
     1656
     1657        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss): cluster %s' %(self.config.getLowestInterval(), self.clusterName) )
    15641658        time.sleep( float( self.config.getLowestInterval() ) )   
    1565         debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
     1659        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping: cluster %s' %self.clusterName )
    15661660
    15671661        if parsethread.isAlive():
    15681662
    1569             debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
     1663            debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ) )
    15701664            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
    15711665
    15721666            if parsethread.isAlive():
    1573                 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' )
     1667                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName )
    15741668            else:
    1575                 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' )
    1576 
    1577         debug_msg( 1, 'ganglia_xml_thread(): finished.' )
     1669                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName )
     1670
     1671        debug_msg( 1, 'ganglia_xml_thread(): finished: cluster %s' %self.clusterName )
    15781672
    15791673        return 0
     
    15821676        """Actual parsing thread"""
    15831677
    1584         debug_msg( 1, 'ganglia_parse_thread(): started.' )
    1585         debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
     1678
     1679        debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName )
     1680        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName )
    15861681       
    15871682        my_data    = self.myXMLSource.getData()
    15881683
    1589         debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) )
     1684        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data) ) )
    15901685
    15911686        if my_data:
    1592             debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
     1687            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName )
     1688
    15931689            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
    1594             debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
    1595 
    1596         debug_msg( 1, 'ganglia_parse_thread(): finished.' )
     1690
     1691            debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName )
     1692            #yappi.print_stats()
     1693
     1694        debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName )
    15971695
    15981696        return 0
     
    16791777    """Class for handling RRD activity"""
    16801778
    1681 
    16821779    def __init__( self, config, cluster ):
    16831780        """Setup initial variables"""
    16841781
    1685         global MODRRDTOOL
     1782        global MODRRDTOOL, ARCHIVE_EXCLUDE_METRICS
    16861783
    16871784        self.block   = 0
     
    17011798        global DEBUG_LEVEL
    17021799
    1703         if DEBUG_LEVEL <= 2:
     1800        if DEBUG_LEVEL <= 0:
    17041801            self.gatherLastUpdates()
     1802
     1803        self.excludes        = [ ]
     1804
     1805        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
     1806
     1807            self.excludes.append( re.compile( ex_metricstr ) )
    17051808
    17061809    def gatherLastUpdates( self ):
     
    17621865        # <ATOMIC>
    17631866        #
    1764         self.slot.acquire()
    17651867       
    1766         if self.myMetrics.has_key( host ):
    1767 
    1768             if self.myMetrics[ host ].has_key( metric['name'] ):
    1769 
    1770                 for mymetric in self.myMetrics[ host ][ metric['name'] ]:
    1771 
    1772                     if mymetric['time'] == metric['time']:
     1868        #if host in self.myMetrics:
     1869
     1870            #if self.myMetrics[ host ].has_key( metric['name'] ):
     1871
     1872            #    if len( self.myMetrics[ host ][ metric['name'] ] ) > 0:
     1873     
     1874            #        if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
     1875
     1876            #            return 1
     1877                #for mymetric in self.myMetrics[ host ][ metric['name'] ]:
     1878
     1879                #    if mymetric['time'] == metric['time']:
    17731880
    17741881                        # Allready have this metric, abort
    1775                         self.slot.release()
    1776                         return 1
    1777             else:
    1778                 self.myMetrics[ host ][ metric['name'] ] = [ ]
    1779         else:
    1780             self.myMetrics[ host ]                   = { }
    1781             self.myMetrics[ host ][ metric['name'] ] = [ ]
     1882                #        return 1
     1883            #else:
     1884            #if metric['name'] not in self.myMetrics[ host ]:
     1885            #    self.myMetrics[ host ][ metric['name'] ] = deque()
     1886        #else:
     1887        #    self.myMetrics[ host ]                   = { }
     1888        #    self.myMetrics[ host ][ metric['name'] ] = deque()
    17821889
    17831890        # Push new metric onto stack
    17841891        # atomic code; only 1 thread at a time may access the stack
     1892        #self.slot.acquire()
     1893
     1894        try:
     1895            host_metrics = self.myMetrics[ host ]
     1896        except KeyError:
     1897            self.myMetrics[ host ] = { }
     1898            host_metrics = self.myMetrics[ host ]
     1899
     1900        try:
     1901            metric_values = self.myMetrics[ host ][ metric['name'] ]
     1902        except KeyError:
     1903            self.myMetrics[ host ][ metric['name'] ] = deque()
     1904            metric_values = self.myMetrics[ host ][ metric['name'] ]
     1905
     1906        try:
     1907            if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
     1908                return 1
     1909        except (IndexError, KeyError):
     1910            pass
    17851911
    17861912        self.myMetrics[ host ][ metric['name'] ].append( metric )
    17871913
    1788         self.slot.release()
     1914        #self.slot.release()
    17891915        #
    17901916        # </ATOMIC>
     
    18571983        """
    18581984
    1859         debug_msg( 5, "Entering storeMetrics()")
     1985        debug_msg( 5, "Entering storeMetrics(): cluster %s" %self.cluster )
    18601986
    18611987        count_values  = 0
     
    18782004        count_bytes    = count_bits / 8
    18792005
    1880         debug_msg( 5, "size of cluster '" + self.cluster + "': " +
     2006        debug_msg( 1, "size of cluster '" + self.cluster + "': " +
    18812007            str( len( self.myMetrics.keys() ) ) + " hosts " +
    18822008            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
     
    18872013            for metricname, mymetric in mymetrics.items():
    18882014
     2015                for e in self.excludes:
     2016
     2017                    if e.match( metricname ):
     2018
     2019                        del self.myMetrics[ hostname ][ metricname ]
     2020                        continue
     2021
    18892022                metrics_to_store = [ ]
    18902023
     
    18942027                # <ATOMIC>
    18952028                #
    1896                 self.slot.acquire()
    1897 
    1898                 while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
    1899 
    1900                     if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
    1901 
    1902                         try:
    1903                             metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
    1904                         except IndexError, msg:
    1905 
    1906                             # Somehow sometimes myMetrics[ hostname ][ metricname ]
    1907                             # is still len 0 when the statement is executed.
    1908                             # Just ignore indexerror's..
    1909                             pass
    1910 
    1911                 self.slot.release()
     2029                #self.slot.acquire()
     2030
     2031                if metricname in self.myMetrics[ hostname ]:
     2032
     2033                    while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     2034
     2035                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     2036
     2037                            try:
     2038                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() )
     2039                            except IndexError, msg:
     2040
     2041                                # Somehow sometimes myMetrics[ hostname ][ metricname ]
     2042                                # is still len 0 when the statement is executed.
     2043                                # Just ignore indexerror's..
     2044                                pass
     2045
     2046                #self.slot.release()
    19122047                #
    19132048                # </ATOMIC>
     
    19412076                self.memLastUpdate( hostname, metricname, metrics_to_store )
    19422077
    1943         debug_msg( 5, "Leaving storeMetrics()")
     2078        debug_msg( 5, "Leaving storeMetrics(): cluster %s" %self.cluster )
    19442079
    19452080    def makeTimeSerial( self ):
     
    21612296
    21622297    myJobProcessor     = JobXMLProcessor( myXMLSource, myDataStore )
    2163     myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore )
     2298
     2299    myGangliaProcessors= [ ]
     2300
     2301    for archive_cluster in ARCHIVE_DATASOURCES:
     2302
     2303        myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) )
     2304
     2305    ganglia_xml_threads = [ ]
    21642306
    21652307    try:
    21662308        job_xml_thread     = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
    2167         ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
     2309
     2310        t = 0
     2311
     2312        for ganglia_processor in myGangliaProcessors:
     2313
     2314            ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) )
     2315
     2316            t = t + 1
    21682317
    21692318        job_xml_thread.start()
    2170         ganglia_xml_thread.start()
     2319
     2320        for t in ganglia_xml_threads:
     2321
     2322            t.start()
    21712323       
    21722324    except thread.error, msg:
Note: See TracChangeset for help on using the changeset viewer.