Changeset 774


Timestamp: 03/29/13 14:07:28
Author: ramonb
Message:
  • job_id is now a varchar instead of an int, to accommodate array job ids (see the short sketch after this list)
  • fixed monarch job detection: the job id is now the last field of the metric name and the sequence number comes first
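
A minimal sketch of what the first bullet means in practice (the array job id below is a made-up example; real array ids follow the batch system's own format):

    # Array job ids contain brackets, so they no longer fit an int column or an
    # unquoted SQL integer literal; hence jobs.job_id becomes a varchar.
    array_job_id = '1234[5]'    # hypothetical array job id

    try:
        int( array_job_id )
    except ValueError:
        # an int column/cast rejects the id; a varchar column stores it as-is,
        # which is also why the SQL literals below are now quoted
        pass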
File: 1 edited

Legend:

Unmodified: line prefixed with a space
Added: line prefixed with '+'
Removed: line prefixed with '-'
  • branches/0.4/jobarchived/jobarchived.py

--- branches/0.4/jobarchived/jobarchived.py (r773)
+++ branches/0.4/jobarchived/jobarchived.py (r774)
@@ -134,5 +134,5 @@
     global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
     global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
-    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
+    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL, JOB_SQL_PASSWORD, JOB_SQL_USER
 
     ARCHIVE_PATH        = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
     
@@ -180,4 +180,6 @@
 
     JOB_SQL_DBASE        = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
+    JOB_SQL_USER        = cfg.get( 'DEFAULT', 'JOB_SQL_USER' )
+    JOB_SQL_PASSWORD        = cfg.get( 'DEFAULT', 'JOB_SQL_PASSWORD' )
 
     JOB_TIMEOUT        = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
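
The two new options are read from the same DEFAULT section as the existing ones. A minimal, self-contained Python 2 sketch of how such entries are picked up with ConfigParser (the option values and the in-memory sample config are placeholders, not taken from the repository):

    # Sketch only: read JOB_SQL_USER / JOB_SQL_PASSWORD from a DEFAULT section
    # the same way the hunk above does; values are placeholders.
    from ConfigParser import ConfigParser
    from StringIO import StringIO

    sample = StringIO( "[DEFAULT]\nJOB_SQL_USER: jobarchiver\nJOB_SQL_PASSWORD: changeme\n" )

    cfg = ConfigParser()
    cfg.readfp( sample )

    JOB_SQL_USER     = cfg.get( 'DEFAULT', 'JOB_SQL_USER' )
    JOB_SQL_PASSWORD = cfg.get( 'DEFAULT', 'JOB_SQL_PASSWORD' )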
     
@@ -328,5 +330,4 @@
         try:
             c.execute(q_str)
-            result = c.oidValue
 
         except psycopg2.Error, details:
     
@@ -336,5 +337,5 @@
 
         c.close()
-        return result
+        return True
 
     def Commit(self):
     
@@ -348,8 +349,10 @@
     def __init__( self, hostname, database ):
 
+        global JOB_SQL_USER, JOB_SQL_PASSWORD
+
         self.db_vars = InitVars(DataBaseName=database,
-                User='root',
+                User=JOB_SQL_USER,
                 Host=hostname,
-                Password='',
+                Password=JOB_SQL_PASSWORD,
                 Dictionary='true')
 
     
@@ -508,5 +511,5 @@
         elif action == 'update':
 
-            self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
+            self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) )
 
         if len( ids ) > 0:
     
@@ -540,5 +543,5 @@
     def addJobNode( self, jobid, nodeid ):
 
-        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
+        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( '%s',%s )" %(jobid, nodeid) )
 
     def storeJobInfo( self, jobid, jobattrs ):
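
Because job_id is now a varchar, the two hunks above quote it by hand inside interpolated SQL strings. A more robust alternative (not what this changeset does) is to let psycopg2 bind the parameters itself, which quotes and escapes varchar values automatically; a minimal sketch with a placeholder DSN and example values:

    # Sketch only: psycopg2 parameter binding instead of '%s' string interpolation.
    import psycopg2

    conn = psycopg2.connect( "dbname=jobarchive user=jobarchiver" )   # placeholder DSN
    curs = conn.cursor()

    jobid, nodeid = '1234[5]', 42    # example values

    # same INSERT as addJobNode() above, but the driver handles the quoting
    curs.execute( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )",
                  ( jobid, nodeid ) )
    conn.commit()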
     
@@ -790,4 +793,6 @@
 
             my_data    = self.myXMLSource.getData()
+            #print my_data
+            #print "size my data: %d" %len( my_data )
 
             debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )
     
@@ -799,4 +804,7 @@
 
                 debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
+            else:
+                debug_msg( 1, 'torque_xml_thread(): Got no data.' )
+
 
             debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
     
@@ -814,4 +822,5 @@
         self.jobs_processed    = [ ]
         self.jobs_to_store    = [ ]
+        debug_msg( 1, "XML: Handler created" )
 
     def startDocument( self ):
     
@@ -819,4 +828,5 @@
         self.heartbeat    = 0
         self.elementct    = 0
+        debug_msg( 1, "XML: Start document" )
 
     def startElement( self, name, attrs ):
     
@@ -844,5 +854,5 @@
             elif metricname.find( 'zplugin_monarch_job' ) != -1:
 
-                job_id    = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[0]
+                job_id    = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1]
                 val    = str( attrs.get( 'VAL', "" ) )
 
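
The hunk above implements the second bullet of the commit message: in the monarch job metric name the sequence number now comes first and the job id last, so the code takes field [1] instead of field [0] after the plugin prefix. A small sketch with a made-up metric name of the assumed form zplugin_monarch_job_<seq>_<jobid>:

    # Hypothetical metric name; only the field order is taken from the changeset.
    metricname = 'zplugin_monarch_job_0_1234'

    fields = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )

    seq_nr = fields[0]    # '0'    -> sequence number is the first field
    job_id = fields[1]    # '1234' -> job id is now the last field (was fields[0] before r774)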
     
@@ -1331,7 +1341,7 @@
         # Store metrics somewhere between every 360 and 640 seconds
         #
-        if DEBUG_LEVEL > 2:
-            #STORE_INTERVAL = 60
-            STORE_INTERVAL = random.randint( 360, 640 )
+        if DEBUG_LEVEL >= 1:
+            STORE_INTERVAL = 60
+            #STORE_INTERVAL = random.randint( 360, 640 )
         else:
             STORE_INTERVAL = random.randint( 360, 640 )
     
@@ -1352,7 +1362,7 @@
         if store_metric_thread.isAlive():
 
-            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
+            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to terminate..' )
            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
-            debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
+            debug_msg( 1, 'ganglia_store_thread(): Done waiting: terminated storemetricthread()' )
 
         debug_msg( 1, 'ganglia_store_thread(): finished.' )
     
@@ -1376,4 +1386,6 @@
         """Process XML"""
 
+        debug_msg( 5, "Entering processXML()")
+
         try:
             parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
     
@@ -1391,9 +1403,11 @@
         if parsethread.isAlive():
 
-            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
+            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to terminate..' %PARSE_TIMEOUT )
            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
-            debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
+            debug_msg( 1, 'ganglia_xml_thread(): Done waiting. parsethread() terminated' )
 
         debug_msg( 1, 'ganglia_xml_thread(): finished.' )
+
+        debug_msg( 5, "Leaving processXML()")
 
         return 0
     
@@ -1406,4 +1420,5 @@
 
         my_data    = self.myXMLSource.getData()
+        debug_msg( 1, 'ganglia_parse_thread(): data size %d.' %len(my_data) )
 
         debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )