Changeset 774 for branches/0.4/jobarchived/jobarchived.py
Timestamp: 03/29/13 14:07:28 (11 years ago)
File: 1 edited
Legend:
- Unmodified
- Added
- Removed
branches/0.4/jobarchived/jobarchived.py
r773 r774 134 134 global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE 135 135 global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS 136 global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL 136 global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL, JOB_SQL_PASSWORD, JOB_SQL_USER 137 137 138 138 ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' ) … … 180 180 181 181 JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' ) 182 JOB_SQL_USER = cfg.get( 'DEFAULT', 'JOB_SQL_USER' ) 183 JOB_SQL_PASSWORD = cfg.get( 'DEFAULT', 'JOB_SQL_PASSWORD' ) 182 184 183 185 JOB_TIMEOUT = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' ) … … 328 330 try: 329 331 c.execute(q_str) 330 result = c.oidValue331 332 332 333 except psycopg2.Error, details: … … 336 337 337 338 c.close() 338 return result339 return True 339 340 340 341 def Commit(self): … … 348 349 def __init__( self, hostname, database ): 349 350 351 global JOB_SQL_USER, JOB_SQL_PASSWORD 352 350 353 self.db_vars = InitVars(DataBaseName=database, 351 User= 'root',354 User=JOB_SQL_USER, 352 355 Host=hostname, 353 Password= '',356 Password=JOB_SQL_PASSWORD, 354 357 Dictionary='true') 355 358 … … 508 511 elif action == 'update': 509 512 510 self.setDatabase( "UPDATE jobs SET %s WHERE job_id= %s" %(update_str, job_id) )513 self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) ) 511 514 512 515 if len( ids ) > 0: … … 540 543 def addJobNode( self, jobid, nodeid ): 541 544 542 self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )545 self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( '%s',%s )" %(jobid, nodeid) ) 543 546 544 547 def storeJobInfo( self, jobid, jobattrs ): … … 790 793 791 794 my_data = self.myXMLSource.getData() 795 #print my_data 796 #print "size my data: %d" %len( my_data ) 792 797 793 798 debug_msg( 1, 'torque_xml_thread(): Done retrieving.' 
) … … 799 804 800 805 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' ) 806 else: 807 debug_msg( 1, 'torque_xml_thread(): Got no data.' ) 808 801 809 802 810 debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) ) … … 814 822 self.jobs_processed = [ ] 815 823 self.jobs_to_store = [ ] 824 debug_msg( 1, "XML: Handler created" ) 816 825 817 826 def startDocument( self ): … … 819 828 self.heartbeat = 0 820 829 self.elementct = 0 830 debug_msg( 1, "XML: Start document" ) 821 831 822 832 def startElement( self, name, attrs ): … … 844 854 elif metricname.find( 'zplugin_monarch_job' ) != -1: 845 855 846 job_id = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[ 0]856 job_id = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1] 847 857 val = str( attrs.get( 'VAL', "" ) ) 848 858 … … 1331 1341 # Store metrics somewhere between every 360 and 640 seconds 1332 1342 # 1333 if DEBUG_LEVEL > 2:1334 #STORE_INTERVAL = 601335 STORE_INTERVAL = random.randint( 360, 640 )1343 if DEBUG_LEVEL >= 1: 1344 STORE_INTERVAL = 60 1345 #STORE_INTERVAL = random.randint( 360, 640 ) 1336 1346 else: 1337 1347 STORE_INTERVAL = random.randint( 360, 640 ) … … 1352 1362 if store_metric_thread.isAlive(): 1353 1363 1354 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )1364 debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to terminate..' ) 1355 1365 store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 1356 debug_msg( 1, 'ganglia_store_thread(): Done waiting .' )1366 debug_msg( 1, 'ganglia_store_thread(): Done waiting: terminated storemetricthread()' ) 1357 1367 1358 1368 debug_msg( 1, 'ganglia_store_thread(): finished.' 
) … … 1376 1386 """Process XML""" 1377 1387 1388 debug_msg( 5, "Entering processXML()") 1389 1378 1390 try: 1379 1391 parsethread = threading.Thread( None, self.parseThread, 'parsethread' ) … … 1391 1403 if parsethread.isAlive(): 1392 1404 1393 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )1405 debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to terminate..' %PARSE_TIMEOUT ) 1394 1406 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 1395 debug_msg( 1, 'ganglia_xml_thread(): Done waiting. ' )1407 debug_msg( 1, 'ganglia_xml_thread(): Done waiting. parsethread() terminated' ) 1396 1408 1397 1409 debug_msg( 1, 'ganglia_xml_thread(): finished.' ) 1410 1411 debug_msg( 5, "Leaving processXML()") 1398 1412 1399 1413 return 0 … … 1406 1420 1407 1421 my_data = self.myXMLSource.getData() 1422 debug_msg( 1, 'ganglia_parse_thread(): data size %d.' %len(my_data) ) 1408 1423 1409 1424 debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )
Note: See TracChangeset for help on using the changeset viewer.