Changeset 949 for branches/1.1/jobarchived/jobarchived.py
- Timestamp:
- 01/20/14 17:04:55 (10 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/1.1/jobarchived/jobarchived.py
r945 r949 23 23 24 24 import getopt, syslog, ConfigParser, sys 25 from collections import deque26 import time27 from pprint import pprint28 #import yappi29 30 #yappi.start()31 32 try:33 from resource import getrusage, RUSAGE_SELF34 except ImportError:35 RUSAGE_SELF = 036 def getrusage(who=0):37 return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.038 39 p_stats = None40 p_start_time = None41 42 def profiler(frame, event, arg):43 if event not in ('call','return'): return profiler44 #### gather stats ####45 rusage = getrusage(RUSAGE_SELF)46 t_cpu = rusage[0] + rusage[1] # user time + system time47 code = frame.f_code48 fun = (code.co_name, code.co_filename, code.co_firstlineno)49 #### get stack with functions entry stats ####50 ct = threading.currentThread()51 try:52 p_stack = ct.p_stack53 except AttributeError:54 ct.p_stack = deque()55 p_stack = ct.p_stack56 #### handle call and return ####57 if event == 'call':58 p_stack.append((time.time(), t_cpu, fun))59 elif event == 'return':60 try:61 t,t_cpu_prev,f = p_stack.pop()62 assert f == fun63 except IndexError: # TODO investigate64 t,t_cpu_prev,f = p_start_time, 0.0, None65 call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0))66 p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev)67 return profiler68 69 70 def profile_on():71 global p_stats, p_start_time72 p_stats = {}73 p_start_time = time.time()74 threading.setprofile(profiler)75 sys.setprofile(profiler)76 77 debug_msg( 1, 'profile_on(): profiling..' )78 79 def profile_off():80 threading.setprofile(None)81 sys.setprofile(None)82 debug_msg( 1, 'profile_on(): profiling ended..' )83 84 def get_profile_stats():85 """86 returns dict[function_tuple] -> stats_tuple87 where88 function_tuple = (function_name, filename, lineno)89 stats_tuple = (call_cnt, real_time, cpu_time)90 """91 debug_msg( 1, 'get_profile_stats(): dumping stats..' )92 return p_stats93 25 94 26 VERSION='__VERSION__' … … 988 920 for j in timedout_jobs: 989 921 990 try: 991 del self.jobAttrs[ j ] 992 del self.jobAttrsSaved[ j ] 993 except KeyError: 994 pass 922 del self.jobAttrs[ j ] 923 del self.jobAttrsSaved[ j ] 995 924 996 925 debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) ) … … 1003 932 """ 1004 933 934 global ARCHIVE_DATASOURCES 935 1005 936 jobinfo = { } 1006 937 … … 1042 973 1043 974 self.jobs_processed.append( job_id ) 1044 1045 975 1046 976 def endDocument( self ): … … 1202 1132 """Parse Ganglia's XML""" 1203 1133 1204 def __init__( self, config, datastore , cluster):1134 def __init__( self, config, datastore ): 1205 1135 """Setup initial variables and gather info on existing rrd archive""" 1206 1136 1207 1137 self.config = config 1208 self.cluster Name = cluster1138 self.clusters = { } 1209 1139 self.ds = datastore 1210 self.rrd_handler = None 1211 self.cluster_start = False 1212 1213 debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName ) 1214 self.gatherCluster() 1215 debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName ) 1216 1217 def gatherCluster( self ): 1140 1141 debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' ) 1142 self.gatherClusters() 1143 debug_msg( 1, 'Housekeeping: RRD check complete.' ) 1144 1145 def gatherClusters( self ): 1218 1146 """Find all existing clusters in archive dir""" 1219 1147 … … 1228 1156 dirlist = os.listdir( archive_dir ) 1229 1157 1230 if self.clusterName not in dirlist: 1231 1232 # Autocreate a directory for this cluster 1233 # assume it is new 1234 # 1235 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1236 1237 os.mkdir( cluster_dir ) 1238 1239 dirlist.append( cfgcluster ) 1240 1241 for d in dirlist: 1242 1243 if not self.rrd_handler and d == self.clusterName: 1244 1245 self.rrd_handler = RRDHandler( self.config, d ) 1246 1247 debug_msg( 9, 'Found cluster dir: %s' %( d ) ) 1158 for cfgcluster in ARCHIVE_DATASOURCES: 1159 1160 if cfgcluster not in dirlist: 1161 1162 # Autocreate a directory for this cluster 1163 # assume it is new 1164 # 1165 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1166 1167 os.mkdir( cluster_dir ) 1168 1169 dirlist.append( cfgcluster ) 1170 1171 for item in dirlist: 1172 1173 clustername = item 1174 1175 if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES: 1176 1177 self.clusters[ clustername ] = RRDHandler( self.config, clustername ) 1178 1179 debug_msg( 9, 'Found cluster dir: %s' %( clustername ) ) 1180 1181 debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" cluster dirs" ) 1248 1182 1249 1183 def startElement( self, name, attrs ): 1250 1184 """Memorize appropriate data from xml start tags""" 1251 1185 1186 global ARCHIVE_DATASOURCES 1187 1252 1188 if name == 'GANGLIA_XML': 1253 1189 1254 self.XMLSource = attrs.get( 'SOURCE', "")1255 self.gangliaVersion = attrs.get( 'VERSION', "")1190 self.XMLSource = str( attrs.get( 'SOURCE', "" ) ) 1191 self.gangliaVersion = str( attrs.get( 'VERSION', "" ) ) 1256 1192 1257 1193 debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) ) 1258 1194 1259 return 0 1260 1261 if name == 'GRID': 1262 1263 self.gridName = attrs.get( 'NAME', "" ) 1264 self.time = attrs.get( 'LOCALTIME', "" ) 1195 elif name == 'GRID': 1196 1197 self.gridName = str( attrs.get( 'NAME', "" ) ) 1198 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1265 1199 1266 1200 debug_msg( 10, '`-Grid found: %s' %( self.gridName ) ) 1267 1201 1268 return 0 1269 1270 if name == 'CLUSTER': 1271 1272 xmlClusterName = attrs.get( 'NAME', "" ) 1273 self.time = attrs.get( 'LOCALTIME', "" ) 1274 1275 if self.clusterName == xmlClusterName: 1276 1277 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1278 1279 self.cluster_start = True 1280 1281 if not self.rrd_handler: 1282 1283 self.rrd_handler = RRDHandler( self.config, self.clusterName ) 1284 else: 1285 self.cluster_start = False 1286 1287 return 0 1288 1289 if name == 'HOST' and self.cluster_start: 1290 1291 self.hostName = attrs.get( 'NAME', "" ) 1292 self.hostIp = attrs.get( 'IP', "" ) 1293 self.hostReported = attrs.get( 'REPORTED', "" ) 1202 elif name == 'CLUSTER': 1203 1204 self.clusterName = str( attrs.get( 'NAME', "" ) ) 1205 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1206 1207 if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES: 1208 1209 self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName ) 1210 1211 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1212 1213 elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES: 1214 1215 self.hostName = str( attrs.get( 'NAME', "" ) ) 1216 self.hostIp = str( attrs.get( 'IP', "" ) ) 1217 self.hostReported = str( attrs.get( 'REPORTED', "" ) ) 1294 1218 1295 1219 debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) ) 1296 1220 1297 return 0 1298 1299 if name == 'METRIC' and self.cluster_start: 1300 1301 #type = attrs.get( 'TYPE', "" ) 1302 #orig_name = attrs.get( 'NAME', "" ) 1221 elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES: 1222 1223 type = str( attrs.get( 'TYPE', "" ) ) 1303 1224 1304 if attrs.get( 'TYPE', "" ) != 'string': 1305 1306 #myMetric = { } 1307 #myMetric['name'] = attrs.get( 'NAME', "" ) 1308 #myMetric['val'] = attrs.get( 'VAL', "" ) 1309 #myMetric['time'] = self.hostReported 1310 1311 self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL', "" ), 'time': self.hostReported } ) 1312 1313 #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1314 #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1315 1316 1317 def endElement( self, name ): 1318 1319 if name == 'CLUSTER' and self.cluster_start: 1320 1321 self.cluster_start = False 1322 debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) ) 1225 exclude_metric = False 1226 1227 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS: 1228 1229 orig_name = str( attrs.get( 'NAME', "" ) ) 1230 1231 if string.lower( orig_name ) == string.lower( ex_metricstr ): 1232 1233 exclude_metric = True 1234 1235 elif re.match( ex_metricstr, orig_name ): 1236 1237 exclude_metric = True 1238 1239 if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric: 1240 1241 myMetric = { } 1242 myMetric['name'] = str( attrs.get( 'NAME', "" ) ) 1243 myMetric['val'] = str( attrs.get( 'VAL', "" ) ) 1244 myMetric['time'] = self.hostReported 1245 1246 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric ) 1247 1248 debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1249 debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1323 1250 1324 1251 def storeMetrics( self ): 1325 1252 """Store metrics of each cluster rrd handler""" 1326 1253 1327 ret = self.rrd_handler.storeMetrics() 1328 1329 if ret: 1330 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1331 return 1 1254 for clustername, rrdh in self.clusters.items(): 1255 1256 ret = rrdh.storeMetrics() 1257 1258 if ret: 1259 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1260 return 1 1332 1261 1333 1262 return 0 … … 1400 1329 """Setup connection to XML source""" 1401 1330 1402 if self.update_now:1403 return 01404 1405 1331 self.update_now = True 1406 1332 1407 #self.slot.acquire()1333 self.slot.acquire() 1408 1334 1409 1335 self.data = None 1410 1411 debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." )1412 1336 1413 1337 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): … … 1445 1369 1446 1370 my_fp = self.s.makefile( 'r' ) 1447 #my_data = my_fp.readlines() 1448 #my_data = string.join( my_data, '' ) 1449 1450 #self.data = my_data 1451 self.data = my_fp.read() 1371 my_data = my_fp.readlines() 1372 my_data = string.join( my_data, '' ) 1373 1374 self.data = my_data 1452 1375 1453 1376 self.LAST_UPDATE = time.time() 1454 1377 1455 self.disconnect() 1456 1457 #self.slot.release() 1458 1459 debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." ) 1378 self.slot.release() 1460 1379 1461 1380 self.update_now = False … … 1477 1396 """Reconnect""" 1478 1397 1479 if self.update_now: 1480 return 0 1481 1482 #while self.update_now: 1398 while self.update_now: 1483 1399 1484 1400 # Must be another update in progress: 1485 1401 # Wait until the update is complete 1486 1402 # 1487 #time.sleep( 1 )1403 time.sleep( 1 ) 1488 1404 1489 1405 if self.s: 1490 1406 self.disconnect() 1491 1407 1492 cur_time = time.time() 1493 1494 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1495 1496 if not self.update_now: 1497 1498 self.retrieveData() 1408 self.retrieveData() 1499 1409 1500 1410 def getData( self ): … … 1510 1420 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1511 1421 1512 if not self.update_now: 1513 1514 self.reGetData() 1422 self.reGetData() 1515 1423 1516 1424 while self.update_now: … … 1542 1450 """Main class for processing XML and acting with it""" 1543 1451 1544 def __init__( self, XMLSource, DataStore , cluster):1452 def __init__( self, XMLSource, DataStore ): 1545 1453 """Setup initial XML connection and handlers""" 1546 1454 … … 1548 1456 self.myXMLSource = XMLSource 1549 1457 self.ds = DataStore 1458 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds ) 1550 1459 self.myXMLError = XMLErrorHandler() 1551 self.clusterName = cluster1552 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName )1553 1554 1460 1555 1461 def run( self ): … … 1570 1476 xml_thread.start() 1571 1477 except thread.error, msg: 1572 debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)))1478 debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg)) 1573 1479 #return 1 1574 1480 … … 1582 1488 store_thread.start() 1583 1489 except thread.error, msg: 1584 debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)))1490 debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg)) 1585 1491 #return 1 1586 1492 … … 1607 1513 return 1 1608 1514 1609 debug_msg( 1, 'ganglia_store_thread(): started : cluster %s' %self.clusterName)1610 1611 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss) : cluster %s' %(STORE_INTERVAL, self.clusterName ))1515 debug_msg( 1, 'ganglia_store_thread(): started.' ) 1516 1517 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL ) 1612 1518 time.sleep( STORE_INTERVAL ) 1613 debug_msg( 1, 'ganglia_store_thread(): Done sleeping : cluster %s' %self.clusterName)1519 debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' ) 1614 1520 1615 1521 if store_metric_thread.isAlive(): 1616 1522 1617 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName)1523 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' ) 1618 1524 store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 1619 1525 1620 1526 if store_metric_thread.isAlive(): 1621 1527 1622 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName)1528 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' ) 1623 1529 else: 1624 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished : cluster %s' %self.clusterName)1625 1626 debug_msg( 1, 'ganglia_store_thread(): finished : cluster %s' %self.clusterName)1530 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' ) 1531 1532 debug_msg( 1, 'ganglia_store_thread(): finished.' ) 1627 1533 1628 1534 return 0 … … 1631 1537 """Actual metric storing thread""" 1632 1538 1633 debug_msg( 1, 'ganglia_store_metric_thread(): started : cluster %s' %self.clusterName)1634 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName)1539 debug_msg( 1, 'ganglia_store_metric_thread(): started.' ) 1540 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' ) 1635 1541 1636 1542 ret = self.myXMLHandler.storeMetrics() 1637 1543 if ret > 0: 1638 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) )1639 1640 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing : cluster %s' %self.clusterName)1641 debug_msg( 1, 'ganglia_store_metric_thread(): finished : cluster %s' %self.clusterName)1544 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) ) 1545 1546 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' ) 1547 debug_msg( 1, 'ganglia_store_metric_thread(): finished.' ) 1642 1548 1643 1549 return 0 … … 1650 1556 parsethread.start() 1651 1557 except thread.error, msg: 1652 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg)) )1558 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) ) 1653 1559 return 1 1654 1560 1655 debug_msg( 1, 'ganglia_xml_thread(): started : cluster %s' %self.clusterName)1656 1657 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss) : cluster %s' %(self.config.getLowestInterval(), self.clusterName) )1561 debug_msg( 1, 'ganglia_xml_thread(): started.' ) 1562 1563 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 1658 1564 time.sleep( float( self.config.getLowestInterval() ) ) 1659 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping : cluster %s' %self.clusterName)1565 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' ) 1660 1566 1661 1567 if parsethread.isAlive(): 1662 1568 1663 debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ))1569 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT ) 1664 1570 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 1665 1571 1666 1572 if parsethread.isAlive(): 1667 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName)1573 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' ) 1668 1574 else: 1669 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName)1670 1671 debug_msg( 1, 'ganglia_xml_thread(): finished : cluster %s' %self.clusterName)1575 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' ) 1576 1577 debug_msg( 1, 'ganglia_xml_thread(): finished.' ) 1672 1578 1673 1579 return 0 … … 1676 1582 """Actual parsing thread""" 1677 1583 1678 1679 debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName ) 1680 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName ) 1584 debug_msg( 1, 'ganglia_parse_thread(): started.' ) 1585 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' ) 1681 1586 1682 1587 my_data = self.myXMLSource.getData() 1683 1588 1684 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data)) )1589 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) ) 1685 1590 1686 1591 if my_data: 1687 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName ) 1688 1592 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' ) 1689 1593 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError ) 1690 1691 debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName ) 1692 #yappi.print_stats() 1693 1694 debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName ) 1594 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' ) 1595 1596 debug_msg( 1, 'ganglia_parse_thread(): finished.' ) 1695 1597 1696 1598 return 0 … … 1777 1679 """Class for handling RRD activity""" 1778 1680 1681 1779 1682 def __init__( self, config, cluster ): 1780 1683 """Setup initial variables""" 1781 1684 1782 global MODRRDTOOL , ARCHIVE_EXCLUDE_METRICS1685 global MODRRDTOOL 1783 1686 1784 1687 self.block = 0 … … 1798 1701 global DEBUG_LEVEL 1799 1702 1800 if DEBUG_LEVEL <= 0:1703 if DEBUG_LEVEL <= 2: 1801 1704 self.gatherLastUpdates() 1802 1803 self.excludes = [ ]1804 1805 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:1806 1807 self.excludes.append( re.compile( ex_metricstr ) )1808 1705 1809 1706 def gatherLastUpdates( self ): … … 1865 1762 # <ATOMIC> 1866 1763 # 1764 self.slot.acquire() 1867 1765 1868 #if host in self.myMetrics: 1869 1870 #if self.myMetrics[ host ].has_key( metric['name'] ): 1871 1872 # if len( self.myMetrics[ host ][ metric['name'] ] ) > 0: 1873 1874 # if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']: 1875 1876 # return 1 1877 #for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1878 1879 # if mymetric['time'] == metric['time']: 1766 if self.myMetrics.has_key( host ): 1767 1768 if self.myMetrics[ host ].has_key( metric['name'] ): 1769 1770 for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1771 1772 if mymetric['time'] == metric['time']: 1880 1773 1881 1774 # Allready have this metric, abort 1882 # return 11883 #else:1884 #if metric['name'] not in self.myMetrics[ host ]:1885 # self.myMetrics[ host ][ metric['name'] ] = deque()1886 #else:1887 #self.myMetrics[ host ] = { }1888 # self.myMetrics[ host ][ metric['name'] ] = deque()1775 self.slot.release() 1776 return 1 1777 else: 1778 self.myMetrics[ host ][ metric['name'] ] = [ ] 1779 else: 1780 self.myMetrics[ host ] = { } 1781 self.myMetrics[ host ][ metric['name'] ] = [ ] 1889 1782 1890 1783 # Push new metric onto stack 1891 1784 # atomic code; only 1 thread at a time may access the stack 1892 #self.slot.acquire()1893 1894 try:1895 host_metrics = self.myMetrics[ host ]1896 except KeyError:1897 self.myMetrics[ host ] = { }1898 host_metrics = self.myMetrics[ host ]1899 1900 try:1901 metric_values = self.myMetrics[ host ][ metric['name'] ]1902 except KeyError:1903 self.myMetrics[ host ][ metric['name'] ] = deque()1904 metric_values = self.myMetrics[ host ][ metric['name'] ]1905 1906 try:1907 if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:1908 return 11909 except (IndexError, KeyError):1910 pass1911 1785 1912 1786 self.myMetrics[ host ][ metric['name'] ].append( metric ) 1913 1787 1914 #self.slot.release()1788 self.slot.release() 1915 1789 # 1916 1790 # </ATOMIC> … … 1983 1857 """ 1984 1858 1985 debug_msg( 5, "Entering storeMetrics() : cluster %s" %self.cluster)1859 debug_msg( 5, "Entering storeMetrics()") 1986 1860 1987 1861 count_values = 0 … … 2004 1878 count_bytes = count_bits / 8 2005 1879 2006 debug_msg( 1, "size of cluster '" + self.cluster + "': " +1880 debug_msg( 5, "size of cluster '" + self.cluster + "': " + 2007 1881 str( len( self.myMetrics.keys() ) ) + " hosts " + 2008 1882 str( count_metrics ) + " metrics " + str( count_values ) + " values " + … … 2013 1887 for metricname, mymetric in mymetrics.items(): 2014 1888 2015 for e in self.excludes:2016 2017 if e.match( metricname ):2018 2019 del self.myMetrics[ hostname ][ metricname ]2020 continue2021 2022 1889 metrics_to_store = [ ] 2023 1890 … … 2027 1894 # <ATOMIC> 2028 1895 # 2029 #self.slot.acquire() 2030 2031 if metricname in self.myMetrics[ hostname ]: 2032 2033 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2034 2035 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2036 2037 try: 2038 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() ) 2039 except IndexError, msg: 2040 2041 # Somehow sometimes myMetrics[ hostname ][ metricname ] 2042 # is still len 0 when the statement is executed. 2043 # Just ignore indexerror's.. 2044 pass 2045 2046 #self.slot.release() 1896 self.slot.acquire() 1897 1898 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1899 1900 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1901 1902 try: 1903 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) ) 1904 except IndexError, msg: 1905 1906 # Somehow sometimes myMetrics[ hostname ][ metricname ] 1907 # is still len 0 when the statement is executed. 1908 # Just ignore indexerror's.. 1909 pass 1910 1911 self.slot.release() 2047 1912 # 2048 1913 # </ATOMIC> … … 2076 1941 self.memLastUpdate( hostname, metricname, metrics_to_store ) 2077 1942 2078 debug_msg( 5, "Leaving storeMetrics() : cluster %s" %self.cluster)1943 debug_msg( 5, "Leaving storeMetrics()") 2079 1944 2080 1945 def makeTimeSerial( self ): … … 2296 2161 2297 2162 myJobProcessor = JobXMLProcessor( myXMLSource, myDataStore ) 2298 2299 myGangliaProcessors= [ ] 2300 2301 for archive_cluster in ARCHIVE_DATASOURCES: 2302 2303 myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) ) 2304 2305 ganglia_xml_threads = [ ] 2163 myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore ) 2306 2164 2307 2165 try: 2308 2166 job_xml_thread = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' ) 2309 2310 t = 0 2311 2312 for ganglia_processor in myGangliaProcessors: 2313 2314 ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) ) 2315 2316 t = t + 1 2167 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 2317 2168 2318 2169 job_xml_thread.start() 2319 2320 for t in ganglia_xml_threads: 2321 2322 t.start() 2170 ganglia_xml_thread.start() 2323 2171 2324 2172 except thread.error, msg:
Note: See TracChangeset
for help on using the changeset viewer.