Changeset 365


Ignore:
Timestamp:
06/13/07 10:20:05 (14 years ago)
Author:
bastiaans
Message:

jobarchived/jobarchived.py:

  • rewritten to use py-rrdtool api and no more pipes
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobarchived/jobarchived.py

    r360 r365  
    163163import DBClass
    164164import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
     165import rrdtool
    165166
    166167class DataSQLStore:
     
    193194        def doDatabase(self, type, statement):
    194195
    195                 debug_msg( 6, 'doDatabase(): %s: %s' %(type, statement) )
     196                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
    196197                try:
    197198                        if type == 'set':
     
    206207                        sys.exit(1)
    207208
    208                 debug_msg( 6, 'doDatabase(): result: %s' %(result) )
     209                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
    209210                return result
    210211
     
    272273                update_str      = None
    273274
    274                 debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id))
     275                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
    275276
    276277                ids = [ ]
     
    434435        binary = None
    435436
     437
    436438        def __init__( self, binary=None ):
    437439                """Set alternate binary if supplied"""
     
    443445                """Create a new rrd with args"""
    444446
    445                 return self.perform( 'create', '"' + filename + '"', args )
     447                #return self.perform( 'create', '"' + filename + '"', args )
     448                return self.perform( 'create', filename, args )
    446449
    447450        def update( self, filename, args ):
    448451                """Update a rrd with args"""
    449452
    450                 return self.perform( 'update', '"' + filename + '"', args )
     453                #return self.perform( 'update', '"' + filename + '"', args )
     454                return self.perform( 'update', filename, args )
    451455
    452456        def grabLastUpdate( self, filename ):
     
    455459                last_update = 0
    456460
    457                 debug_msg( 8, self.binary + ' info "' + filename + '"' )
    458 
    459                 my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
    460 
    461                 for line in my_pipe.readlines():
    462 
    463                         if line.find( 'last_update') != -1:
    464 
    465                                 last_update = line.split( ' = ' )[1]
    466 
    467                 if my_pipe:
    468 
    469                         my_pipe.close()
    470 
    471                 if last_update:
     461                debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
     462
     463                rrd_header      = { }
     464
     465                try:
     466                        rrd_header      = rrdtool.info( filename )
     467                except rrdtool.error, msg:
     468                        debug_msg( 8, str( msg ) )
     469
     470                if rrd_header.has_key( 'last_update' ):
    472471                        return last_update
    473472                else:
     
    491490                                arg_string = arg_string + ' ' + arg
    492491
    493                 debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
    494 
    495                 cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
    496                 lines   = cmd.readlines()
    497 
    498                 cmd.close()
    499 
    500                 for line in lines:
    501 
    502                         if line.find( 'ERROR' ) != -1:
    503 
    504                                 error_msg = string.join( line.split( ' ' )[1:] )
    505                                 debug_msg( 8, error_msg )
    506                                 return 1
     492                debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
     493
     494                try:
     495                        debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
     496
     497                        if action == 'create':
     498
     499                                rrdtool.create( str( filename ), *args )
     500
     501                        elif action == 'update':
     502
     503                                rrdtool.update( str( filename ), *args )
     504
     505                except rrdtool.error, msg:
     506
     507                        error_msg = str( msg )
     508                        debug_msg( 8, error_msg )
     509                        return 1
    507510
    508511                return 0
     
    566569
    567570                self.heartbeat  = 0
     571                self.elementct  = 0
    568572
    569573        def startElement( self, name, attrs ):
     
    576580                jobinfo = { }
    577581
     582                self.elementct  += 1
     583
    578584                if name == 'CLUSTER':
    579585
     
    623629                                                        self.jobs_to_store.append( job_id )
    624630
    625                                                 debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
     631                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
    626632                                else:
    627633                                        self.jobAttrs[ job_id ] = jobinfo
     
    630636                                                self.jobs_to_store.append( job_id )
    631637
    632                                         debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
     638                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
    633639                                       
    634640        def endDocument( self ):
    635641                """When all metrics have gone, check if any jobs have finished"""
     642
     643                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" jobs" )
    636644
    637645                if self.heartbeat:
     
    719727
    720728                debug_msg( 1, 'Checking database..' )
    721                 self.ds.checkStaleJobs()
     729
     730                global DEBUG_LEVEL
     731
     732                if DEBUG_LEVEL <= 2:
     733                        self.ds.checkStaleJobs()
     734
    722735                debug_msg( 1, 'Check done.' )
    723736                debug_msg( 1, 'Checking rrd archive..' )
     
    743756
    744757                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
     758
     759                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
    745760
    746761        def startElement( self, name, attrs ):
     
    10481063                """Store metrics retained in memory to disk"""
    10491064
     1065                global DEBUG_LEVEL
     1066
    10501067                # Store metrics somewhere between every 360 and 640 seconds
    10511068                #
    1052                 STORE_INTERVAL = random.randint( 360, 640 )
     1069                if DEBUG_LEVEL > 2:
     1070                        #STORE_INTERVAL = 60
     1071                        STORE_INTERVAL = random.randint( 360, 640 )
     1072                else:
     1073                        STORE_INTERVAL = random.randint( 360, 640 )
    10531074
    10541075                try:
     
    12271248                self.rrdm       = RRDMutator( RRDTOOL )
    12281249
    1229                 self.gatherLastUpdates()
     1250                global DEBUG_LEVEL
     1251
     1252                if DEBUG_LEVEL <= 2:
     1253                        self.gatherLastUpdates()
    12301254
    12311255        def gatherLastUpdates( self ):
     
    13301354                        if self.checkStoreMetric( host, metric ):
    13311355
    1332                                 update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
     1356                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
     1357                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
     1358                                update_list.append( u_val )
    13331359
    13341360                return update_list
     
    13801406                """
    13811407
     1408                debug_msg( 5, "Entering storeMetrics()")
     1409
     1410                count_values    = 0
     1411                count_metrics   = 0
     1412                count_bits      = 0
     1413
     1414                for hostname, mymetrics in self.myMetrics.items():     
     1415
     1416                        for metricname, mymetric in mymetrics.items():
     1417
     1418                                count_metrics += 1
     1419
     1420                                for dmetric in mymetric:
     1421
     1422                                        count_values += 1
     1423
     1424                                        count_bits      += len( dmetric['time'] )
     1425                                        count_bits      += len( dmetric['val'] )
     1426
     1427                count_bytes     = count_bits / 8
     1428
     1429                debug_msg( 5, "size of cluster '" + self.cluster + "': " +
     1430                        str( len( self.myMetrics.keys() ) ) + " hosts " +
     1431                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
     1432                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
     1433
    13821434                for hostname, mymetrics in self.myMetrics.items():     
    13831435
     
    14371489
    14381490                                self.memLastUpdate( hostname, metricname, metrics_to_store )
     1491
     1492                debug_msg( 5, "Leaving storeMetrics()")
    14391493
    14401494        def makeTimeSerial( self ):
Note: See TracChangeset for help on using the changeset viewer.