Ignore:
Timestamp:
01/20/14 17:04:55 (7 years ago)
Author:
ramonb
Message:

1.1/jobarchived/jobarchived.py:

  • reverted change that was supposed to go in 1.2

1.2/jobarchived/jobarchived.py:

  • see #173
  • changed threading: now each cluster gets it's own ganglia xml/store threads
  • added some yappi profiling functions for debugging
  • better debug statements
  • many performance improvements:
    • now use deque collections in stead of lists for storing metrics: faster appends and pops
    • remove some typecasts
    • replaced xml readlines() with read()
    • XMLDataGatherer now truly caches and prevents unnecessary data retrieval
    • replaced some if statements with catch/excepts: is faster
    • disabled thread locking
    • excluded metrics are now ignored while storing: not while parsing, and matched with compiled regexp
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/1.1/jobarchived/jobarchived.py

    r945 r949  
    2323
    2424import getopt, syslog, ConfigParser, sys
    25 from collections import deque
    26 import time
    27 from pprint import pprint
    28 #import yappi
    29 
    30 #yappi.start()
    31 
    32 try:
    33     from resource import getrusage, RUSAGE_SELF
    34 except ImportError:
    35     RUSAGE_SELF = 0
    36     def getrusage(who=0):
    37         return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.0
    38 
    39 p_stats = None
    40 p_start_time = None
    41 
    42 def profiler(frame, event, arg):
    43     if event not in ('call','return'): return profiler
    44     #### gather stats ####
    45     rusage = getrusage(RUSAGE_SELF)
    46     t_cpu = rusage[0] + rusage[1] # user time + system time
    47     code = frame.f_code
    48     fun = (code.co_name, code.co_filename, code.co_firstlineno)
    49     #### get stack with functions entry stats ####
    50     ct = threading.currentThread()
    51     try:
    52         p_stack = ct.p_stack
    53     except AttributeError:
    54         ct.p_stack = deque()
    55         p_stack = ct.p_stack
    56     #### handle call and return ####
    57     if event == 'call':
    58         p_stack.append((time.time(), t_cpu, fun))
    59     elif event == 'return':
    60         try:
    61             t,t_cpu_prev,f = p_stack.pop()
    62             assert f == fun
    63         except IndexError: # TODO investigate
    64             t,t_cpu_prev,f = p_start_time, 0.0, None
    65         call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0))
    66         p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev)
    67     return profiler
    68 
    69 
    70 def profile_on():
    71     global p_stats, p_start_time
    72     p_stats = {}
    73     p_start_time = time.time()
    74     threading.setprofile(profiler)
    75     sys.setprofile(profiler)
    76 
    77     debug_msg( 1, 'profile_on(): profiling..' )
    78 
    79 def profile_off():
    80     threading.setprofile(None)
    81     sys.setprofile(None)
    82     debug_msg( 1, 'profile_on(): profiling ended..' )
    83 
    84 def get_profile_stats():
    85     """
    86     returns dict[function_tuple] -> stats_tuple
    87     where
    88       function_tuple = (function_name, filename, lineno)
    89       stats_tuple = (call_cnt, real_time, cpu_time)
    90     """
    91     debug_msg( 1, 'get_profile_stats(): dumping stats..' )
    92     return p_stats
    9325
    9426VERSION='__VERSION__'
     
    988920                for j in timedout_jobs:
    989921
    990                     try:
    991                         del self.jobAttrs[ j ]
    992                         del self.jobAttrsSaved[ j ]
    993                     except KeyError:
    994                         pass
     922                    del self.jobAttrs[ j ]
     923                    del self.jobAttrsSaved[ j ]
    995924
    996925        debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
     
    1003932        """
    1004933
     934        global ARCHIVE_DATASOURCES
     935
    1005936        jobinfo = { }
    1006937
     
    1042973
    1043974                self.jobs_processed.append( job_id )
    1044 
    1045975                   
    1046976    def endDocument( self ):
     
    12021132    """Parse Ganglia's XML"""
    12031133
    1204     def __init__( self, config, datastore, cluster ):
     1134    def __init__( self, config, datastore ):
    12051135        """Setup initial variables and gather info on existing rrd archive"""
    12061136
    12071137        self.config          = config
    1208         self.clusterName     = cluster
     1138        self.clusters        = { }
    12091139        self.ds              = datastore
    1210         self.rrd_handler     = None
    1211         self.cluster_start   = False
    1212 
    1213         debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName )
    1214         self.gatherCluster()
    1215         debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName )
    1216 
    1217     def gatherCluster( self ):
     1140
     1141        debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' )
     1142        self.gatherClusters()
     1143        debug_msg( 1, 'Housekeeping: RRD check complete.' )
     1144
     1145    def gatherClusters( self ):
    12181146        """Find all existing clusters in archive dir"""
    12191147
     
    12281156            dirlist    = os.listdir( archive_dir )
    12291157
    1230             if self.clusterName not in dirlist:
    1231 
    1232                 # Autocreate a directory for this cluster
    1233                 # assume it is new
    1234                 #
    1235                 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
    1236 
    1237                 os.mkdir( cluster_dir )
    1238 
    1239                 dirlist.append( cfgcluster )
    1240 
    1241             for d in dirlist:
    1242 
    1243                 if not self.rrd_handler and d == self.clusterName:
    1244 
    1245                     self.rrd_handler = RRDHandler( self.config, d )
    1246 
    1247                     debug_msg( 9, 'Found cluster dir: %s' %( d ) )
     1158            for cfgcluster in ARCHIVE_DATASOURCES:
     1159
     1160                if cfgcluster not in dirlist:
     1161
     1162                    # Autocreate a directory for this cluster
     1163                    # assume it is new
     1164                    #
     1165                    cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
     1166
     1167                    os.mkdir( cluster_dir )
     1168
     1169                    dirlist.append( cfgcluster )
     1170
     1171            for item in dirlist:
     1172
     1173                clustername = item
     1174
     1175                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
     1176
     1177                    self.clusters[ clustername ] = RRDHandler( self.config, clustername )
     1178
     1179                    debug_msg( 9, 'Found cluster dir: %s' %( clustername ) )
     1180
     1181        debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" cluster dirs" )
    12481182
    12491183    def startElement( self, name, attrs ):
    12501184        """Memorize appropriate data from xml start tags"""
    12511185
     1186        global ARCHIVE_DATASOURCES
     1187
    12521188        if name == 'GANGLIA_XML':
    12531189
    1254             self.XMLSource      = attrs.get( 'SOURCE',  "" )
    1255             self.gangliaVersion = attrs.get( 'VERSION', "" )
     1190            self.XMLSource      = str( attrs.get( 'SOURCE',  "" ) )
     1191            self.gangliaVersion = str( attrs.get( 'VERSION', "" ) )
    12561192
    12571193            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
    12581194
    1259             return 0
    1260 
    1261         if name == 'GRID':
    1262 
    1263             self.gridName    = attrs.get( 'NAME', "" )
    1264             self.time        = attrs.get( 'LOCALTIME', "" )
     1195        elif name == 'GRID':
     1196
     1197            self.gridName    = str( attrs.get( 'NAME', "" ) )
     1198            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
    12651199
    12661200            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
    12671201
    1268             return 0
    1269 
    1270         if name == 'CLUSTER':
    1271 
    1272             xmlClusterName   = attrs.get( 'NAME',      "" )
    1273             self.time        = attrs.get( 'LOCALTIME', "" )
    1274 
    1275             if self.clusterName == xmlClusterName:
    1276 
    1277                 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
    1278 
    1279                 self.cluster_start = True
    1280 
    1281                 if not self.rrd_handler:
    1282 
    1283                     self.rrd_handler = RRDHandler( self.config, self.clusterName )
    1284             else:
    1285                 self.cluster_start = False
    1286 
    1287             return 0
    1288 
    1289         if name == 'HOST' and self.cluster_start:
    1290 
    1291             self.hostName     = attrs.get( 'NAME',     "" )
    1292             self.hostIp       = attrs.get( 'IP',       "" )
    1293             self.hostReported = attrs.get( 'REPORTED', "" )
     1202        elif name == 'CLUSTER':
     1203
     1204            self.clusterName = str( attrs.get( 'NAME',      "" ) )
     1205            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
     1206
     1207            if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
     1208
     1209                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
     1210
     1211            debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
     1212
     1213        elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
     1214
     1215            self.hostName     = str( attrs.get( 'NAME',     "" ) )
     1216            self.hostIp       = str( attrs.get( 'IP',       "" ) )
     1217            self.hostReported = str( attrs.get( 'REPORTED', "" ) )
    12941218
    12951219            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
    12961220
    1297             return 0
    1298 
    1299         if name == 'METRIC' and self.cluster_start:
    1300 
    1301             #type = attrs.get( 'TYPE', "" )
    1302             #orig_name = attrs.get( 'NAME', "" )
     1221        elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
     1222
     1223            type = str( attrs.get( 'TYPE', "" ) )
    13031224           
    1304             if attrs.get( 'TYPE', "" ) != 'string':
    1305 
    1306                 #myMetric         = { }
    1307                 #myMetric['name'] = attrs.get( 'NAME', "" )
    1308                 #myMetric['val']  = attrs.get( 'VAL',  "" )
    1309                 #myMetric['time'] = self.hostReported
    1310 
    1311                 self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL',  "" ), 'time': self.hostReported } )
    1312 
    1313                 #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
    1314                 #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
    1315 
    1316 
    1317     def endElement( self, name ):
    1318 
    1319         if name == 'CLUSTER' and self.cluster_start:
    1320 
    1321             self.cluster_start = False
    1322             debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) )
     1225            exclude_metric = False
     1226           
     1227            for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
     1228
     1229                orig_name = str( attrs.get( 'NAME', "" ) )
     1230
     1231                if string.lower( orig_name ) == string.lower( ex_metricstr ):
     1232               
     1233                    exclude_metric = True
     1234
     1235                elif re.match( ex_metricstr, orig_name ):
     1236
     1237                    exclude_metric = True
     1238
     1239            if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
     1240
     1241                myMetric         = { }
     1242                myMetric['name'] = str( attrs.get( 'NAME', "" ) )
     1243                myMetric['val']  = str( attrs.get( 'VAL',  "" ) )
     1244                myMetric['time'] = self.hostReported
     1245
     1246                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
     1247
     1248                debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
     1249                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
    13231250
    13241251    def storeMetrics( self ):
    13251252        """Store metrics of each cluster rrd handler"""
    13261253
    1327         ret = self.rrd_handler.storeMetrics()
    1328 
    1329         if ret:
    1330             debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
    1331             return 1
     1254        for clustername, rrdh in self.clusters.items():
     1255
     1256            ret = rrdh.storeMetrics()
     1257
     1258            if ret:
     1259                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
     1260                return 1
    13321261
    13331262        return 0
     
    14001329        """Setup connection to XML source"""
    14011330
    1402         if self.update_now:
    1403             return 0
    1404 
    14051331        self.update_now = True
    14061332
    1407         #self.slot.acquire()
     1333        self.slot.acquire()
    14081334
    14091335        self.data       = None
    1410 
    1411         debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." )
    14121336
    14131337        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
     
    14451369
    14461370            my_fp            = self.s.makefile( 'r' )
    1447             #my_data          = my_fp.readlines()
    1448             #my_data          = string.join( my_data, '' )
    1449 
    1450             #self.data        = my_data
    1451             self.data        = my_fp.read()
     1371            my_data          = my_fp.readlines()
     1372            my_data          = string.join( my_data, '' )
     1373
     1374            self.data        = my_data
    14521375
    14531376            self.LAST_UPDATE = time.time()
    14541377
    1455             self.disconnect()
    1456 
    1457         #self.slot.release()
    1458 
    1459         debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." )
     1378        self.slot.release()
    14601379
    14611380        self.update_now    = False
     
    14771396        """Reconnect"""
    14781397
    1479         if self.update_now:
    1480             return 0
    1481 
    1482         #while self.update_now:
     1398        while self.update_now:
    14831399
    14841400            # Must be another update in progress:
    14851401            # Wait until the update is complete
    14861402            #
    1487         #    time.sleep( 1 )
     1403            time.sleep( 1 )
    14881404
    14891405        if self.s:
    14901406            self.disconnect()
    14911407
    1492         cur_time    = time.time()
    1493 
    1494         if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
    1495 
    1496             if not self.update_now:
    1497 
    1498                 self.retrieveData()
     1408        self.retrieveData()
    14991409
    15001410    def getData( self ):
     
    15101420        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
    15111421
    1512             if not self.update_now:
    1513 
    1514                 self.reGetData()
     1422            self.reGetData()
    15151423
    15161424        while self.update_now:
     
    15421450    """Main class for processing XML and acting with it"""
    15431451
    1544     def __init__( self, XMLSource, DataStore, cluster ):
     1452    def __init__( self, XMLSource, DataStore ):
    15451453        """Setup initial XML connection and handlers"""
    15461454
     
    15481456        self.myXMLSource  = XMLSource
    15491457        self.ds           = DataStore
     1458        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )
    15501459        self.myXMLError   = XMLErrorHandler()
    1551         self.clusterName  = cluster
    1552         self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName )
    1553 
    15541460
    15551461    def run( self ):
     
    15701476                    xml_thread.start()
    15711477                except thread.error, msg:
    1572                     debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)) )
     1478                    debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
    15731479                    #return 1
    15741480
     
    15821488                    store_thread.start()
    15831489                except thread.error, msg:
    1584                     debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)) )
     1490                    debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
    15851491                    #return 1
    15861492       
     
    16071513            return 1
    16081514
    1609         debug_msg( 1, 'ganglia_store_thread(): started: cluster %s' %self.clusterName )
    1610 
    1611         debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss): cluster %s' %(STORE_INTERVAL, self.clusterName ) )
     1515        debug_msg( 1, 'ganglia_store_thread(): started.' )
     1516
     1517        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
    16121518        time.sleep( STORE_INTERVAL )
    1613         debug_msg( 1, 'ganglia_store_thread(): Done sleeping: cluster %s' %self.clusterName )
     1519        debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
    16141520
    16151521        if store_metric_thread.isAlive():
    16161522
    1617             debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName )
     1523            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
    16181524            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
    16191525
    16201526            if store_metric_thread.isAlive():
    16211527
    1622                 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName )
     1528                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' )
    16231529            else:
    1624                 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished: cluster %s' %self.clusterName )
    1625 
    1626         debug_msg( 1, 'ganglia_store_thread(): finished: cluster %s' %self.clusterName )
     1530                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' )
     1531
     1532        debug_msg( 1, 'ganglia_store_thread(): finished.' )
    16271533
    16281534        return 0
     
    16311537        """Actual metric storing thread"""
    16321538
    1633         debug_msg( 1, 'ganglia_store_metric_thread(): started: cluster %s' %self.clusterName )
    1634         debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName )
     1539        debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
     1540        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
    16351541
    16361542        ret = self.myXMLHandler.storeMetrics()
    16371543        if ret > 0:
    1638             debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) )
    1639 
    1640         debug_msg( 1, 'ganglia_store_metric_thread(): Done storing: cluster %s' %self.clusterName )
    1641         debug_msg( 1, 'ganglia_store_metric_thread(): finished: cluster %s' %self.clusterName )
     1544            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
     1545
     1546        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
     1547        debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
    16421548       
    16431549        return 0
     
    16501556            parsethread.start()
    16511557        except thread.error, msg:
    1652             debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg) ) )
     1558            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
    16531559            return 1
    16541560
    1655         debug_msg( 1, 'ganglia_xml_thread(): started: cluster %s' %self.clusterName )
    1656 
    1657         debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss): cluster %s' %(self.config.getLowestInterval(), self.clusterName) )
     1561        debug_msg( 1, 'ganglia_xml_thread(): started.' )
     1562
     1563        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
    16581564        time.sleep( float( self.config.getLowestInterval() ) )   
    1659         debug_msg( 1, 'ganglia_xml_thread(): Done sleeping: cluster %s' %self.clusterName )
     1565        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
    16601566
    16611567        if parsethread.isAlive():
    16621568
    1663             debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ) )
     1569            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
    16641570            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
    16651571
    16661572            if parsethread.isAlive():
    1667                 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName )
     1573                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' )
    16681574            else:
    1669                 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName )
    1670 
    1671         debug_msg( 1, 'ganglia_xml_thread(): finished: cluster %s' %self.clusterName )
     1575                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' )
     1576
     1577        debug_msg( 1, 'ganglia_xml_thread(): finished.' )
    16721578
    16731579        return 0
     
    16761582        """Actual parsing thread"""
    16771583
    1678 
    1679         debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName )
    1680         debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName )
     1584        debug_msg( 1, 'ganglia_parse_thread(): started.' )
     1585        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
    16811586       
    16821587        my_data    = self.myXMLSource.getData()
    16831588
    1684         debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data) ) )
     1589        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) )
    16851590
    16861591        if my_data:
    1687             debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName )
    1688 
     1592            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
    16891593            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
    1690 
    1691             debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName )
    1692             #yappi.print_stats()
    1693 
    1694         debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName )
     1594            debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
     1595
     1596        debug_msg( 1, 'ganglia_parse_thread(): finished.' )
    16951597
    16961598        return 0
     
    17771679    """Class for handling RRD activity"""
    17781680
     1681
    17791682    def __init__( self, config, cluster ):
    17801683        """Setup initial variables"""
    17811684
    1782         global MODRRDTOOL, ARCHIVE_EXCLUDE_METRICS
     1685        global MODRRDTOOL
    17831686
    17841687        self.block   = 0
     
    17981701        global DEBUG_LEVEL
    17991702
    1800         if DEBUG_LEVEL <= 0:
     1703        if DEBUG_LEVEL <= 2:
    18011704            self.gatherLastUpdates()
    1802 
    1803         self.excludes        = [ ]
    1804 
    1805         for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
    1806 
    1807             self.excludes.append( re.compile( ex_metricstr ) )
    18081705
    18091706    def gatherLastUpdates( self ):
     
    18651762        # <ATOMIC>
    18661763        #
     1764        self.slot.acquire()
    18671765       
    1868         #if host in self.myMetrics:
    1869 
    1870             #if self.myMetrics[ host ].has_key( metric['name'] ):
    1871 
    1872             #    if len( self.myMetrics[ host ][ metric['name'] ] ) > 0:
    1873      
    1874             #        if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
    1875 
    1876             #            return 1
    1877                 #for mymetric in self.myMetrics[ host ][ metric['name'] ]:
    1878 
    1879                 #    if mymetric['time'] == metric['time']:
     1766        if self.myMetrics.has_key( host ):
     1767
     1768            if self.myMetrics[ host ].has_key( metric['name'] ):
     1769
     1770                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
     1771
     1772                    if mymetric['time'] == metric['time']:
    18801773
    18811774                        # Allready have this metric, abort
    1882                 #        return 1
    1883             #else:
    1884             #if metric['name'] not in self.myMetrics[ host ]:
    1885             #    self.myMetrics[ host ][ metric['name'] ] = deque()
    1886         #else:
    1887         #    self.myMetrics[ host ]                   = { }
    1888         #    self.myMetrics[ host ][ metric['name'] ] = deque()
     1775                        self.slot.release()
     1776                        return 1
     1777            else:
     1778                self.myMetrics[ host ][ metric['name'] ] = [ ]
     1779        else:
     1780            self.myMetrics[ host ]                   = { }
     1781            self.myMetrics[ host ][ metric['name'] ] = [ ]
    18891782
    18901783        # Push new metric onto stack
    18911784        # atomic code; only 1 thread at a time may access the stack
    1892         #self.slot.acquire()
    1893 
    1894         try:
    1895             host_metrics = self.myMetrics[ host ]
    1896         except KeyError:
    1897             self.myMetrics[ host ] = { }
    1898             host_metrics = self.myMetrics[ host ]
    1899 
    1900         try:
    1901             metric_values = self.myMetrics[ host ][ metric['name'] ]
    1902         except KeyError:
    1903             self.myMetrics[ host ][ metric['name'] ] = deque()
    1904             metric_values = self.myMetrics[ host ][ metric['name'] ]
    1905 
    1906         try:
    1907             if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
    1908                 return 1
    1909         except (IndexError, KeyError):
    1910             pass
    19111785
    19121786        self.myMetrics[ host ][ metric['name'] ].append( metric )
    19131787
    1914         #self.slot.release()
     1788        self.slot.release()
    19151789        #
    19161790        # </ATOMIC>
     
    19831857        """
    19841858
    1985         debug_msg( 5, "Entering storeMetrics(): cluster %s" %self.cluster )
     1859        debug_msg( 5, "Entering storeMetrics()")
    19861860
    19871861        count_values  = 0
     
    20041878        count_bytes    = count_bits / 8
    20051879
    2006         debug_msg( 1, "size of cluster '" + self.cluster + "': " +
     1880        debug_msg( 5, "size of cluster '" + self.cluster + "': " +
    20071881            str( len( self.myMetrics.keys() ) ) + " hosts " +
    20081882            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
     
    20131887            for metricname, mymetric in mymetrics.items():
    20141888
    2015                 for e in self.excludes:
    2016 
    2017                     if e.match( metricname ):
    2018 
    2019                         del self.myMetrics[ hostname ][ metricname ]
    2020                         continue
    2021 
    20221889                metrics_to_store = [ ]
    20231890
     
    20271894                # <ATOMIC>
    20281895                #
    2029                 #self.slot.acquire()
    2030 
    2031                 if metricname in self.myMetrics[ hostname ]:
    2032 
    2033                     while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
    2034 
    2035                         if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
    2036 
    2037                             try:
    2038                                 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() )
    2039                             except IndexError, msg:
    2040 
    2041                                 # Somehow sometimes myMetrics[ hostname ][ metricname ]
    2042                                 # is still len 0 when the statement is executed.
    2043                                 # Just ignore indexerror's..
    2044                                 pass
    2045 
    2046                 #self.slot.release()
     1896                self.slot.acquire()
     1897
     1898                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     1899
     1900                    if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
     1901
     1902                        try:
     1903                            metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
     1904                        except IndexError, msg:
     1905
     1906                            # Somehow sometimes myMetrics[ hostname ][ metricname ]
     1907                            # is still len 0 when the statement is executed.
     1908                            # Just ignore indexerror's..
     1909                            pass
     1910
     1911                self.slot.release()
    20471912                #
    20481913                # </ATOMIC>
     
    20761941                self.memLastUpdate( hostname, metricname, metrics_to_store )
    20771942
    2078         debug_msg( 5, "Leaving storeMetrics(): cluster %s" %self.cluster )
     1943        debug_msg( 5, "Leaving storeMetrics()")
    20791944
    20801945    def makeTimeSerial( self ):
     
    22962161
    22972162    myJobProcessor     = JobXMLProcessor( myXMLSource, myDataStore )
    2298 
    2299     myGangliaProcessors= [ ]
    2300 
    2301     for archive_cluster in ARCHIVE_DATASOURCES:
    2302 
    2303         myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) )
    2304 
    2305     ganglia_xml_threads = [ ]
     2163    myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore )
    23062164
    23072165    try:
    23082166        job_xml_thread     = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
    2309 
    2310         t = 0
    2311 
    2312         for ganglia_processor in myGangliaProcessors:
    2313 
    2314             ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) )
    2315 
    2316             t = t + 1
     2167        ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
    23172168
    23182169        job_xml_thread.start()
    2319 
    2320         for t in ganglia_xml_threads:
    2321 
    2322             t.start()
     2170        ganglia_xml_thread.start()
    23232171       
    23242172    except thread.error, msg:
Note: See TracChangeset for help on using the changeset viewer.