Changeset 949 for branches/1.2
- Timestamp:
- 01/20/14 17:04:55 (10 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/1.2/jobarchived/jobarchived.py
r930 r949 23 23 24 24 import getopt, syslog, ConfigParser, sys 25 from collections import deque 26 import time 27 from pprint import pprint 28 #import yappi 29 30 #yappi.start() 31 32 try: 33 from resource import getrusage, RUSAGE_SELF 34 except ImportError: 35 RUSAGE_SELF = 0 36 def getrusage(who=0): 37 return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.0 38 39 p_stats = None 40 p_start_time = None 41 42 def profiler(frame, event, arg): 43 if event not in ('call','return'): return profiler 44 #### gather stats #### 45 rusage = getrusage(RUSAGE_SELF) 46 t_cpu = rusage[0] + rusage[1] # user time + system time 47 code = frame.f_code 48 fun = (code.co_name, code.co_filename, code.co_firstlineno) 49 #### get stack with functions entry stats #### 50 ct = threading.currentThread() 51 try: 52 p_stack = ct.p_stack 53 except AttributeError: 54 ct.p_stack = deque() 55 p_stack = ct.p_stack 56 #### handle call and return #### 57 if event == 'call': 58 p_stack.append((time.time(), t_cpu, fun)) 59 elif event == 'return': 60 try: 61 t,t_cpu_prev,f = p_stack.pop() 62 assert f == fun 63 except IndexError: # TODO investigate 64 t,t_cpu_prev,f = p_start_time, 0.0, None 65 call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0)) 66 p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev) 67 return profiler 68 69 70 def profile_on(): 71 global p_stats, p_start_time 72 p_stats = {} 73 p_start_time = time.time() 74 threading.setprofile(profiler) 75 sys.setprofile(profiler) 76 77 debug_msg( 1, 'profile_on(): profiling..' ) 78 79 def profile_off(): 80 threading.setprofile(None) 81 sys.setprofile(None) 82 debug_msg( 1, 'profile_on(): profiling ended..' ) 83 84 def get_profile_stats(): 85 """ 86 returns dict[function_tuple] -> stats_tuple 87 where 88 function_tuple = (function_name, filename, lineno) 89 stats_tuple = (call_cnt, real_time, cpu_time) 90 """ 91 debug_msg( 1, 'get_profile_stats(): dumping stats..' ) 92 return p_stats 25 93 26 94 VERSION='__VERSION__' … … 920 988 for j in timedout_jobs: 921 989 922 del self.jobAttrs[ j ] 923 del self.jobAttrsSaved[ j ] 990 try: 991 del self.jobAttrs[ j ] 992 del self.jobAttrsSaved[ j ] 993 except KeyError: 994 pass 924 995 925 996 debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) ) … … 932 1003 """ 933 1004 934 global ARCHIVE_DATASOURCES935 936 1005 jobinfo = { } 937 1006 … … 973 1042 974 1043 self.jobs_processed.append( job_id ) 1044 975 1045 976 1046 def endDocument( self ): … … 1132 1202 """Parse Ganglia's XML""" 1133 1203 1134 def __init__( self, config, datastore ):1204 def __init__( self, config, datastore, cluster ): 1135 1205 """Setup initial variables and gather info on existing rrd archive""" 1136 1206 1137 1207 self.config = config 1138 self.cluster s = { }1208 self.clusterName = cluster 1139 1209 self.ds = datastore 1140 1141 debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' ) 1142 self.gatherClusters() 1143 debug_msg( 1, 'Housekeeping: RRD check complete.' ) 1144 1145 def gatherClusters( self ): 1210 self.rrd_handler = None 1211 self.cluster_start = False 1212 1213 debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName ) 1214 self.gatherCluster() 1215 debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName ) 1216 1217 def gatherCluster( self ): 1146 1218 """Find all existing clusters in archive dir""" 1147 1219 … … 1156 1228 dirlist = os.listdir( archive_dir ) 1157 1229 1158 for cfgcluster in ARCHIVE_DATASOURCES: 1159 1160 if cfgcluster not in dirlist: 1161 1162 # Autocreate a directory for this cluster 1163 # assume it is new 1164 # 1165 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1166 1167 os.mkdir( cluster_dir ) 1168 1169 dirlist.append( cfgcluster ) 1170 1171 for item in dirlist: 1172 1173 clustername = item 1174 1175 if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES: 1176 1177 self.clusters[ clustername ] = RRDHandler( self.config, clustername ) 1178 1179 debug_msg( 9, 'Found cluster dir: %s' %( clustername ) ) 1180 1181 debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" cluster dirs" ) 1230 if self.clusterName not in dirlist: 1231 1232 # Autocreate a directory for this cluster 1233 # assume it is new 1234 # 1235 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1236 1237 os.mkdir( cluster_dir ) 1238 1239 dirlist.append( cfgcluster ) 1240 1241 for d in dirlist: 1242 1243 if not self.rrd_handler and d == self.clusterName: 1244 1245 self.rrd_handler = RRDHandler( self.config, d ) 1246 1247 debug_msg( 9, 'Found cluster dir: %s' %( d ) ) 1182 1248 1183 1249 def startElement( self, name, attrs ): 1184 1250 """Memorize appropriate data from xml start tags""" 1185 1251 1186 global ARCHIVE_DATASOURCES1187 1188 1252 if name == 'GANGLIA_XML': 1189 1253 1190 self.XMLSource = str( attrs.get( 'SOURCE', "" ))1191 self.gangliaVersion = str( attrs.get( 'VERSION', "" ))1254 self.XMLSource = attrs.get( 'SOURCE', "" ) 1255 self.gangliaVersion = attrs.get( 'VERSION', "" ) 1192 1256 1193 1257 debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) ) 1194 1258 1195 elif name == 'GRID': 1196 1197 self.gridName = str( attrs.get( 'NAME', "" ) ) 1198 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1259 return 0 1260 1261 if name == 'GRID': 1262 1263 self.gridName = attrs.get( 'NAME', "" ) 1264 self.time = attrs.get( 'LOCALTIME', "" ) 1199 1265 1200 1266 debug_msg( 10, '`-Grid found: %s' %( self.gridName ) ) 1201 1267 1202 elif name == 'CLUSTER': 1203 1204 self.clusterName = str( attrs.get( 'NAME', "" ) ) 1205 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1206 1207 if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES: 1208 1209 self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName ) 1210 1211 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1212 1213 elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES: 1214 1215 self.hostName = str( attrs.get( 'NAME', "" ) ) 1216 self.hostIp = str( attrs.get( 'IP', "" ) ) 1217 self.hostReported = str( attrs.get( 'REPORTED', "" ) ) 1268 return 0 1269 1270 if name == 'CLUSTER': 1271 1272 xmlClusterName = attrs.get( 'NAME', "" ) 1273 self.time = attrs.get( 'LOCALTIME', "" ) 1274 1275 if self.clusterName == xmlClusterName: 1276 1277 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1278 1279 self.cluster_start = True 1280 1281 if not self.rrd_handler: 1282 1283 self.rrd_handler = RRDHandler( self.config, self.clusterName ) 1284 else: 1285 self.cluster_start = False 1286 1287 return 0 1288 1289 if name == 'HOST' and self.cluster_start: 1290 1291 self.hostName = attrs.get( 'NAME', "" ) 1292 self.hostIp = attrs.get( 'IP', "" ) 1293 self.hostReported = attrs.get( 'REPORTED', "" ) 1218 1294 1219 1295 debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) ) 1220 1296 1221 elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES: 1222 1223 type = str( attrs.get( 'TYPE', "" ) ) 1297 return 0 1298 1299 if name == 'METRIC' and self.cluster_start: 1300 1301 #type = attrs.get( 'TYPE', "" ) 1302 #orig_name = attrs.get( 'NAME', "" ) 1224 1303 1225 exclude_metric = False 1226 1227 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS: 1228 1229 orig_name = str( attrs.get( 'NAME', "" ) ) 1230 1231 if string.lower( orig_name ) == string.lower( ex_metricstr ): 1232 1233 exclude_metric = True 1234 1235 elif re.match( ex_metricstr, orig_name ): 1236 1237 exclude_metric = True 1238 1239 if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric: 1240 1241 myMetric = { } 1242 myMetric['name'] = str( attrs.get( 'NAME', "" ) ) 1243 myMetric['val'] = str( attrs.get( 'VAL', "" ) ) 1244 myMetric['time'] = self.hostReported 1245 1246 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric ) 1247 1248 debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1249 debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1304 if attrs.get( 'TYPE', "" ) != 'string': 1305 1306 #myMetric = { } 1307 #myMetric['name'] = attrs.get( 'NAME', "" ) 1308 #myMetric['val'] = attrs.get( 'VAL', "" ) 1309 #myMetric['time'] = self.hostReported 1310 1311 self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL', "" ), 'time': self.hostReported } ) 1312 1313 #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1314 #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1315 1316 1317 def endElement( self, name ): 1318 1319 if name == 'CLUSTER' and self.cluster_start: 1320 1321 self.cluster_start = False 1322 debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) ) 1250 1323 1251 1324 def storeMetrics( self ): 1252 1325 """Store metrics of each cluster rrd handler""" 1253 1326 1254 for clustername, rrdh in self.clusters.items(): 1255 1256 ret = rrdh.storeMetrics() 1257 1258 if ret: 1259 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1260 return 1 1327 ret = self.rrd_handler.storeMetrics() 1328 1329 if ret: 1330 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1331 return 1 1261 1332 1262 1333 return 0 … … 1329 1400 """Setup connection to XML source""" 1330 1401 1402 if self.update_now: 1403 return 0 1404 1331 1405 self.update_now = True 1332 1406 1333 self.slot.acquire()1407 #self.slot.acquire() 1334 1408 1335 1409 self.data = None 1410 1411 debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." ) 1336 1412 1337 1413 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): … … 1369 1445 1370 1446 my_fp = self.s.makefile( 'r' ) 1371 my_data = my_fp.readlines() 1372 my_data = string.join( my_data, '' ) 1373 1374 self.data = my_data 1447 #my_data = my_fp.readlines() 1448 #my_data = string.join( my_data, '' ) 1449 1450 #self.data = my_data 1451 self.data = my_fp.read() 1375 1452 1376 1453 self.LAST_UPDATE = time.time() 1377 1454 1378 self.slot.release() 1455 self.disconnect() 1456 1457 #self.slot.release() 1458 1459 debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." ) 1379 1460 1380 1461 self.update_now = False … … 1396 1477 """Reconnect""" 1397 1478 1398 while self.update_now: 1479 if self.update_now: 1480 return 0 1481 1482 #while self.update_now: 1399 1483 1400 1484 # Must be another update in progress: 1401 1485 # Wait until the update is complete 1402 1486 # 1403 time.sleep( 1 )1487 # time.sleep( 1 ) 1404 1488 1405 1489 if self.s: 1406 1490 self.disconnect() 1407 1491 1408 self.retrieveData() 1492 cur_time = time.time() 1493 1494 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1495 1496 if not self.update_now: 1497 1498 self.retrieveData() 1409 1499 1410 1500 def getData( self ): … … 1420 1510 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1421 1511 1422 self.reGetData() 1512 if not self.update_now: 1513 1514 self.reGetData() 1423 1515 1424 1516 while self.update_now: … … 1450 1542 """Main class for processing XML and acting with it""" 1451 1543 1452 def __init__( self, XMLSource, DataStore ):1544 def __init__( self, XMLSource, DataStore, cluster ): 1453 1545 """Setup initial XML connection and handlers""" 1454 1546 … … 1456 1548 self.myXMLSource = XMLSource 1457 1549 self.ds = DataStore 1458 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )1459 1550 self.myXMLError = XMLErrorHandler() 1551 self.clusterName = cluster 1552 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName ) 1553 1460 1554 1461 1555 def run( self ): … … 1476 1570 xml_thread.start() 1477 1571 except thread.error, msg: 1478 debug_msg( 0, 'ERROR: Unable to start xml_thread !: '+str(msg))1572 debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)) ) 1479 1573 #return 1 1480 1574 … … 1488 1582 store_thread.start() 1489 1583 except thread.error, msg: 1490 debug_msg( 0, 'ERROR: Unable to start store_thread !: '+str(msg))1584 debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)) ) 1491 1585 #return 1 1492 1586 … … 1513 1607 return 1 1514 1608 1515 debug_msg( 1, 'ganglia_store_thread(): started .')1516 1517 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss) ' %STORE_INTERVAL)1609 debug_msg( 1, 'ganglia_store_thread(): started: cluster %s' %self.clusterName ) 1610 1611 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss): cluster %s' %(STORE_INTERVAL, self.clusterName ) ) 1518 1612 time.sleep( STORE_INTERVAL ) 1519 debug_msg( 1, 'ganglia_store_thread(): Done sleeping .')1613 debug_msg( 1, 'ganglia_store_thread(): Done sleeping: cluster %s' %self.clusterName ) 1520 1614 1521 1615 if store_metric_thread.isAlive(): 1522 1616 1523 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..')1617 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName ) 1524 1618 store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 1525 1619 1526 1620 if store_metric_thread.isAlive(): 1527 1621 1528 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?')1622 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName ) 1529 1623 else: 1530 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished ')1531 1532 debug_msg( 1, 'ganglia_store_thread(): finished .')1624 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished: cluster %s' %self.clusterName ) 1625 1626 debug_msg( 1, 'ganglia_store_thread(): finished: cluster %s' %self.clusterName ) 1533 1627 1534 1628 return 0 … … 1537 1631 """Actual metric storing thread""" 1538 1632 1539 debug_msg( 1, 'ganglia_store_metric_thread(): started .')1540 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. ')1633 debug_msg( 1, 'ganglia_store_metric_thread(): started: cluster %s' %self.clusterName ) 1634 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName ) 1541 1635 1542 1636 ret = self.myXMLHandler.storeMetrics() 1543 1637 if ret > 0: 1544 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! ' %str(ret) )1545 1546 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing .')1547 debug_msg( 1, 'ganglia_store_metric_thread(): finished .')1638 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) ) 1639 1640 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing: cluster %s' %self.clusterName ) 1641 debug_msg( 1, 'ganglia_store_metric_thread(): finished: cluster %s' %self.clusterName ) 1548 1642 1549 1643 return 0 … … 1556 1650 parsethread.start() 1557 1651 except thread.error, msg: 1558 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! : ' + str(msg) )1652 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg) ) ) 1559 1653 return 1 1560 1654 1561 debug_msg( 1, 'ganglia_xml_thread(): started .')1562 1563 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss) ' %self.config.getLowestInterval() )1655 debug_msg( 1, 'ganglia_xml_thread(): started: cluster %s' %self.clusterName ) 1656 1657 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss): cluster %s' %(self.config.getLowestInterval(), self.clusterName) ) 1564 1658 time.sleep( float( self.config.getLowestInterval() ) ) 1565 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping .')1659 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping: cluster %s' %self.clusterName ) 1566 1660 1567 1661 if parsethread.isAlive(): 1568 1662 1569 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT)1663 debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ) ) 1570 1664 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 1571 1665 1572 1666 if parsethread.isAlive(): 1573 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?')1667 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName ) 1574 1668 else: 1575 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished')1576 1577 debug_msg( 1, 'ganglia_xml_thread(): finished .')1669 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName ) 1670 1671 debug_msg( 1, 'ganglia_xml_thread(): finished: cluster %s' %self.clusterName ) 1578 1672 1579 1673 return 0 … … 1582 1676 """Actual parsing thread""" 1583 1677 1584 debug_msg( 1, 'ganglia_parse_thread(): started.' ) 1585 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' ) 1678 1679 debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName ) 1680 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName ) 1586 1681 1587 1682 my_data = self.myXMLSource.getData() 1588 1683 1589 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving : data size %d' %len(my_data) )1684 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data) ) ) 1590 1685 1591 1686 if my_data: 1592 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' ) 1687 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName ) 1688 1593 1689 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError ) 1594 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' ) 1595 1596 debug_msg( 1, 'ganglia_parse_thread(): finished.' ) 1690 1691 debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName ) 1692 #yappi.print_stats() 1693 1694 debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName ) 1597 1695 1598 1696 return 0 … … 1679 1777 """Class for handling RRD activity""" 1680 1778 1681 1682 1779 def __init__( self, config, cluster ): 1683 1780 """Setup initial variables""" 1684 1781 1685 global MODRRDTOOL 1782 global MODRRDTOOL, ARCHIVE_EXCLUDE_METRICS 1686 1783 1687 1784 self.block = 0 … … 1701 1798 global DEBUG_LEVEL 1702 1799 1703 if DEBUG_LEVEL <= 2:1800 if DEBUG_LEVEL <= 0: 1704 1801 self.gatherLastUpdates() 1802 1803 self.excludes = [ ] 1804 1805 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS: 1806 1807 self.excludes.append( re.compile( ex_metricstr ) ) 1705 1808 1706 1809 def gatherLastUpdates( self ): … … 1762 1865 # <ATOMIC> 1763 1866 # 1764 self.slot.acquire()1765 1867 1766 if self.myMetrics.has_key( host ): 1767 1768 if self.myMetrics[ host ].has_key( metric['name'] ): 1769 1770 for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1771 1772 if mymetric['time'] == metric['time']: 1868 #if host in self.myMetrics: 1869 1870 #if self.myMetrics[ host ].has_key( metric['name'] ): 1871 1872 # if len( self.myMetrics[ host ][ metric['name'] ] ) > 0: 1873 1874 # if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']: 1875 1876 # return 1 1877 #for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1878 1879 # if mymetric['time'] == metric['time']: 1773 1880 1774 1881 # Allready have this metric, abort 1775 self.slot.release()1776 return 11777 else:1778 self.myMetrics[ host ][ metric['name'] ] = [ ]1779 else:1780 self.myMetrics[ host ] = { }1781 self.myMetrics[ host ][ metric['name'] ] = [ ]1882 # return 1 1883 #else: 1884 #if metric['name'] not in self.myMetrics[ host ]: 1885 # self.myMetrics[ host ][ metric['name'] ] = deque() 1886 #else: 1887 # self.myMetrics[ host ] = { } 1888 # self.myMetrics[ host ][ metric['name'] ] = deque() 1782 1889 1783 1890 # Push new metric onto stack 1784 1891 # atomic code; only 1 thread at a time may access the stack 1892 #self.slot.acquire() 1893 1894 try: 1895 host_metrics = self.myMetrics[ host ] 1896 except KeyError: 1897 self.myMetrics[ host ] = { } 1898 host_metrics = self.myMetrics[ host ] 1899 1900 try: 1901 metric_values = self.myMetrics[ host ][ metric['name'] ] 1902 except KeyError: 1903 self.myMetrics[ host ][ metric['name'] ] = deque() 1904 metric_values = self.myMetrics[ host ][ metric['name'] ] 1905 1906 try: 1907 if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']: 1908 return 1 1909 except (IndexError, KeyError): 1910 pass 1785 1911 1786 1912 self.myMetrics[ host ][ metric['name'] ].append( metric ) 1787 1913 1788 self.slot.release()1914 #self.slot.release() 1789 1915 # 1790 1916 # </ATOMIC> … … 1857 1983 """ 1858 1984 1859 debug_msg( 5, "Entering storeMetrics() ")1985 debug_msg( 5, "Entering storeMetrics(): cluster %s" %self.cluster ) 1860 1986 1861 1987 count_values = 0 … … 1878 2004 count_bytes = count_bits / 8 1879 2005 1880 debug_msg( 5, "size of cluster '" + self.cluster + "': " +2006 debug_msg( 1, "size of cluster '" + self.cluster + "': " + 1881 2007 str( len( self.myMetrics.keys() ) ) + " hosts " + 1882 2008 str( count_metrics ) + " metrics " + str( count_values ) + " values " + … … 1887 2013 for metricname, mymetric in mymetrics.items(): 1888 2014 2015 for e in self.excludes: 2016 2017 if e.match( metricname ): 2018 2019 del self.myMetrics[ hostname ][ metricname ] 2020 continue 2021 1889 2022 metrics_to_store = [ ] 1890 2023 … … 1894 2027 # <ATOMIC> 1895 2028 # 1896 self.slot.acquire() 1897 1898 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1899 1900 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1901 1902 try: 1903 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) ) 1904 except IndexError, msg: 1905 1906 # Somehow sometimes myMetrics[ hostname ][ metricname ] 1907 # is still len 0 when the statement is executed. 1908 # Just ignore indexerror's.. 1909 pass 1910 1911 self.slot.release() 2029 #self.slot.acquire() 2030 2031 if metricname in self.myMetrics[ hostname ]: 2032 2033 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2034 2035 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2036 2037 try: 2038 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() ) 2039 except IndexError, msg: 2040 2041 # Somehow sometimes myMetrics[ hostname ][ metricname ] 2042 # is still len 0 when the statement is executed. 2043 # Just ignore indexerror's.. 2044 pass 2045 2046 #self.slot.release() 1912 2047 # 1913 2048 # </ATOMIC> … … 1941 2076 self.memLastUpdate( hostname, metricname, metrics_to_store ) 1942 2077 1943 debug_msg( 5, "Leaving storeMetrics() ")2078 debug_msg( 5, "Leaving storeMetrics(): cluster %s" %self.cluster ) 1944 2079 1945 2080 def makeTimeSerial( self ): … … 2161 2296 2162 2297 myJobProcessor = JobXMLProcessor( myXMLSource, myDataStore ) 2163 myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore ) 2298 2299 myGangliaProcessors= [ ] 2300 2301 for archive_cluster in ARCHIVE_DATASOURCES: 2302 2303 myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) ) 2304 2305 ganglia_xml_threads = [ ] 2164 2306 2165 2307 try: 2166 2308 job_xml_thread = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' ) 2167 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 2309 2310 t = 0 2311 2312 for ganglia_processor in myGangliaProcessors: 2313 2314 ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) ) 2315 2316 t = t + 1 2168 2317 2169 2318 job_xml_thread.start() 2170 ganglia_xml_thread.start() 2319 2320 for t in ganglia_xml_threads: 2321 2322 t.start() 2171 2323 2172 2324 except thread.error, msg:
Note: See TracChangeset
for help on using the changeset viewer.