- Timestamp:
- 01/20/14 17:04:55 (10 years ago)
- Location:
- branches
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/1.1/jobarchived/jobarchived.py
r945 r949 23 23 24 24 import getopt, syslog, ConfigParser, sys 25 from collections import deque26 import time27 from pprint import pprint28 #import yappi29 30 #yappi.start()31 32 try:33 from resource import getrusage, RUSAGE_SELF34 except ImportError:35 RUSAGE_SELF = 036 def getrusage(who=0):37 return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.038 39 p_stats = None40 p_start_time = None41 42 def profiler(frame, event, arg):43 if event not in ('call','return'): return profiler44 #### gather stats ####45 rusage = getrusage(RUSAGE_SELF)46 t_cpu = rusage[0] + rusage[1] # user time + system time47 code = frame.f_code48 fun = (code.co_name, code.co_filename, code.co_firstlineno)49 #### get stack with functions entry stats ####50 ct = threading.currentThread()51 try:52 p_stack = ct.p_stack53 except AttributeError:54 ct.p_stack = deque()55 p_stack = ct.p_stack56 #### handle call and return ####57 if event == 'call':58 p_stack.append((time.time(), t_cpu, fun))59 elif event == 'return':60 try:61 t,t_cpu_prev,f = p_stack.pop()62 assert f == fun63 except IndexError: # TODO investigate64 t,t_cpu_prev,f = p_start_time, 0.0, None65 call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0))66 p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev)67 return profiler68 69 70 def profile_on():71 global p_stats, p_start_time72 p_stats = {}73 p_start_time = time.time()74 threading.setprofile(profiler)75 sys.setprofile(profiler)76 77 debug_msg( 1, 'profile_on(): profiling..' )78 79 def profile_off():80 threading.setprofile(None)81 sys.setprofile(None)82 debug_msg( 1, 'profile_on(): profiling ended..' )83 84 def get_profile_stats():85 """86 returns dict[function_tuple] -> stats_tuple87 where88 function_tuple = (function_name, filename, lineno)89 stats_tuple = (call_cnt, real_time, cpu_time)90 """91 debug_msg( 1, 'get_profile_stats(): dumping stats..' )92 return p_stats93 25 94 26 VERSION='__VERSION__' … … 988 920 for j in timedout_jobs: 989 921 990 try: 991 del self.jobAttrs[ j ] 992 del self.jobAttrsSaved[ j ] 993 except KeyError: 994 pass 922 del self.jobAttrs[ j ] 923 del self.jobAttrsSaved[ j ] 995 924 996 925 debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) ) … … 1003 932 """ 1004 933 934 global ARCHIVE_DATASOURCES 935 1005 936 jobinfo = { } 1006 937 … … 1042 973 1043 974 self.jobs_processed.append( job_id ) 1044 1045 975 1046 976 def endDocument( self ): … … 1202 1132 """Parse Ganglia's XML""" 1203 1133 1204 def __init__( self, config, datastore , cluster):1134 def __init__( self, config, datastore ): 1205 1135 """Setup initial variables and gather info on existing rrd archive""" 1206 1136 1207 1137 self.config = config 1208 self.cluster Name = cluster1138 self.clusters = { } 1209 1139 self.ds = datastore 1210 self.rrd_handler = None 1211 self.cluster_start = False 1212 1213 debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName ) 1214 self.gatherCluster() 1215 debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName ) 1216 1217 def gatherCluster( self ): 1140 1141 debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' ) 1142 self.gatherClusters() 1143 debug_msg( 1, 'Housekeeping: RRD check complete.' ) 1144 1145 def gatherClusters( self ): 1218 1146 """Find all existing clusters in archive dir""" 1219 1147 … … 1228 1156 dirlist = os.listdir( archive_dir ) 1229 1157 1230 if self.clusterName not in dirlist: 1231 1232 # Autocreate a directory for this cluster 1233 # assume it is new 1234 # 1235 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1236 1237 os.mkdir( cluster_dir ) 1238 1239 dirlist.append( cfgcluster ) 1240 1241 for d in dirlist: 1242 1243 if not self.rrd_handler and d == self.clusterName: 1244 1245 self.rrd_handler = RRDHandler( self.config, d ) 1246 1247 debug_msg( 9, 'Found cluster dir: %s' %( d ) ) 1158 for cfgcluster in ARCHIVE_DATASOURCES: 1159 1160 if cfgcluster not in dirlist: 1161 1162 # Autocreate a directory for this cluster 1163 # assume it is new 1164 # 1165 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1166 1167 os.mkdir( cluster_dir ) 1168 1169 dirlist.append( cfgcluster ) 1170 1171 for item in dirlist: 1172 1173 clustername = item 1174 1175 if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES: 1176 1177 self.clusters[ clustername ] = RRDHandler( self.config, clustername ) 1178 1179 debug_msg( 9, 'Found cluster dir: %s' %( clustername ) ) 1180 1181 debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" cluster dirs" ) 1248 1182 1249 1183 def startElement( self, name, attrs ): 1250 1184 """Memorize appropriate data from xml start tags""" 1251 1185 1186 global ARCHIVE_DATASOURCES 1187 1252 1188 if name == 'GANGLIA_XML': 1253 1189 1254 self.XMLSource = attrs.get( 'SOURCE', "")1255 self.gangliaVersion = attrs.get( 'VERSION', "")1190 self.XMLSource = str( attrs.get( 'SOURCE', "" ) ) 1191 self.gangliaVersion = str( attrs.get( 'VERSION', "" ) ) 1256 1192 1257 1193 debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) ) 1258 1194 1259 return 0 1260 1261 if name == 'GRID': 1262 1263 self.gridName = attrs.get( 'NAME', "" ) 1264 self.time = attrs.get( 'LOCALTIME', "" ) 1195 elif name == 'GRID': 1196 1197 self.gridName = str( attrs.get( 'NAME', "" ) ) 1198 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1265 1199 1266 1200 debug_msg( 10, '`-Grid found: %s' %( self.gridName ) ) 1267 1201 1268 return 0 1269 1270 if name == 'CLUSTER': 1271 1272 xmlClusterName = attrs.get( 'NAME', "" ) 1273 self.time = attrs.get( 'LOCALTIME', "" ) 1274 1275 if self.clusterName == xmlClusterName: 1276 1277 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1278 1279 self.cluster_start = True 1280 1281 if not self.rrd_handler: 1282 1283 self.rrd_handler = RRDHandler( self.config, self.clusterName ) 1284 else: 1285 self.cluster_start = False 1286 1287 return 0 1288 1289 if name == 'HOST' and self.cluster_start: 1290 1291 self.hostName = attrs.get( 'NAME', "" ) 1292 self.hostIp = attrs.get( 'IP', "" ) 1293 self.hostReported = attrs.get( 'REPORTED', "" ) 1202 elif name == 'CLUSTER': 1203 1204 self.clusterName = str( attrs.get( 'NAME', "" ) ) 1205 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1206 1207 if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES: 1208 1209 self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName ) 1210 1211 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1212 1213 elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES: 1214 1215 self.hostName = str( attrs.get( 'NAME', "" ) ) 1216 self.hostIp = str( attrs.get( 'IP', "" ) ) 1217 self.hostReported = str( attrs.get( 'REPORTED', "" ) ) 1294 1218 1295 1219 debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) ) 1296 1220 1297 return 0 1298 1299 if name == 'METRIC' and self.cluster_start: 1300 1301 #type = attrs.get( 'TYPE', "" ) 1302 #orig_name = attrs.get( 'NAME', "" ) 1221 elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES: 1222 1223 type = str( attrs.get( 'TYPE', "" ) ) 1303 1224 1304 if attrs.get( 'TYPE', "" ) != 'string': 1305 1306 #myMetric = { } 1307 #myMetric['name'] = attrs.get( 'NAME', "" ) 1308 #myMetric['val'] = attrs.get( 'VAL', "" ) 1309 #myMetric['time'] = self.hostReported 1310 1311 self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL', "" ), 'time': self.hostReported } ) 1312 1313 #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1314 #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1315 1316 1317 def endElement( self, name ): 1318 1319 if name == 'CLUSTER' and self.cluster_start: 1320 1321 self.cluster_start = False 1322 debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) ) 1225 exclude_metric = False 1226 1227 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS: 1228 1229 orig_name = str( attrs.get( 'NAME', "" ) ) 1230 1231 if string.lower( orig_name ) == string.lower( ex_metricstr ): 1232 1233 exclude_metric = True 1234 1235 elif re.match( ex_metricstr, orig_name ): 1236 1237 exclude_metric = True 1238 1239 if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric: 1240 1241 myMetric = { } 1242 myMetric['name'] = str( attrs.get( 'NAME', "" ) ) 1243 myMetric['val'] = str( attrs.get( 'VAL', "" ) ) 1244 myMetric['time'] = self.hostReported 1245 1246 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric ) 1247 1248 debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1249 debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1323 1250 1324 1251 def storeMetrics( self ): 1325 1252 """Store metrics of each cluster rrd handler""" 1326 1253 1327 ret = self.rrd_handler.storeMetrics() 1328 1329 if ret: 1330 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1331 return 1 1254 for clustername, rrdh in self.clusters.items(): 1255 1256 ret = rrdh.storeMetrics() 1257 1258 if ret: 1259 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1260 return 1 1332 1261 1333 1262 return 0 … … 1400 1329 """Setup connection to XML source""" 1401 1330 1402 if self.update_now:1403 return 01404 1405 1331 self.update_now = True 1406 1332 1407 #self.slot.acquire()1333 self.slot.acquire() 1408 1334 1409 1335 self.data = None 1410 1411 debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." )1412 1336 1413 1337 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): … … 1445 1369 1446 1370 my_fp = self.s.makefile( 'r' ) 1447 #my_data = my_fp.readlines() 1448 #my_data = string.join( my_data, '' ) 1449 1450 #self.data = my_data 1451 self.data = my_fp.read() 1371 my_data = my_fp.readlines() 1372 my_data = string.join( my_data, '' ) 1373 1374 self.data = my_data 1452 1375 1453 1376 self.LAST_UPDATE = time.time() 1454 1377 1455 self.disconnect() 1456 1457 #self.slot.release() 1458 1459 debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." ) 1378 self.slot.release() 1460 1379 1461 1380 self.update_now = False … … 1477 1396 """Reconnect""" 1478 1397 1479 if self.update_now: 1480 return 0 1481 1482 #while self.update_now: 1398 while self.update_now: 1483 1399 1484 1400 # Must be another update in progress: 1485 1401 # Wait until the update is complete 1486 1402 # 1487 #time.sleep( 1 )1403 time.sleep( 1 ) 1488 1404 1489 1405 if self.s: 1490 1406 self.disconnect() 1491 1407 1492 cur_time = time.time() 1493 1494 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1495 1496 if not self.update_now: 1497 1498 self.retrieveData() 1408 self.retrieveData() 1499 1409 1500 1410 def getData( self ): … … 1510 1420 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1511 1421 1512 if not self.update_now: 1513 1514 self.reGetData() 1422 self.reGetData() 1515 1423 1516 1424 while self.update_now: … … 1542 1450 """Main class for processing XML and acting with it""" 1543 1451 1544 def __init__( self, XMLSource, DataStore , cluster):1452 def __init__( self, XMLSource, DataStore ): 1545 1453 """Setup initial XML connection and handlers""" 1546 1454 … … 1548 1456 self.myXMLSource = XMLSource 1549 1457 self.ds = DataStore 1458 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds ) 1550 1459 self.myXMLError = XMLErrorHandler() 1551 self.clusterName = cluster1552 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName )1553 1554 1460 1555 1461 def run( self ): … … 1570 1476 xml_thread.start() 1571 1477 except thread.error, msg: 1572 debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)))1478 debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg)) 1573 1479 #return 1 1574 1480 … … 1582 1488 store_thread.start() 1583 1489 except thread.error, msg: 1584 debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)))1490 debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg)) 1585 1491 #return 1 1586 1492 … … 1607 1513 return 1 1608 1514 1609 debug_msg( 1, 'ganglia_store_thread(): started : cluster %s' %self.clusterName)1610 1611 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss) : cluster %s' %(STORE_INTERVAL, self.clusterName ))1515 debug_msg( 1, 'ganglia_store_thread(): started.' ) 1516 1517 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL ) 1612 1518 time.sleep( STORE_INTERVAL ) 1613 debug_msg( 1, 'ganglia_store_thread(): Done sleeping : cluster %s' %self.clusterName)1519 debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' ) 1614 1520 1615 1521 if store_metric_thread.isAlive(): 1616 1522 1617 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName)1523 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' ) 1618 1524 store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 1619 1525 1620 1526 if store_metric_thread.isAlive(): 1621 1527 1622 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName)1528 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' ) 1623 1529 else: 1624 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished : cluster %s' %self.clusterName)1625 1626 debug_msg( 1, 'ganglia_store_thread(): finished : cluster %s' %self.clusterName)1530 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' ) 1531 1532 debug_msg( 1, 'ganglia_store_thread(): finished.' ) 1627 1533 1628 1534 return 0 … … 1631 1537 """Actual metric storing thread""" 1632 1538 1633 debug_msg( 1, 'ganglia_store_metric_thread(): started : cluster %s' %self.clusterName)1634 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName)1539 debug_msg( 1, 'ganglia_store_metric_thread(): started.' ) 1540 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' ) 1635 1541 1636 1542 ret = self.myXMLHandler.storeMetrics() 1637 1543 if ret > 0: 1638 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) )1639 1640 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing : cluster %s' %self.clusterName)1641 debug_msg( 1, 'ganglia_store_metric_thread(): finished : cluster %s' %self.clusterName)1544 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) ) 1545 1546 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' ) 1547 debug_msg( 1, 'ganglia_store_metric_thread(): finished.' ) 1642 1548 1643 1549 return 0 … … 1650 1556 parsethread.start() 1651 1557 except thread.error, msg: 1652 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg)) )1558 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) ) 1653 1559 return 1 1654 1560 1655 debug_msg( 1, 'ganglia_xml_thread(): started : cluster %s' %self.clusterName)1656 1657 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss) : cluster %s' %(self.config.getLowestInterval(), self.clusterName) )1561 debug_msg( 1, 'ganglia_xml_thread(): started.' ) 1562 1563 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() ) 1658 1564 time.sleep( float( self.config.getLowestInterval() ) ) 1659 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping : cluster %s' %self.clusterName)1565 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' ) 1660 1566 1661 1567 if parsethread.isAlive(): 1662 1568 1663 debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ))1569 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT ) 1664 1570 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 1665 1571 1666 1572 if parsethread.isAlive(): 1667 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName)1573 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' ) 1668 1574 else: 1669 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName)1670 1671 debug_msg( 1, 'ganglia_xml_thread(): finished : cluster %s' %self.clusterName)1575 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' ) 1576 1577 debug_msg( 1, 'ganglia_xml_thread(): finished.' ) 1672 1578 1673 1579 return 0 … … 1676 1582 """Actual parsing thread""" 1677 1583 1678 1679 debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName ) 1680 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName ) 1584 debug_msg( 1, 'ganglia_parse_thread(): started.' ) 1585 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' ) 1681 1586 1682 1587 my_data = self.myXMLSource.getData() 1683 1588 1684 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data)) )1589 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) ) 1685 1590 1686 1591 if my_data: 1687 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName ) 1688 1592 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' ) 1689 1593 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError ) 1690 1691 debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName ) 1692 #yappi.print_stats() 1693 1694 debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName ) 1594 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' ) 1595 1596 debug_msg( 1, 'ganglia_parse_thread(): finished.' ) 1695 1597 1696 1598 return 0 … … 1777 1679 """Class for handling RRD activity""" 1778 1680 1681 1779 1682 def __init__( self, config, cluster ): 1780 1683 """Setup initial variables""" 1781 1684 1782 global MODRRDTOOL , ARCHIVE_EXCLUDE_METRICS1685 global MODRRDTOOL 1783 1686 1784 1687 self.block = 0 … … 1798 1701 global DEBUG_LEVEL 1799 1702 1800 if DEBUG_LEVEL <= 0:1703 if DEBUG_LEVEL <= 2: 1801 1704 self.gatherLastUpdates() 1802 1803 self.excludes = [ ]1804 1805 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:1806 1807 self.excludes.append( re.compile( ex_metricstr ) )1808 1705 1809 1706 def gatherLastUpdates( self ): … … 1865 1762 # <ATOMIC> 1866 1763 # 1764 self.slot.acquire() 1867 1765 1868 #if host in self.myMetrics: 1869 1870 #if self.myMetrics[ host ].has_key( metric['name'] ): 1871 1872 # if len( self.myMetrics[ host ][ metric['name'] ] ) > 0: 1873 1874 # if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']: 1875 1876 # return 1 1877 #for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1878 1879 # if mymetric['time'] == metric['time']: 1766 if self.myMetrics.has_key( host ): 1767 1768 if self.myMetrics[ host ].has_key( metric['name'] ): 1769 1770 for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1771 1772 if mymetric['time'] == metric['time']: 1880 1773 1881 1774 # Allready have this metric, abort 1882 # return 11883 #else:1884 #if metric['name'] not in self.myMetrics[ host ]:1885 # self.myMetrics[ host ][ metric['name'] ] = deque()1886 #else:1887 #self.myMetrics[ host ] = { }1888 # self.myMetrics[ host ][ metric['name'] ] = deque()1775 self.slot.release() 1776 return 1 1777 else: 1778 self.myMetrics[ host ][ metric['name'] ] = [ ] 1779 else: 1780 self.myMetrics[ host ] = { } 1781 self.myMetrics[ host ][ metric['name'] ] = [ ] 1889 1782 1890 1783 # Push new metric onto stack 1891 1784 # atomic code; only 1 thread at a time may access the stack 1892 #self.slot.acquire()1893 1894 try:1895 host_metrics = self.myMetrics[ host ]1896 except KeyError:1897 self.myMetrics[ host ] = { }1898 host_metrics = self.myMetrics[ host ]1899 1900 try:1901 metric_values = self.myMetrics[ host ][ metric['name'] ]1902 except KeyError:1903 self.myMetrics[ host ][ metric['name'] ] = deque()1904 metric_values = self.myMetrics[ host ][ metric['name'] ]1905 1906 try:1907 if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:1908 return 11909 except (IndexError, KeyError):1910 pass1911 1785 1912 1786 self.myMetrics[ host ][ metric['name'] ].append( metric ) 1913 1787 1914 #self.slot.release()1788 self.slot.release() 1915 1789 # 1916 1790 # </ATOMIC> … … 1983 1857 """ 1984 1858 1985 debug_msg( 5, "Entering storeMetrics() : cluster %s" %self.cluster)1859 debug_msg( 5, "Entering storeMetrics()") 1986 1860 1987 1861 count_values = 0 … … 2004 1878 count_bytes = count_bits / 8 2005 1879 2006 debug_msg( 1, "size of cluster '" + self.cluster + "': " +1880 debug_msg( 5, "size of cluster '" + self.cluster + "': " + 2007 1881 str( len( self.myMetrics.keys() ) ) + " hosts " + 2008 1882 str( count_metrics ) + " metrics " + str( count_values ) + " values " + … … 2013 1887 for metricname, mymetric in mymetrics.items(): 2014 1888 2015 for e in self.excludes:2016 2017 if e.match( metricname ):2018 2019 del self.myMetrics[ hostname ][ metricname ]2020 continue2021 2022 1889 metrics_to_store = [ ] 2023 1890 … … 2027 1894 # <ATOMIC> 2028 1895 # 2029 #self.slot.acquire() 2030 2031 if metricname in self.myMetrics[ hostname ]: 2032 2033 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2034 2035 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2036 2037 try: 2038 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() ) 2039 except IndexError, msg: 2040 2041 # Somehow sometimes myMetrics[ hostname ][ metricname ] 2042 # is still len 0 when the statement is executed. 2043 # Just ignore indexerror's.. 2044 pass 2045 2046 #self.slot.release() 1896 self.slot.acquire() 1897 1898 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1899 1900 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1901 1902 try: 1903 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) ) 1904 except IndexError, msg: 1905 1906 # Somehow sometimes myMetrics[ hostname ][ metricname ] 1907 # is still len 0 when the statement is executed. 1908 # Just ignore indexerror's.. 1909 pass 1910 1911 self.slot.release() 2047 1912 # 2048 1913 # </ATOMIC> … … 2076 1941 self.memLastUpdate( hostname, metricname, metrics_to_store ) 2077 1942 2078 debug_msg( 5, "Leaving storeMetrics() : cluster %s" %self.cluster)1943 debug_msg( 5, "Leaving storeMetrics()") 2079 1944 2080 1945 def makeTimeSerial( self ): … … 2296 2161 2297 2162 myJobProcessor = JobXMLProcessor( myXMLSource, myDataStore ) 2298 2299 myGangliaProcessors= [ ] 2300 2301 for archive_cluster in ARCHIVE_DATASOURCES: 2302 2303 myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) ) 2304 2305 ganglia_xml_threads = [ ] 2163 myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore ) 2306 2164 2307 2165 try: 2308 2166 job_xml_thread = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' ) 2309 2310 t = 0 2311 2312 for ganglia_processor in myGangliaProcessors: 2313 2314 ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) ) 2315 2316 t = t + 1 2167 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 2317 2168 2318 2169 job_xml_thread.start() 2319 2320 for t in ganglia_xml_threads: 2321 2322 t.start() 2170 ganglia_xml_thread.start() 2323 2171 2324 2172 except thread.error, msg: -
branches/1.2/jobarchived/jobarchived.py
r930 r949 23 23 24 24 import getopt, syslog, ConfigParser, sys 25 from collections import deque 26 import time 27 from pprint import pprint 28 #import yappi 29 30 #yappi.start() 31 32 try: 33 from resource import getrusage, RUSAGE_SELF 34 except ImportError: 35 RUSAGE_SELF = 0 36 def getrusage(who=0): 37 return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.0 38 39 p_stats = None 40 p_start_time = None 41 42 def profiler(frame, event, arg): 43 if event not in ('call','return'): return profiler 44 #### gather stats #### 45 rusage = getrusage(RUSAGE_SELF) 46 t_cpu = rusage[0] + rusage[1] # user time + system time 47 code = frame.f_code 48 fun = (code.co_name, code.co_filename, code.co_firstlineno) 49 #### get stack with functions entry stats #### 50 ct = threading.currentThread() 51 try: 52 p_stack = ct.p_stack 53 except AttributeError: 54 ct.p_stack = deque() 55 p_stack = ct.p_stack 56 #### handle call and return #### 57 if event == 'call': 58 p_stack.append((time.time(), t_cpu, fun)) 59 elif event == 'return': 60 try: 61 t,t_cpu_prev,f = p_stack.pop() 62 assert f == fun 63 except IndexError: # TODO investigate 64 t,t_cpu_prev,f = p_start_time, 0.0, None 65 call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0)) 66 p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev) 67 return profiler 68 69 70 def profile_on(): 71 global p_stats, p_start_time 72 p_stats = {} 73 p_start_time = time.time() 74 threading.setprofile(profiler) 75 sys.setprofile(profiler) 76 77 debug_msg( 1, 'profile_on(): profiling..' ) 78 79 def profile_off(): 80 threading.setprofile(None) 81 sys.setprofile(None) 82 debug_msg( 1, 'profile_on(): profiling ended..' ) 83 84 def get_profile_stats(): 85 """ 86 returns dict[function_tuple] -> stats_tuple 87 where 88 function_tuple = (function_name, filename, lineno) 89 stats_tuple = (call_cnt, real_time, cpu_time) 90 """ 91 debug_msg( 1, 'get_profile_stats(): dumping stats..' ) 92 return p_stats 25 93 26 94 VERSION='__VERSION__' … … 920 988 for j in timedout_jobs: 921 989 922 del self.jobAttrs[ j ] 923 del self.jobAttrsSaved[ j ] 990 try: 991 del self.jobAttrs[ j ] 992 del self.jobAttrsSaved[ j ] 993 except KeyError: 994 pass 924 995 925 996 debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) ) … … 932 1003 """ 933 1004 934 global ARCHIVE_DATASOURCES935 936 1005 jobinfo = { } 937 1006 … … 973 1042 974 1043 self.jobs_processed.append( job_id ) 1044 975 1045 976 1046 def endDocument( self ): … … 1132 1202 """Parse Ganglia's XML""" 1133 1203 1134 def __init__( self, config, datastore ):1204 def __init__( self, config, datastore, cluster ): 1135 1205 """Setup initial variables and gather info on existing rrd archive""" 1136 1206 1137 1207 self.config = config 1138 self.cluster s = { }1208 self.clusterName = cluster 1139 1209 self.ds = datastore 1140 1141 debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' ) 1142 self.gatherClusters() 1143 debug_msg( 1, 'Housekeeping: RRD check complete.' ) 1144 1145 def gatherClusters( self ): 1210 self.rrd_handler = None 1211 self.cluster_start = False 1212 1213 debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName ) 1214 self.gatherCluster() 1215 debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName ) 1216 1217 def gatherCluster( self ): 1146 1218 """Find all existing clusters in archive dir""" 1147 1219 … … 1156 1228 dirlist = os.listdir( archive_dir ) 1157 1229 1158 for cfgcluster in ARCHIVE_DATASOURCES: 1159 1160 if cfgcluster not in dirlist: 1161 1162 # Autocreate a directory for this cluster 1163 # assume it is new 1164 # 1165 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1166 1167 os.mkdir( cluster_dir ) 1168 1169 dirlist.append( cfgcluster ) 1170 1171 for item in dirlist: 1172 1173 clustername = item 1174 1175 if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES: 1176 1177 self.clusters[ clustername ] = RRDHandler( self.config, clustername ) 1178 1179 debug_msg( 9, 'Found cluster dir: %s' %( clustername ) ) 1180 1181 debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" cluster dirs" ) 1230 if self.clusterName not in dirlist: 1231 1232 # Autocreate a directory for this cluster 1233 # assume it is new 1234 # 1235 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster ) 1236 1237 os.mkdir( cluster_dir ) 1238 1239 dirlist.append( cfgcluster ) 1240 1241 for d in dirlist: 1242 1243 if not self.rrd_handler and d == self.clusterName: 1244 1245 self.rrd_handler = RRDHandler( self.config, d ) 1246 1247 debug_msg( 9, 'Found cluster dir: %s' %( d ) ) 1182 1248 1183 1249 def startElement( self, name, attrs ): 1184 1250 """Memorize appropriate data from xml start tags""" 1185 1251 1186 global ARCHIVE_DATASOURCES1187 1188 1252 if name == 'GANGLIA_XML': 1189 1253 1190 self.XMLSource = str( attrs.get( 'SOURCE', "" ))1191 self.gangliaVersion = str( attrs.get( 'VERSION', "" ))1254 self.XMLSource = attrs.get( 'SOURCE', "" ) 1255 self.gangliaVersion = attrs.get( 'VERSION', "" ) 1192 1256 1193 1257 debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) ) 1194 1258 1195 elif name == 'GRID': 1196 1197 self.gridName = str( attrs.get( 'NAME', "" ) ) 1198 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1259 return 0 1260 1261 if name == 'GRID': 1262 1263 self.gridName = attrs.get( 'NAME', "" ) 1264 self.time = attrs.get( 'LOCALTIME', "" ) 1199 1265 1200 1266 debug_msg( 10, '`-Grid found: %s' %( self.gridName ) ) 1201 1267 1202 elif name == 'CLUSTER': 1203 1204 self.clusterName = str( attrs.get( 'NAME', "" ) ) 1205 self.time = str( attrs.get( 'LOCALTIME', "" ) ) 1206 1207 if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES: 1208 1209 self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName ) 1210 1211 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1212 1213 elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES: 1214 1215 self.hostName = str( attrs.get( 'NAME', "" ) ) 1216 self.hostIp = str( attrs.get( 'IP', "" ) ) 1217 self.hostReported = str( attrs.get( 'REPORTED', "" ) ) 1268 return 0 1269 1270 if name == 'CLUSTER': 1271 1272 xmlClusterName = attrs.get( 'NAME', "" ) 1273 self.time = attrs.get( 'LOCALTIME', "" ) 1274 1275 if self.clusterName == xmlClusterName: 1276 1277 debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) ) 1278 1279 self.cluster_start = True 1280 1281 if not self.rrd_handler: 1282 1283 self.rrd_handler = RRDHandler( self.config, self.clusterName ) 1284 else: 1285 self.cluster_start = False 1286 1287 return 0 1288 1289 if name == 'HOST' and self.cluster_start: 1290 1291 self.hostName = attrs.get( 'NAME', "" ) 1292 self.hostIp = attrs.get( 'IP', "" ) 1293 self.hostReported = attrs.get( 'REPORTED', "" ) 1218 1294 1219 1295 debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) ) 1220 1296 1221 elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES: 1222 1223 type = str( attrs.get( 'TYPE', "" ) ) 1297 return 0 1298 1299 if name == 'METRIC' and self.cluster_start: 1300 1301 #type = attrs.get( 'TYPE', "" ) 1302 #orig_name = attrs.get( 'NAME', "" ) 1224 1303 1225 exclude_metric = False 1226 1227 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS: 1228 1229 orig_name = str( attrs.get( 'NAME', "" ) ) 1230 1231 if string.lower( orig_name ) == string.lower( ex_metricstr ): 1232 1233 exclude_metric = True 1234 1235 elif re.match( ex_metricstr, orig_name ): 1236 1237 exclude_metric = True 1238 1239 if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric: 1240 1241 myMetric = { } 1242 myMetric['name'] = str( attrs.get( 'NAME', "" ) ) 1243 myMetric['val'] = str( attrs.get( 'VAL', "" ) ) 1244 myMetric['time'] = self.hostReported 1245 1246 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric ) 1247 1248 debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1249 debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1304 if attrs.get( 'TYPE', "" ) != 'string': 1305 1306 #myMetric = { } 1307 #myMetric['name'] = attrs.get( 'NAME', "" ) 1308 #myMetric['val'] = attrs.get( 'VAL', "" ) 1309 #myMetric['time'] = self.hostReported 1310 1311 self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL', "" ), 'time': self.hostReported } ) 1312 1313 #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) ) 1314 #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) ) 1315 1316 1317 def endElement( self, name ): 1318 1319 if name == 'CLUSTER' and self.cluster_start: 1320 1321 self.cluster_start = False 1322 debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) ) 1250 1323 1251 1324 def storeMetrics( self ): 1252 1325 """Store metrics of each cluster rrd handler""" 1253 1326 1254 for clustername, rrdh in self.clusters.items(): 1255 1256 ret = rrdh.storeMetrics() 1257 1258 if ret: 1259 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1260 return 1 1327 ret = self.rrd_handler.storeMetrics() 1328 1329 if ret: 1330 debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername ) 1331 return 1 1261 1332 1262 1333 return 0 … … 1329 1400 """Setup connection to XML source""" 1330 1401 1402 if self.update_now: 1403 return 0 1404 1331 1405 self.update_now = True 1332 1406 1333 self.slot.acquire()1407 #self.slot.acquire() 1334 1408 1335 1409 self.data = None 1410 1411 debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." ) 1336 1412 1337 1413 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): … … 1369 1445 1370 1446 my_fp = self.s.makefile( 'r' ) 1371 my_data = my_fp.readlines() 1372 my_data = string.join( my_data, '' ) 1373 1374 self.data = my_data 1447 #my_data = my_fp.readlines() 1448 #my_data = string.join( my_data, '' ) 1449 1450 #self.data = my_data 1451 self.data = my_fp.read() 1375 1452 1376 1453 self.LAST_UPDATE = time.time() 1377 1454 1378 self.slot.release() 1455 self.disconnect() 1456 1457 #self.slot.release() 1458 1459 debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." ) 1379 1460 1380 1461 self.update_now = False … … 1396 1477 """Reconnect""" 1397 1478 1398 while self.update_now: 1479 if self.update_now: 1480 return 0 1481 1482 #while self.update_now: 1399 1483 1400 1484 # Must be another update in progress: 1401 1485 # Wait until the update is complete 1402 1486 # 1403 time.sleep( 1 )1487 # time.sleep( 1 ) 1404 1488 1405 1489 if self.s: 1406 1490 self.disconnect() 1407 1491 1408 self.retrieveData() 1492 cur_time = time.time() 1493 1494 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1495 1496 if not self.update_now: 1497 1498 self.retrieveData() 1409 1499 1410 1500 def getData( self ): … … 1420 1510 if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT: 1421 1511 1422 self.reGetData() 1512 if not self.update_now: 1513 1514 self.reGetData() 1423 1515 1424 1516 while self.update_now: … … 1450 1542 """Main class for processing XML and acting with it""" 1451 1543 1452 def __init__( self, XMLSource, DataStore ):1544 def __init__( self, XMLSource, DataStore, cluster ): 1453 1545 """Setup initial XML connection and handlers""" 1454 1546 … … 1456 1548 self.myXMLSource = XMLSource 1457 1549 self.ds = DataStore 1458 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )1459 1550 self.myXMLError = XMLErrorHandler() 1551 self.clusterName = cluster 1552 self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName ) 1553 1460 1554 1461 1555 def run( self ): … … 1476 1570 xml_thread.start() 1477 1571 except thread.error, msg: 1478 debug_msg( 0, 'ERROR: Unable to start xml_thread !: '+str(msg))1572 debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)) ) 1479 1573 #return 1 1480 1574 … … 1488 1582 store_thread.start() 1489 1583 except thread.error, msg: 1490 debug_msg( 0, 'ERROR: Unable to start store_thread !: '+str(msg))1584 debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)) ) 1491 1585 #return 1 1492 1586 … … 1513 1607 return 1 1514 1608 1515 debug_msg( 1, 'ganglia_store_thread(): started .')1516 1517 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss) ' %STORE_INTERVAL)1609 debug_msg( 1, 'ganglia_store_thread(): started: cluster %s' %self.clusterName ) 1610 1611 debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss): cluster %s' %(STORE_INTERVAL, self.clusterName ) ) 1518 1612 time.sleep( STORE_INTERVAL ) 1519 debug_msg( 1, 'ganglia_store_thread(): Done sleeping .')1613 debug_msg( 1, 'ganglia_store_thread(): Done sleeping: cluster %s' %self.clusterName ) 1520 1614 1521 1615 if store_metric_thread.isAlive(): 1522 1616 1523 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..')1617 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName ) 1524 1618 store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 1525 1619 1526 1620 if store_metric_thread.isAlive(): 1527 1621 1528 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?')1622 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName ) 1529 1623 else: 1530 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished ')1531 1532 debug_msg( 1, 'ganglia_store_thread(): finished .')1624 debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished: cluster %s' %self.clusterName ) 1625 1626 debug_msg( 1, 'ganglia_store_thread(): finished: cluster %s' %self.clusterName ) 1533 1627 1534 1628 return 0 … … 1537 1631 """Actual metric storing thread""" 1538 1632 1539 debug_msg( 1, 'ganglia_store_metric_thread(): started .')1540 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. ')1633 debug_msg( 1, 'ganglia_store_metric_thread(): started: cluster %s' %self.clusterName ) 1634 debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName ) 1541 1635 1542 1636 ret = self.myXMLHandler.storeMetrics() 1543 1637 if ret > 0: 1544 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! ' %str(ret) )1545 1546 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing .')1547 debug_msg( 1, 'ganglia_store_metric_thread(): finished .')1638 debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) ) 1639 1640 debug_msg( 1, 'ganglia_store_metric_thread(): Done storing: cluster %s' %self.clusterName ) 1641 debug_msg( 1, 'ganglia_store_metric_thread(): finished: cluster %s' %self.clusterName ) 1548 1642 1549 1643 return 0 … … 1556 1650 parsethread.start() 1557 1651 except thread.error, msg: 1558 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! : ' + str(msg) )1652 debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg) ) ) 1559 1653 return 1 1560 1654 1561 debug_msg( 1, 'ganglia_xml_thread(): started .')1562 1563 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss) ' %self.config.getLowestInterval() )1655 debug_msg( 1, 'ganglia_xml_thread(): started: cluster %s' %self.clusterName ) 1656 1657 debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss): cluster %s' %(self.config.getLowestInterval(), self.clusterName) ) 1564 1658 time.sleep( float( self.config.getLowestInterval() ) ) 1565 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping .')1659 debug_msg( 1, 'ganglia_xml_thread(): Done sleeping: cluster %s' %self.clusterName ) 1566 1660 1567 1661 if parsethread.isAlive(): 1568 1662 1569 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT)1663 debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ) ) 1570 1664 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 1571 1665 1572 1666 if parsethread.isAlive(): 1573 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?')1667 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName ) 1574 1668 else: 1575 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished')1576 1577 debug_msg( 1, 'ganglia_xml_thread(): finished .')1669 debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName ) 1670 1671 debug_msg( 1, 'ganglia_xml_thread(): finished: cluster %s' %self.clusterName ) 1578 1672 1579 1673 return 0 … … 1582 1676 """Actual parsing thread""" 1583 1677 1584 debug_msg( 1, 'ganglia_parse_thread(): started.' ) 1585 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' ) 1678 1679 debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName ) 1680 debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName ) 1586 1681 1587 1682 my_data = self.myXMLSource.getData() 1588 1683 1589 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving : data size %d' %len(my_data) )1684 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data) ) ) 1590 1685 1591 1686 if my_data: 1592 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' ) 1687 debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName ) 1688 1593 1689 xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError ) 1594 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' ) 1595 1596 debug_msg( 1, 'ganglia_parse_thread(): finished.' ) 1690 1691 debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName ) 1692 #yappi.print_stats() 1693 1694 debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName ) 1597 1695 1598 1696 return 0 … … 1679 1777 """Class for handling RRD activity""" 1680 1778 1681 1682 1779 def __init__( self, config, cluster ): 1683 1780 """Setup initial variables""" 1684 1781 1685 global MODRRDTOOL 1782 global MODRRDTOOL, ARCHIVE_EXCLUDE_METRICS 1686 1783 1687 1784 self.block = 0 … … 1701 1798 global DEBUG_LEVEL 1702 1799 1703 if DEBUG_LEVEL <= 2:1800 if DEBUG_LEVEL <= 0: 1704 1801 self.gatherLastUpdates() 1802 1803 self.excludes = [ ] 1804 1805 for ex_metricstr in ARCHIVE_EXCLUDE_METRICS: 1806 1807 self.excludes.append( re.compile( ex_metricstr ) ) 1705 1808 1706 1809 def gatherLastUpdates( self ): … … 1762 1865 # <ATOMIC> 1763 1866 # 1764 self.slot.acquire()1765 1867 1766 if self.myMetrics.has_key( host ): 1767 1768 if self.myMetrics[ host ].has_key( metric['name'] ): 1769 1770 for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1771 1772 if mymetric['time'] == metric['time']: 1868 #if host in self.myMetrics: 1869 1870 #if self.myMetrics[ host ].has_key( metric['name'] ): 1871 1872 # if len( self.myMetrics[ host ][ metric['name'] ] ) > 0: 1873 1874 # if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']: 1875 1876 # return 1 1877 #for mymetric in self.myMetrics[ host ][ metric['name'] ]: 1878 1879 # if mymetric['time'] == metric['time']: 1773 1880 1774 1881 # Allready have this metric, abort 1775 self.slot.release()1776 return 11777 else:1778 self.myMetrics[ host ][ metric['name'] ] = [ ]1779 else:1780 self.myMetrics[ host ] = { }1781 self.myMetrics[ host ][ metric['name'] ] = [ ]1882 # return 1 1883 #else: 1884 #if metric['name'] not in self.myMetrics[ host ]: 1885 # self.myMetrics[ host ][ metric['name'] ] = deque() 1886 #else: 1887 # self.myMetrics[ host ] = { } 1888 # self.myMetrics[ host ][ metric['name'] ] = deque() 1782 1889 1783 1890 # Push new metric onto stack 1784 1891 # atomic code; only 1 thread at a time may access the stack 1892 #self.slot.acquire() 1893 1894 try: 1895 host_metrics = self.myMetrics[ host ] 1896 except KeyError: 1897 self.myMetrics[ host ] = { } 1898 host_metrics = self.myMetrics[ host ] 1899 1900 try: 1901 metric_values = self.myMetrics[ host ][ metric['name'] ] 1902 except KeyError: 1903 self.myMetrics[ host ][ metric['name'] ] = deque() 1904 metric_values = self.myMetrics[ host ][ metric['name'] ] 1905 1906 try: 1907 if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']: 1908 return 1 1909 except (IndexError, KeyError): 1910 pass 1785 1911 1786 1912 self.myMetrics[ host ][ metric['name'] ].append( metric ) 1787 1913 1788 self.slot.release()1914 #self.slot.release() 1789 1915 # 1790 1916 # </ATOMIC> … … 1857 1983 """ 1858 1984 1859 debug_msg( 5, "Entering storeMetrics() ")1985 debug_msg( 5, "Entering storeMetrics(): cluster %s" %self.cluster ) 1860 1986 1861 1987 count_values = 0 … … 1878 2004 count_bytes = count_bits / 8 1879 2005 1880 debug_msg( 5, "size of cluster '" + self.cluster + "': " +2006 debug_msg( 1, "size of cluster '" + self.cluster + "': " + 1881 2007 str( len( self.myMetrics.keys() ) ) + " hosts " + 1882 2008 str( count_metrics ) + " metrics " + str( count_values ) + " values " + … … 1887 2013 for metricname, mymetric in mymetrics.items(): 1888 2014 2015 for e in self.excludes: 2016 2017 if e.match( metricname ): 2018 2019 del self.myMetrics[ hostname ][ metricname ] 2020 continue 2021 1889 2022 metrics_to_store = [ ] 1890 2023 … … 1894 2027 # <ATOMIC> 1895 2028 # 1896 self.slot.acquire() 1897 1898 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1899 1900 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 1901 1902 try: 1903 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) ) 1904 except IndexError, msg: 1905 1906 # Somehow sometimes myMetrics[ hostname ][ metricname ] 1907 # is still len 0 when the statement is executed. 1908 # Just ignore indexerror's.. 1909 pass 1910 1911 self.slot.release() 2029 #self.slot.acquire() 2030 2031 if metricname in self.myMetrics[ hostname ]: 2032 2033 while len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2034 2035 if len( self.myMetrics[ hostname ][ metricname ] ) > 0: 2036 2037 try: 2038 metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() ) 2039 except IndexError, msg: 2040 2041 # Somehow sometimes myMetrics[ hostname ][ metricname ] 2042 # is still len 0 when the statement is executed. 2043 # Just ignore indexerror's.. 2044 pass 2045 2046 #self.slot.release() 1912 2047 # 1913 2048 # </ATOMIC> … … 1941 2076 self.memLastUpdate( hostname, metricname, metrics_to_store ) 1942 2077 1943 debug_msg( 5, "Leaving storeMetrics() ")2078 debug_msg( 5, "Leaving storeMetrics(): cluster %s" %self.cluster ) 1944 2079 1945 2080 def makeTimeSerial( self ): … … 2161 2296 2162 2297 myJobProcessor = JobXMLProcessor( myXMLSource, myDataStore ) 2163 myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore ) 2298 2299 myGangliaProcessors= [ ] 2300 2301 for archive_cluster in ARCHIVE_DATASOURCES: 2302 2303 myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) ) 2304 2305 ganglia_xml_threads = [ ] 2164 2306 2165 2307 try: 2166 2308 job_xml_thread = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' ) 2167 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 2309 2310 t = 0 2311 2312 for ganglia_processor in myGangliaProcessors: 2313 2314 ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) ) 2315 2316 t = t + 1 2168 2317 2169 2318 job_xml_thread.start() 2170 ganglia_xml_thread.start() 2319 2320 for t in ganglia_xml_threads: 2321 2322 t.start() 2171 2323 2172 2324 except thread.error, msg:
Note: See TracChangeset
for help on using the changeset viewer.