Changeset 292


Ignore:
Timestamp:
03/30/07 09:42:06 (17 years ago)
Author:
bastiaans
Message:

jobarchived/jobarchived.py:

  • rearranged indentation and layout to be more readable
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/jobarchived/jobarchived.py

    r289 r292  
    3434def processArgs( args ):
    3535
    36         SHORT_L = 'c:'
    37         LONG_L = 'config='
     36        SHORT_L = 'c:'
     37        LONG_L  = 'config='
    3838
    3939        config_filename = None
     
    106106        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL
    107107
    108         ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
    109 
    110         ARCHIVE_HOURS_PER_RRD = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
    111 
    112         DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
    113 
    114         USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
    115 
    116         SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
     108        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
     109
     110        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
     111
     112        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
     113
     114        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
     115
     116        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
    117117
    118118        try:
    119119
    120                 SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
     120                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
    121121
    122122        except AttributeError, detail:
     
    125125                sys.exit( 1 )
    126126
    127         GMETAD_CONF = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
    128 
    129         ARCHIVE_XMLSOURCE = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
    130 
    131         ARCHIVE_DATASOURCES = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
    132 
    133         ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
    134 
    135         JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
    136 
    137         DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
    138 
    139         RRDTOOL = cfg.get( 'DEFAULT', 'RRDTOOL' )
     127        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
     128
     129        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
     130
     131        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
     132
     133        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
     134
     135        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
     136
     137        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
     138
     139        RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
    140140
    141141        return True
     
    264264        def mutateJob( self, action, job_id, jobattrs ):
    265265
    266                 job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
    267 
    268                 insert_col_str = 'job_id'
    269                 insert_val_str = "'%s'" %job_id
    270                 update_str = None
     266                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
     267
     268                insert_col_str  = 'job_id'
     269                insert_val_str  = "'%s'" %job_id
     270                update_str      = None
    271271
    272272                debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id))
     
    340340                for node in hostnames:
    341341
    342                         node = '%s.%s' %( node, domain )
    343                         id = self.getNodeId( node )
     342                        node    = '%s.%s' %( node, domain )
     343                        id      = self.getNodeId( node )
    344344       
    345345                        if not id:
     
    395395                debug_msg( 8, self.binary + ' info "' + filename + '"' )
    396396
    397                 for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
     397                my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
     398
     399                for line in my_pipe.readlines():
    398400
    399401                        if line.find( 'last_update') != -1:
    400402
    401403                                last_update = line.split( ' = ' )[1]
     404
     405                if my_pipe:
     406
     407                        my_pipe.close()
    402408
    403409                if last_update:
     
    425431                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
    426432
    427                 cmd = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
    428                 lines = cmd.readlines()
     433                cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
     434                lines   = cmd.readlines()
     435
    429436                cmd.close()
    430437
     
    489496        def __init__( self ):
    490497
    491                 self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
    492                 self.jobs_processed = [ ]
    493                 self.jobs_to_store = [ ]
     498                self.ds                 = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
     499                self.jobs_processed     = [ ]
     500                self.jobs_to_store      = [ ]
    494501
    495502        def startDocument( self ):
    496503
    497                 self.heartbeat = 0
     504                self.heartbeat  = 0
    498505
    499506        def startElement( self, name, attrs ):
     
    519526                        elif metricname.find( 'MONARCH-JOB' ) != -1:
    520527
    521                                 job_id = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
    522                                 val = attrs.get( 'VAL', "" )
     528                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
     529                                val     = attrs.get( 'VAL', "" )
    523530
    524531                                if not job_id in self.jobs_processed:
     532
    525533                                        self.jobs_processed.append( job_id )
    526534
     
    528536
    529537                                if self.jobAttrs.has_key( job_id ):
     538
    530539                                        check_change = 1
    531540
     
    536545                                        if len( myval.split( '=' ) ) > 1:
    537546
    538                                                 valname = myval.split( '=' )[0]
    539                                                 value = myval.split( '=' )[1]
     547                                                valname = myval.split( '=' )[0]
     548                                                value   = myval.split( '=' )[1]
    540549
    541550                                                if valname == 'nodes':
     
    546555                                if check_change:
    547556                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
    548                                                 self.jobAttrs[ job_id ]['stop_timestamp'] = ''
    549                                                 self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
     557                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
     558                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
    550559                                                if not job_id in self.jobs_to_store:
    551560                                                        self.jobs_to_store.append( job_id )
     
    594603                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
    595604
    596                         self.jobs_processed = [ ]
    597                         self.jobs_to_store = [ ]
     605                        self.jobs_processed     = [ ]
     606                        self.jobs_to_store      = [ ]
    598607
    599608        def setJobAttrs( self, old, new ):
     
    642651                """Setup initial variables and gather info on existing rrd archive"""
    643652
    644                 self.config = config
    645                 self.clusters = { }
     653                self.config     = config
     654                self.clusters   = { }
    646655                debug_msg( 1, 'Checking existing toga rrd archive..' )
    647                 self.gatherClusters()
     656                #self.gatherClusters()
    648657                debug_msg( 1, 'Check done.' )
    649658
     
    651660                """Find all existing clusters in archive dir"""
    652661
    653                 archive_dir = check_dir(ARCHIVE_PATH)
    654 
    655                 hosts = [ ]
     662                archive_dir     = check_dir(ARCHIVE_PATH)
     663
     664                hosts           = [ ]
    656665
    657666                if os.path.exists( archive_dir ):
    658667
    659                         dirlist = os.listdir( archive_dir )
     668                        dirlist = os.listdir( archive_dir )
    660669
    661670                        for item in dirlist:
     
    672681                if name == 'GANGLIA_XML':
    673682
    674                         self.XMLSource = attrs.get( 'SOURCE', "" )
    675                         self.gangliaVersion = attrs.get( 'VERSION', "" )
     683                        self.XMLSource          = attrs.get( 'SOURCE', "" )
     684                        self.gangliaVersion     = attrs.get( 'VERSION', "" )
    676685
    677686                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
     
    679688                elif name == 'GRID':
    680689
    681                         self.gridName = attrs.get( 'NAME', "" )
    682                         self.time = attrs.get( 'LOCALTIME', "" )
     690                        self.gridName   = attrs.get( 'NAME', "" )
     691                        self.time       = attrs.get( 'LOCALTIME', "" )
    683692
    684693                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
     
    686695                elif name == 'CLUSTER':
    687696
    688                         self.clusterName = attrs.get( 'NAME', "" )
    689                         self.time = attrs.get( 'LOCALTIME', "" )
     697                        self.clusterName        = attrs.get( 'NAME', "" )
     698                        self.time               = attrs.get( 'LOCALTIME', "" )
    690699
    691700                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
     
    697706                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
    698707
    699                         self.hostName = attrs.get( 'NAME', "" )
    700                         self.hostIp = attrs.get( 'IP', "" )
    701                         self.hostReported = attrs.get( 'REPORTED', "" )
     708                        self.hostName           = attrs.get( 'NAME', "" )
     709                        self.hostIp             = attrs.get( 'IP', "" )
     710                        self.hostReported       = attrs.get( 'REPORTED', "" )
    702711
    703712                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
     
    723732                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
    724733
    725                                 myMetric = { }
    726                                 myMetric['name'] = attrs.get( 'NAME', "" )
    727                                 myMetric['val'] = attrs.get( 'VAL', "" )
    728                                 myMetric['time'] = self.hostReported
     734                                myMetric                = { }
     735                                myMetric['name']        = attrs.get( 'NAME', "" )
     736                                myMetric['val']         = attrs.get( 'VAL', "" )
     737                                myMetric['time']        = self.hostReported
    729738
    730739                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
     
    10701079                                if line.find( 'data_source' ) != -1 and line[0] != '#':
    10711080
    1072                                         source = { }
    1073                                         source['name'] = line.split( '"' )[1]
    1074                                         source_words = line.split( '"' )[2].split( ' ' )
     1081                                        source          = { }
     1082                                        source['name']  = line.split( '"' )[1]
     1083                                        source_words    = line.split( '"' )[2].split( ' ' )
    10751084
    10761085                                        for word in source_words:
     
    11351144                """Setup initial variables"""
    11361145
    1137                 self.block = 0
    1138                 self.cluster = cluster
    1139                 self.config = config
    1140                 self.slot = threading.Lock()
    1141                 self.rrdm = RRDMutator( RRDTOOL )
     1146                self.block      = 0
     1147                self.cluster    = cluster
     1148                self.config     = config
     1149                self.slot       = threading.Lock()
     1150                self.rrdm       = RRDMutator( RRDTOOL )
     1151
    11421152                self.gatherLastUpdates()
    11431153
     
    11591169                for host in hosts:
    11601170
    1161                         host_dir = cluster_dir + '/' + host
    1162                         dirlist = os.listdir( host_dir )
     1171                        host_dir        = cluster_dir + '/' + host
     1172                        dirlist         = os.listdir( host_dir )
    11631173
    11641174                        for dir in dirlist:
     
    11711181
    11721182                        last_serial = self.getLastRrdTimeSerial( host )
     1183
    11731184                        if last_serial:
    11741185
    11751186                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
     1187
    11761188                                if os.path.exists( metric_dir ):
    11771189
     
    12141226                                self.myMetrics[ host ][ metric['name'] ] = [ ]
    12151227                else:
    1216                         self.myMetrics[ host ] = { }
    1217                         self.myMetrics[ host ][ metric['name'] ] = [ ]
     1228                        self.myMetrics[ host ]                          = { }
     1229                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
    12181230
    12191231                # Push new metric onto stack
     
    12321244                """
    12331245
    1234                 update_list = [ ]
    1235                 metric = None
     1246                update_list     = [ ]
     1247                metric          = None
    12361248
    12371249                while len( metriclist ) > 0:
     
    12401252
    12411253                        if self.checkStoreMetric( host, metric ):
     1254
    12421255                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
    12431256
     
    13591372                """Make a RRD location/path and filename"""
    13601373
    1361                 rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
    1362                 rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
     1374                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
     1375                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
    13631376
    13641377                return rrd_dir, rrd_file
     
    14141427                        if metric['name'] == metricname:
    14151428
    1416                                 period = self.determinePeriod( host, metric['time'] )   
    1417 
    1418                                 archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
     1429                                period          = self.determinePeriod( host, metric['time'] ) 
     1430
     1431                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
    14191432
    14201433                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
     
    14641477                if not os.path.exists( rrd_file ):
    14651478
    1466                         interval = self.config.getInterval( self.cluster )
    1467                         heartbeat = 8 * int( interval )
    1468 
    1469                         params = [ ]
     1479                        interval        = self.config.getInterval( self.cluster )
     1480                        heartbeat       = 8 * int( interval )
     1481
     1482                        params          = [ ]
    14701483
    14711484                        params.append( '--step' )
     
    14901503                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
    14911504
    1492                 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
    1493 
    1494                 update_list = self.makeUpdateList( host, metriclist )
     1505                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
     1506
     1507                update_list             = self.makeUpdateList( host, metriclist )
    14951508
    14961509                if len( update_list ) > 0:
     
    15511564
    15521565        try:
    1553                 torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
    1554                 ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
     1566                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
     1567                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
    15551568
    15561569                torque_xml_thread.start()
Note: See TracChangeset for help on using the changeset viewer.