Changeset 292 for trunk/jobarchived/jobarchived.py
- Timestamp:
- 03/30/07 09:42:06 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobarchived/jobarchived.py
r289 r292 34 34 def processArgs( args ): 35 35 36 SHORT_L 37 LONG_L 36 SHORT_L = 'c:' 37 LONG_L = 'config=' 38 38 39 39 config_filename = None … … 106 106 global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL 107 107 108 ARCHIVE_PATH 109 110 ARCHIVE_HOURS_PER_RRD 111 112 DEBUG_LEVEL 113 114 USE_SYSLOG 115 116 SYSLOG_LEVEL 108 ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' ) 109 110 ARCHIVE_HOURS_PER_RRD = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' ) 111 112 DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' ) 113 114 USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' ) 115 116 SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' ) 117 117 118 118 try: 119 119 120 SYSLOG_FACILITY 120 SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) ) 121 121 122 122 except AttributeError, detail: … … 125 125 sys.exit( 1 ) 126 126 127 GMETAD_CONF 128 129 ARCHIVE_XMLSOURCE 130 131 ARCHIVE_DATASOURCES 132 133 ARCHIVE_EXCLUDE_METRICS 134 135 JOB_SQL_DBASE 136 137 DAEMONIZE 138 139 RRDTOOL 127 GMETAD_CONF = cfg.get( 'DEFAULT', 'GMETAD_CONF' ) 128 129 ARCHIVE_XMLSOURCE = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' ) 130 131 ARCHIVE_DATASOURCES = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) ) 132 133 ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) ) 134 135 JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' ) 136 137 DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' ) 138 139 RRDTOOL = cfg.get( 'DEFAULT', 'RRDTOOL' ) 140 140 141 141 return True … … 264 264 def mutateJob( self, action, job_id, jobattrs ): 265 265 266 job_values 267 268 insert_col_str 269 insert_val_str 270 update_str 266 job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ] 267 268 insert_col_str = 'job_id' 269 insert_val_str = "'%s'" %job_id 270 update_str = None 271 271 272 272 debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id)) … … 340 340 for node in hostnames: 341 341 342 node 343 id 342 node = '%s.%s' %( node, domain ) 343 id = self.getNodeId( node ) 344 344 345 345 if not id: … … 395 395 debug_msg( 8, self.binary + ' info "' + filename + '"' ) 396 396 397 for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines(): 397 my_pipe = os.popen( self.binary + ' info "' + filename + '"' ) 398 399 for line in my_pipe.readlines(): 398 400 399 401 if line.find( 'last_update') != -1: 400 402 401 403 last_update = line.split( ' = ' )[1] 404 405 if my_pipe: 406 407 my_pipe.close() 402 408 403 409 if last_update: … … 425 431 debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ) 426 432 427 cmd = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ) 428 lines = cmd.readlines() 433 cmd = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string ) 434 lines = cmd.readlines() 435 429 436 cmd.close() 430 437 … … 489 496 def __init__( self ): 490 497 491 self.ds 492 self.jobs_processed 493 self.jobs_to_store 498 self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] ) 499 self.jobs_processed = [ ] 500 self.jobs_to_store = [ ] 494 501 495 502 def startDocument( self ): 496 503 497 self.heartbeat 504 self.heartbeat = 0 498 505 499 506 def startElement( self, name, attrs ): … … 519 526 elif metricname.find( 'MONARCH-JOB' ) != -1: 520 527 521 job_id 522 val 528 job_id = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0] 529 val = attrs.get( 'VAL', "" ) 523 530 524 531 if not job_id in self.jobs_processed: 532 525 533 self.jobs_processed.append( job_id ) 526 534 … … 528 536 529 537 if self.jobAttrs.has_key( job_id ): 538 530 539 check_change = 1 531 540 … … 536 545 if len( myval.split( '=' ) ) > 1: 537 546 538 valname 539 value 547 valname = myval.split( '=' )[0] 548 value = myval.split( '=' )[1] 540 549 541 550 if valname == 'nodes': … … 546 555 if check_change: 547 556 if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]: 548 self.jobAttrs[ job_id ]['stop_timestamp'] 549 self.jobAttrs[ job_id ] 557 self.jobAttrs[ job_id ]['stop_timestamp'] = '' 558 self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo ) 550 559 if not job_id in self.jobs_to_store: 551 560 self.jobs_to_store.append( job_id ) … … 594 603 debug_msg( 1, 'torque_xml_thread(): Done storing.' ) 595 604 596 self.jobs_processed 597 self.jobs_to_store 605 self.jobs_processed = [ ] 606 self.jobs_to_store = [ ] 598 607 599 608 def setJobAttrs( self, old, new ): … … 642 651 """Setup initial variables and gather info on existing rrd archive""" 643 652 644 self.config 645 self.clusters 653 self.config = config 654 self.clusters = { } 646 655 debug_msg( 1, 'Checking existing toga rrd archive..' ) 647 self.gatherClusters()656 #self.gatherClusters() 648 657 debug_msg( 1, 'Check done.' ) 649 658 … … 651 660 """Find all existing clusters in archive dir""" 652 661 653 archive_dir 654 655 hosts 662 archive_dir = check_dir(ARCHIVE_PATH) 663 664 hosts = [ ] 656 665 657 666 if os.path.exists( archive_dir ): 658 667 659 dirlist 668 dirlist = os.listdir( archive_dir ) 660 669 661 670 for item in dirlist: … … 672 681 if name == 'GANGLIA_XML': 673 682 674 self.XMLSource 675 self.gangliaVersion 683 self.XMLSource = attrs.get( 'SOURCE', "" ) 684 self.gangliaVersion = attrs.get( 'VERSION', "" ) 676 685 677 686 debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) ) … … 679 688 elif name == 'GRID': 680 689 681 self.gridName 682 self.time 690 self.gridName = attrs.get( 'NAME', "" ) 691 self.time = attrs.get( 'LOCALTIME', "" ) 683 692 684 693 debug_msg( 10, '`-Grid found: %s' %( self.gridName ) ) … … 686 695 elif name == 'CLUSTER': 687 696 688 self.clusterName 689 self.time 697 self.clusterName = attrs.get( 'NAME', "" ) 698 self.time = attrs.get( 'LOCALTIME', "" ) 690 699 691 700 if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES: … … 697 706 elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES: 698 707 699 self.hostName 700 self.hostIp 701 self.hostReported 708 self.hostName = attrs.get( 'NAME', "" ) 709 self.hostIp = attrs.get( 'IP', "" ) 710 self.hostReported = attrs.get( 'REPORTED', "" ) 702 711 703 712 debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) ) … … 723 732 if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric: 724 733 725 myMetric 726 myMetric['name'] 727 myMetric['val'] 728 myMetric['time'] 734 myMetric = { } 735 myMetric['name'] = attrs.get( 'NAME', "" ) 736 myMetric['val'] = attrs.get( 'VAL', "" ) 737 myMetric['time'] = self.hostReported 729 738 730 739 self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric ) … … 1070 1079 if line.find( 'data_source' ) != -1 and line[0] != '#': 1071 1080 1072 source 1073 source['name'] 1074 source_words 1081 source = { } 1082 source['name'] = line.split( '"' )[1] 1083 source_words = line.split( '"' )[2].split( ' ' ) 1075 1084 1076 1085 for word in source_words: … … 1135 1144 """Setup initial variables""" 1136 1145 1137 self.block = 0 1138 self.cluster = cluster 1139 self.config = config 1140 self.slot = threading.Lock() 1141 self.rrdm = RRDMutator( RRDTOOL ) 1146 self.block = 0 1147 self.cluster = cluster 1148 self.config = config 1149 self.slot = threading.Lock() 1150 self.rrdm = RRDMutator( RRDTOOL ) 1151 1142 1152 self.gatherLastUpdates() 1143 1153 … … 1159 1169 for host in hosts: 1160 1170 1161 host_dir 1162 dirlist 1171 host_dir = cluster_dir + '/' + host 1172 dirlist = os.listdir( host_dir ) 1163 1173 1164 1174 for dir in dirlist: … … 1171 1181 1172 1182 last_serial = self.getLastRrdTimeSerial( host ) 1183 1173 1184 if last_serial: 1174 1185 1175 1186 metric_dir = cluster_dir + '/' + host + '/' + last_serial 1187 1176 1188 if os.path.exists( metric_dir ): 1177 1189 … … 1214 1226 self.myMetrics[ host ][ metric['name'] ] = [ ] 1215 1227 else: 1216 self.myMetrics[ host ] 1217 self.myMetrics[ host ][ metric['name'] ] 1228 self.myMetrics[ host ] = { } 1229 self.myMetrics[ host ][ metric['name'] ] = [ ] 1218 1230 1219 1231 # Push new metric onto stack … … 1232 1244 """ 1233 1245 1234 update_list 1235 metric 1246 update_list = [ ] 1247 metric = None 1236 1248 1237 1249 while len( metriclist ) > 0: … … 1240 1252 1241 1253 if self.checkStoreMetric( host, metric ): 1254 1242 1255 update_list.append( '%s:%s' %( metric['time'], metric['val'] ) ) 1243 1256 … … 1359 1372 """Make a RRD location/path and filename""" 1360 1373 1361 rrd_dir = '%s/%s/%s/%s'%( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )1362 rrd_file = '%s/%s.rrd'%( rrd_dir, metricname )1374 rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial ) 1375 rrd_file = '%s/%s.rrd' %( rrd_dir, metricname ) 1363 1376 1364 1377 return rrd_dir, rrd_file … … 1414 1427 if metric['name'] == metricname: 1415 1428 1416 period 1417 1418 archive_secs 1429 period = self.determinePeriod( host, metric['time'] ) 1430 1431 archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60) 1419 1432 1420 1433 if (int( metric['time'] ) - int( period ) ) > archive_secs: … … 1464 1477 if not os.path.exists( rrd_file ): 1465 1478 1466 interval 1467 heartbeat 1468 1469 params 1479 interval = self.config.getInterval( self.cluster ) 1480 heartbeat = 8 * int( interval ) 1481 1482 params = [ ] 1470 1483 1471 1484 params.append( '--step' ) … … 1490 1503 debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) ) 1491 1504 1492 rrd_dir, rrd_file 1493 1494 update_list 1505 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial ) 1506 1507 update_list = self.makeUpdateList( host, metriclist ) 1495 1508 1496 1509 if len( update_list ) > 0: … … 1551 1564 1552 1565 try: 1553 torque_xml_thread 1554 ganglia_xml_thread 1566 torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' ) 1567 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' ) 1555 1568 1556 1569 torque_xml_thread.start()
Note: See TracChangeset
for help on using the changeset viewer.