source: branches/1.0/jobarchived/jobarchived.py @ 913

Last change on this file since 913 was 913, checked in by olahaye, 11 years ago

[rpm&deb packaging] Now fixes the VERSION outside current directory (can be SVN)
This avoids .in files and let generate tarballs and packages (binary and sources) without any VERSION values.
make deb or rpm or install even from svn is now safe from "sed -i -e"

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 61.9 KB
RevLine 
[3]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[771]5# Copyright (C) 2006-2013  Ramon Bastiaans
[225]6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[230]21# SVN $Id: jobarchived.py 913 2013-05-22 17:00:22Z olahaye $
22#
[3]23
[466]24import getopt, syslog, ConfigParser, sys
[3]25
[913]26VERSION='__VERSION__'
[284]27
[466]28def usage( ver ):
[284]29
[770]30    print 'jobarchived %s' %VERSION
[284]31
[770]32    if ver:
33        return 0
[284]34
[770]35    print
36    print 'Purpose:'
37    print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
38    print '  and node statistics in a RRD archive'
39    print
40    print 'Usage:    jobarchived [OPTIONS]'
41    print
42    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobarchived.conf)'
43    print '  -p, --pidfile=FILE    Use pid file to store the process id'
44    print '  -h, --help        Print help and exit'
45    print '  -v, --version        Print version and exit'
46    print
[435]47
[214]48def processArgs( args ):
[6]49
[858]50    SHORT_L   = 'p:hvc:'
[773]51    LONG_L    = [ 'help', 'config=', 'pidfile=', 'version' ]
[169]52
[773]53    config_filename = '/etc/jobarchived.conf'
[169]54
[770]55    global PIDFILE
[435]56
[858]57    PIDFILE   = None
[435]58
[773]59    try:
[169]60
[773]61        opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]62
[773]63    except getopt.error, detail:
[60]64
[773]65        print detail
66        sys.exit(1)
[9]67
[773]68    for opt, value in opts:
[60]69
[773]70        if opt in [ '--config', '-c' ]:
[13]71
[773]72            config_filename = value
[198]73
[770]74        if opt in [ '--pidfile', '-p' ]:
[435]75
[770]76            PIDFILE         = value
[435]77
[770]78        if opt in [ '--help', '-h' ]:
[435]79
[770]80            usage( False )
81            sys.exit( 0 )
[435]82
[770]83        if opt in [ '--version', '-v' ]:
[60]84
[770]85            usage( True )
86            sys.exit( 0 )
[22]87
[770]88    try:
89        return loadConfig( config_filename )
[13]90
[770]91    except ConfigParser.NoOptionError, detail:
[214]92
[770]93        print detail
94        sys.exit( 1 )
[214]95
96def loadConfig( filename ):
97
[770]98    def getlist( cfg_string ):
[214]99
[770]100        my_list = [ ]
[214]101
[770]102        for item_txt in cfg_string.split( ',' ):
[214]103
[770]104            sep_char = None
[214]105
[770]106            item_txt = item_txt.strip()
[214]107
[770]108            for s_char in [ "'", '"' ]:
[214]109
[770]110                if item_txt.find( s_char ) != -1:
[214]111
[770]112                    if item_txt.count( s_char ) != 2:
[214]113
[770]114                        print 'Missing quote: %s' %item_txt
115                        sys.exit( 1 )
[214]116
[770]117                    else:
[214]118
[770]119                        sep_char = s_char
120                        break
[214]121
[770]122            if sep_char:
[214]123
[770]124                item_txt = item_txt.split( sep_char )[1]
[214]125
[770]126            my_list.append( item_txt )
[214]127
[770]128        return my_list
[214]129
[770]130    cfg = ConfigParser.ConfigParser()
[214]131
[770]132    cfg.read( filename )
[214]133
[770]134    global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
135    global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
[774]136    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL, JOB_SQL_PASSWORD, JOB_SQL_USER
[214]137
[858]138    ARCHIVE_PATH           = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
[214]139
[858]140    ARCHIVE_HOURS_PER_RRD  = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
[214]141
[858]142    DEBUG_LEVEL            = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[214]143
[858]144    USE_SYSLOG             = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[214]145
[858]146    SYSLOG_LEVEL           = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[214]147
[858]148    MODRRDTOOL             = False
[375]149
[770]150    try:
151        global rrdtool
152        import rrdtool
[214]153
[770]154        MODRRDTOOL        = True
[375]155
[770]156    except ImportError:
[375]157
[770]158        MODRRDTOOL        = False
[375]159
[770]160        print "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"
[375]161
[770]162        RRDTOOL            = cfg.get( 'DEFAULT', 'RRDTOOL' )
[455]163
[770]164    try:
[375]165
[770]166        SYSLOG_FACILITY    = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[214]167
[770]168    except AttributeError, detail:
[214]169
[770]170        print 'Unknown syslog facility'
171        sys.exit( 1 )
[214]172
[858]173    GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
[214]174
[858]175    ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
[214]176
[858]177    ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
[214]178
[858]179    ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
[214]180
[858]181    JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
182    JOB_SQL_USER            = cfg.get( 'DEFAULT', 'JOB_SQL_USER' )
[774]183    JOB_SQL_PASSWORD        = cfg.get( 'DEFAULT', 'JOB_SQL_PASSWORD' )
[214]184
[858]185    JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
[295]186
[858]187    DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[214]188
[770]189    return True
[214]190
[17]191# What XML data types not to store
[13]192#
[17]193UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]194
[47]195# Maximum time (in seconds) a parsethread may run
196#
197PARSE_TIMEOUT = 60
198
199# Maximum time (in seconds) a storethread may run
200#
201STORE_TIMEOUT = 360
202
[8]203"""
[224]204The Job Archiving Daemon
[8]205"""
206
[214]207from types import *
208
209import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
210
[486]211try:
[773]212    import psycopg2
[486]213
214except ImportError, details:
215
[773]216    print "FATAL ERROR: psycopg2 python module not found"
[770]217    sys.exit( 1 )
[486]218
[379]219class InitVars:
220        Vars = {}
221       
222        def __init__(self, **key_arg):
223                for (key, value) in key_arg.items():
224                        if value:
225                                self.Vars[key] = value
226                        else:   
227                                self.Vars[key] = None
228                               
229        def __call__(self, *key):
230                key = "%s" % key
231                return self.Vars[key]
232               
233        def __getitem__(self, key):
234                return self.Vars[key]
235               
236        def __repr__(self):
237                return repr(self.Vars)
238               
239        def keys(self):
240                barf =  map(None, self.Vars.keys())
241                return barf
242               
243        def values(self):
244                barf =  map(None, self.Vars.values())
245                return barf
246               
247        def has_key(self, key):
248                if self.Vars.has_key(key):
249                        return 1
250                else:   
251                        return 0
252                       
253class DBError(Exception):
254        def __init__(self, msg=''):
255                self.msg = msg
256                Exception.__init__(self, msg)
257        def __repr__(self):
258                return self.msg
259        __str__ = __repr__
260
261#
262# Class to connect to a database
263# and return the queury in a list or dictionairy.
264#
265class DB:
[773]266    def __init__(self, db_vars):
[379]267
[773]268        self.dict = db_vars
[379]269
[773]270        if self.dict.has_key('User'):
271            self.user = self.dict['User']
272        else:
273            self.user = 'postgres'
[379]274
[773]275        if self.dict.has_key('Host'):
276            self.host = self.dict['Host']
277        else:
278            self.host = 'localhost'
[379]279
[773]280        if self.dict.has_key('Password'):
281            self.passwd = self.dict['Password']
282        else:
283            self.passwd = ''
[379]284
[773]285        if self.dict.has_key('DataBaseName'):
286            self.db = self.dict['DataBaseName']
287        else:
288            self.db = 'jobarchive'
[379]289
[773]290        # connect_string = 'host:port:database:user:password:
291        dsn = "host='%s' dbname='%s' user='%s' password='%s'" %(self.host, self.db, self.user, self.passwd)
[379]292
[773]293        try:
294            self.SQL = psycopg2.connect(dsn)
295        except psycopg2.Error, details:
296            str = "%s" %details
297            raise DBError(str)
[379]298
[773]299    def __repr__(self):
300        return repr(self.result)
[379]301
[773]302    def __nonzero__(self):
303        return not(self.result == None)
[379]304
[773]305    def __len__(self):
306        return len(self.result)
[379]307
[773]308    def __getitem__(self,i):
309        return self.result[i]
[379]310
[773]311    def __getslice__(self,i,j):
312        return self.result[i:j]
[379]313
[773]314    def Get(self, q_str):
315        c = self.SQL.cursor()
316        try:
317            c.execute(q_str)
318            result = c.fetchall()
319        except psycopg2.Error, details:
320            c.close()
321            str = "%s" %details
322            raise DBError(str)
[379]323
[773]324        c.close()
325        return result
[379]326
[773]327    def Set(self, q_str):
328        c = self.SQL.cursor()
329        try:
330            c.execute(q_str)
[379]331
[773]332        except psycopg2.Error, details:
333            c.close()
334            str = "%s" %details
335            raise DBError(str)
[379]336
[773]337        c.close()
[774]338        return True
[379]339
[773]340    def Commit(self):
[379]341
[792]342        return self.SQL.commit()
343
344    def Rollback( self ):
345
346        return self.SQL.rollback()
347
[84]348class DataSQLStore:
349
[770]350    db_vars = None
351    dbc = None
[84]352
[770]353    def __init__( self, hostname, database ):
[84]354
[774]355        global JOB_SQL_USER, JOB_SQL_PASSWORD
356
[770]357        self.db_vars = InitVars(DataBaseName=database,
[774]358                User=JOB_SQL_USER,
[770]359                Host=hostname,
[774]360                Password=JOB_SQL_PASSWORD,
[770]361                Dictionary='true')
[84]362
[770]363        try:
364            self.dbc     = DB(self.db_vars)
365        except DBError, details:
366            debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
367            sys.exit(1)
[84]368
[770]369    def setDatabase(self, statement):
[792]370
[770]371        ret = self.doDatabase('set', statement)
372        return ret
373       
374    def getDatabase(self, statement):
[792]375
[770]376        ret = self.doDatabase('get', statement)
377        return ret
[84]378
[792]379    def doCommit( self ):
380
381        return self.dbc.Commit()
382
383    def doRollback( self ):
384
385        return self.dbc.Rollback()
386
[770]387    def doDatabase(self, type, statement):
[84]388
[770]389        debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
390        try:
391            if type == 'set':
392                result = self.dbc.Set( statement )
393            elif type == 'get':
394                result = self.dbc.Get( statement )
395               
396        except DBError, detail:
397            operation = statement.split(' ')[0]
[792]398            debug_msg( 0, 'ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
399            return False
[84]400
[770]401        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
402        return result
[84]403
[770]404    def getJobNodeId( self, job_id, node_id ):
[191]405
[770]406        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
[792]407        if not id:
408            return False
409
[770]410        if len( id ) > 0:
[191]411
[770]412            if len( id[0] ) > 0 and id[0] != '':
413           
[792]414                return True
[191]415
[792]416        return False
[191]417
[770]418    def getNodeId( self, hostname ):
[84]419
[770]420        id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]421
[770]422        if len( id ) > 0:
[84]423
[770]424            id = id[0][0]
[89]425
[770]426            return id
427        else:
428            return None
[84]429
[770]430    def getNodeIds( self, hostnames ):
[84]431
[770]432        ids = [ ]
[84]433
[770]434        for node in hostnames:
[84]435
[770]436            id = self.getNodeId( node )
[84]437
[770]438            if id:
439                ids.append( id )
[84]440
[770]441        return ids
[84]442
[770]443    def getJobId( self, jobid ):
[84]444
[770]445        id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
[84]446
[770]447        if id:
448            id = id[0][0]
[84]449
[770]450            return id
451        else:
452            return None
[84]453
[770]454    def addJob( self, job_id, jobattrs ):
[84]455
[770]456        if not self.getJobId( job_id ):
[84]457
[792]458            return self.mutateJob( 'insert', job_id, jobattrs ) 
[770]459        else:
[792]460            return self.mutateJob( 'update', job_id, jobattrs )
[84]461
[770]462    def mutateJob( self, action, job_id, jobattrs ):
[84]463
[782]464        job_values     = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
[84]465
[782]466        insert_col_str = 'job_id'
467        insert_val_str = "'%s'" %job_id
468        update_str     = None
[84]469
[770]470        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
[84]471
[770]472        ids = [ ]
[99]473
[770]474        for valname, value in jobattrs.items():
[84]475
[770]476            if valname in job_values and value != '':
[84]477
[770]478                column_name = 'job_' + valname
[84]479
[770]480                if action == 'insert':
[84]481
[770]482                    if not insert_col_str:
483                        insert_col_str = column_name
484                    else:
485                        insert_col_str = insert_col_str + ',' + column_name
[84]486
[770]487                    if not insert_val_str:
488                        insert_val_str = value
489                    else:
490                        insert_val_str = insert_val_str + ",'%s'" %value
[84]491
[770]492                elif action == 'update':
493                   
494                    if not update_str:
495                        update_str = "%s='%s'" %(column_name, value)
496                    else:
497                        update_str = update_str + ",%s='%s'" %(column_name, value)
[84]498
[770]499            elif valname == 'nodes' and value:
[84]500
[770]501                node_valid = 1
[190]502
[770]503                if len(value) == 1:
504               
505                    if jobattrs['status'] == 'Q':
[190]506
[770]507                        node_valid = 0
[190]508
[770]509                    else:
[190]510
[770]511                        node_valid = 0
[190]512
[770]513                        for node_char in str(value[0]):
[190]514
[770]515                            if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]516
[770]517                                node_valid = 1
[84]518
[770]519                if node_valid:
[191]520
[770]521                    ids = self.addNodes( value, jobattrs['domain'] )
[191]522
[770]523        if action == 'insert':
[84]524
[792]525            db_ret = self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]526
[770]527        elif action == 'update':
[84]528
[792]529            db_ret = self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) )
[84]530
[770]531        if len( ids ) > 0:
532            self.addJobNodes( job_id, ids )
[190]533
[792]534        return db_ret
535
[770]536    def addNodes( self, hostnames, domain ):
[84]537
[770]538        ids = [ ]
[98]539
[770]540        for node in hostnames:
[84]541
[770]542            node    = '%s.%s' %( node, domain )
543            id    = self.getNodeId( node )
544   
545            if not id:
546                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
547                id = self.getNodeId( node )
[84]548
[770]549            ids.append( id )
[98]550
[770]551        return ids
[98]552
[770]553    def addJobNodes( self, jobid, nodes ):
[86]554
[770]555        for node in nodes:
[86]556
[770]557            if not self.getJobNodeId( jobid, node ):
[191]558
[770]559                self.addJobNode( jobid, node )
[191]560
[770]561    def addJobNode( self, jobid, nodeid ):
[84]562
[774]563        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( '%s',%s )" %(jobid, nodeid) )
[84]564
[770]565    def storeJobInfo( self, jobid, jobattrs ):
[84]566
[792]567        return self.addJob( jobid, jobattrs )
[84]568
[857]569    def checkTimedoutJobs( self ):
[295]570
[857]571        debug_msg( 1, 'Housekeeping: checking database for timed out jobs..' )
572
[770]573        # Locate all jobs in the database that are not set to finished
574        #
575        q = "SELECT * from jobs WHERE job_status != 'F'"
[295]576
[770]577        r = self.getDatabase( q )
[295]578
[770]579        if len( r ) == 0:
[295]580
[770]581            return None
[295]582
[857]583        timeoutjobs  = [ ]
[295]584
[857]585        jobtimeout_sec = JOB_TIMEOUT * (60 * 60)
586        cur_time       = time.time()
[295]587
[770]588        for row in r:
[295]589
[857]590            job_id              = row[0]
591            job_requested_time  = row[4]
592            job_status          = row[7]
593            job_start_timestamp = row[8]
[295]594
[770]595            # If it was set to queued and we didn't see it started
596            # there's not point in keeping it around
597            #
[857]598            if job_status == 'R' and job_start_timestamp:
[295]599
[770]600                start_timestamp = int( job_start_timestamp )
[295]601
[770]602                # If it was set to running longer than JOB_TIMEOUT
603                # close the job: it probably finished while we were not running
604                #
605                if ( cur_time - start_timestamp ) > jobtimeout_sec:
[295]606
[770]607                    if job_requested_time:
[295]608
[770]609                        rtime_epoch    = reqtime2epoch( job_requested_time )
610                    else:
611                        rtime_epoch    = None
612                   
613                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
[295]614
[857]615        debug_msg( 1, 'Housekeeping: Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
[295]616
[857]617        ret_jobids_clean = [ ]
[295]618
[770]619        # Close these jobs in the database
620        # update the stop_timestamp to: start_timestamp + requested wallclock
621        # and set state: finished
622        #
623        for j in timeoutjobs:
[295]624
[770]625            ( i, s, r )        = j
[295]626
[770]627            if r:
628                new_end_timestamp    = int( s ) + r
[295]629
[857]630                q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
631                self.setDatabase( q )
632            else:
633
634                # Requested walltime unknown: cannot guess end time: sorry delete them
635                q = "DELETE FROM jobs WHERE job_id = '" + str( i ) + "'"
636                self.setDatabase( q )
637
638            ret_jobids_clean.append( i )
639
640        debug_msg( 1, 'Housekeeping: done.' )
641
642        return ret_jobids_clean
643
644    def checkStaleJobs( self ):
645
646        debug_msg( 1, 'Housekeeping: checking database for stale jobs..' )
647
648        # Locate all jobs in the database that are not set to finished
649        #
650        q = "SELECT * from jobs WHERE job_status != 'F'"
651
652        r = self.getDatabase( q )
653
654        if len( r ) == 0:
655
656            return None
657
658        cleanjobs      = [ ]
659
660        cur_time       = time.time()
661
662        for row in r:
663
664            job_id              = row[0]
665            job_requested_time  = row[4]
666            job_status          = row[7]
667            job_start_timestamp = row[8]
668
669            # If it was set to queued and we didn't see it started
670            # there's not point in keeping it around
671            #
672            if job_status == 'Q' or not job_start_timestamp:
673
674                cleanjobs.append( job_id )
675
676        debug_msg( 1, 'Housekeeping: Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
677
678        # Purge these from database
679        #
680        for j in cleanjobs:
681
682            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
[770]683            self.setDatabase( q )
[295]684
[857]685        debug_msg( 1, 'Housekeeping: done.' )
686
687        return cleanjobs
688
[37]689class RRDMutator:
[770]690    """A class for performing RRD mutations"""
[37]691
[770]692    binary = None
[37]693
[770]694    def __init__( self, binary=None ):
695        """Set alternate binary if supplied"""
[37]696
[770]697        if binary:
698            self.binary = binary
[37]699
[770]700    def create( self, filename, args ):
701        """Create a new rrd with args"""
[63]702
[770]703        global MODRRDTOOL
[37]704
[770]705        if MODRRDTOOL:
706            return self.perform( 'create', filename, args )
707        else:
708            return self.perform( 'create', '"' + filename + '"', args )
[375]709
[770]710    def update( self, filename, args ):
711        """Update a rrd with args"""
[63]712
[770]713        global MODRRDTOOL
[37]714
[770]715        if MODRRDTOOL:
716            return self.perform( 'update', filename, args )
717        else:
718            return self.perform( 'update', '"' + filename + '"', args )
[375]719
[770]720    def grabLastUpdate( self, filename ):
721        """Determine the last update time of filename rrd"""
[42]722
[770]723        global MODRRDTOOL
[375]724
[770]725        last_update = 0
[42]726
[770]727        # Use the py-rrdtool module if it's available on this system
728        #
729        if MODRRDTOOL:
[53]730
[770]731            debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
[42]732
[770]733            rrd_header     = { }
[292]734
[770]735            try:
736                rrd_header    = rrdtool.info( filename )
737            except rrdtool.error, msg:
738                debug_msg( 8, str( msg ) )
[375]739
[770]740            if rrd_header.has_key( 'last_update' ):
741                return last_update
742            else:
743                return 0
[375]744
[770]745        # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
746        # DEPRECATED (slow!)
747        #
748        else:
749            debug_msg( 8, self.binary + ' info ' + filename )
[42]750
[770]751            my_pipe        = os.popen( self.binary + ' info "' + filename + '"' )
[375]752
[770]753            for line in my_pipe.readlines():
[375]754
[770]755                if line.find( 'last_update') != -1:
[375]756
[770]757                    last_update = line.split( ' = ' )[1]
[375]758
[770]759            if my_pipe:
[375]760
[770]761                my_pipe.close()
[375]762
[770]763            if last_update:
764                return last_update
765            else:
766                return 0
[375]767
768
[770]769    def perform( self, action, filename, args ):
770        """Perform action on rrd filename with args"""
[37]771
[770]772        global MODRRDTOOL
[375]773
[770]774        arg_string = None
[37]775
[770]776        if type( args ) is not ListType:
777            debug_msg( 8, 'Arguments needs to be of type List' )
778            return 1
[40]779
[770]780        for arg in args:
[37]781
[770]782            if not arg_string:
[37]783
[770]784                arg_string = arg
785            else:
786                arg_string = arg_string + ' ' + arg
[37]787
[770]788        if MODRRDTOOL:
[37]789
[770]790            debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
[292]791
[770]792            try:
793                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
[37]794
[770]795                if action == 'create':
[146]796
[770]797                    rrdtool.create( str( filename ), *args )
[37]798
[770]799                elif action == 'update':
[37]800
[770]801                    rrdtool.update( str( filename ), *args )
[365]802
[770]803            except rrdtool.error, msg:
[365]804
[770]805                error_msg = str( msg )
806                debug_msg( 8, error_msg )
807                return 1
[375]808
[770]809        else:
[375]810
[770]811            debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[375]812
[770]813            cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
814            lines   = cmd.readlines()
[375]815
[770]816            cmd.close()
[375]817
[770]818            for line in lines:
[375]819
[770]820                if line.find( 'ERROR' ) != -1:
[375]821
[770]822                    error_msg = string.join( line.split( ' ' )[1:] )
823                    debug_msg( 8, error_msg )
824                    return 1
[375]825
[770]826        return 0
[37]827
[78]828class XMLProcessor:
[770]829    """Skeleton class for XML processor's"""
[78]830
[770]831    def run( self ):
832        """Do main processing of XML here"""
[78]833
[770]834        pass
[78]835
[782]836class JobXMLProcessor( XMLProcessor ):
[770]837    """Main class for processing XML and acting with it"""
[78]838
[770]839    def __init__( self, XMLSource, DataStore ):
840        """Setup initial XML connection and handlers"""
[78]841
[782]842        self.myXMLSource  = XMLSource
843        self.myXMLHandler = JobXMLHandler( DataStore )
844        self.myXMLError   = XMLErrorHandler()
[78]845
[782]846        self.config       = GangliaConfigParser( GMETAD_CONF )
[287]847
[782]848        self.kill_thread  = False
849
850    def killThread( self ):
851
852        self.kill_thread  = True
853
[770]854    def run( self ):
855        """Main XML processing"""
[78]856
[782]857        debug_msg( 1, 'job_xml_thread(): started.' )
[87]858
[770]859        while( 1 ):
[78]860
[782]861            debug_msg( 1, 'job_xml_thread(): Retrieving XML data..' )
[176]862
[770]863            my_data    = self.myXMLSource.getData()
[287]864
[782]865            debug_msg( 1, 'job_xml_thread(): Done retrieving: data size %d' %len(my_data) )
[469]866
[770]867            if my_data:
[782]868                debug_msg( 1, 'job_xml_thread(): Parsing XML..' )
[469]869
[770]870                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[469]871
[782]872                debug_msg( 1, 'job_xml_thread(): Done parsing.' )
[774]873            else:
[782]874                debug_msg( 1, 'job_xml_thread(): Got no data.' )
[774]875
[782]876            if self.kill_thread:
877
878                debug_msg( 1, 'job_xml_thread(): killed.' )
879                return None
[770]880               
[782]881            debug_msg( 1, 'job_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[770]882            time.sleep( self.config.getLowestInterval() )
[78]883
[782]884class JobXMLHandler( xml.sax.handler.ContentHandler ):
885    """Parse Job's jobinfo XML from our plugin"""
[63]886
[770]887    def __init__( self, datastore ):
[84]888
[782]889        self.ds              = datastore
890        self.jobs_processed  = [ ]
891        self.jobs_to_store   = [ ]
892        self.jobAttrs        = { }
893        self.jobAttrsSaved   = { }
894
[857]895        self.iteration       = 0
896
897        self.ds.checkTimedoutJobs()
898        self.ds.checkStaleJobs()
899
[774]900        debug_msg( 1, "XML: Handler created" )
[84]901
[770]902    def startDocument( self ):
[183]903
[782]904        self.jobs_processed = [ ]
905        self.heartbeat      = 0
906        self.elementct      = 0
[857]907        self.iteration      = self.iteration + 1
[782]908
[857]909        if self.iteration > 20:
[183]910
[857]911            timedout_jobs = self.ds.checkTimedoutJobs()
912            self.iteration = 0
913
914            for j in timedout_jobs:
915
916                del self.jobAttrs[ j ]
917                del self.jobAttrsSaved[ j ]
918
919        debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
920
[770]921    def startElement( self, name, attrs ):
922        """
923        This XML will be all gmetric XML
924        so there will be no specific start/end element
925        just one XML statement with all info
926        """
[782]927
[770]928        jobinfo = { }
[63]929
[770]930        self.elementct    += 1
[365]931
[770]932        if name == 'CLUSTER':
[63]933
[770]934            self.clustername = str( attrs.get( 'NAME', "" ) )
[199]935
[770]936        elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
[199]937
[770]938            metricname = str( attrs.get( 'NAME', "" ) )
[63]939
[772]940            if metricname == 'zplugin_monarch_heartbeat':
[782]941
[770]942                self.heartbeat = str( attrs.get( 'VAL', "" ) )
[63]943
[772]944            elif metricname.find( 'zplugin_monarch_job' ) != -1:
[63]945
[782]946                job_id  = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1]
947                val     = str( attrs.get( 'VAL', "" ) )
[63]948
[770]949                valinfo = val.split( ' ' )
[63]950
[770]951                for myval in valinfo:
[63]952
[770]953                    if len( myval.split( '=' ) ) > 1:
[63]954
[782]955                        valname = myval.split( '=' )[0]
956                        value   = myval.split( '=' )[1]
[70]957
[770]958                        if valname == 'nodes':
[782]959
[770]960                            value = value.split( ';' )
[72]961
[770]962                        jobinfo[ valname ] = value
[84]963
[782]964                self.jobAttrs[ job_id ] = jobinfo
[84]965
[782]966                self.jobs_processed.append( job_id )
[770]967                   
968    def endDocument( self ):
969        """When all metrics have gone, check if any jobs have finished"""
[72]970
[783]971        jobs_finished = [ ]
972
[782]973        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_processed))+" jobs" )
[365]974
[782]975        if self.heartbeat == 0:
976            return None
[74]977
[782]978        for jobid, jobinfo in self.jobAttrs.items():
[102]979
[782]980            if jobinfo['reported'] != self.heartbeat:
[74]981
[782]982                if (jobinfo['status'] != 'R'):
983                    debug_msg( 1, 'job %s report time %s does not match current heartbeat %s : ignoring job' %(jobid, jobinfo['reported'], self.heartbeat ) )
984                    del self.jobAttrs[ jobid ]
[96]985
[782]986                    if jobid in self.jobs_to_store:
987                        del self.jobs_to_store[ jobid ]
988
989                    continue
990
991                elif jobid not in self.jobs_processed:
[783]992
[782]993                    # Was running previous heartbeat but not anymore: must be finished
[770]994                    self.jobAttrs[ jobid ]['status'] = 'F'
995                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
[782]996                    debug_msg( 1, 'job %s appears to have finished' %jobid )
[96]997
[783]998                    jobs_finished.append( jobid )
999
[770]1000                    if not jobid in self.jobs_to_store:
1001                        self.jobs_to_store.append( jobid )
[74]1002
[782]1003                    continue
[87]1004
[782]1005            elif self.jobAttrsSaved.has_key( jobid ):
[84]1006
[783]1007                # This should pretty much never happen, but hey let's be careful
1008                # Perhaps if someone altered their job while in queue with qalter
1009
[782]1010                if self.jobinfoChanged( jobid, jobinfo ):
[184]1011
[782]1012                    self.jobAttrs[ jobid ]['stop_timestamp'] = ''
1013                    self.jobAttrs[ jobid ]                   = self.setJobAttrs( self.jobAttrs[ jobid ], jobinfo )
[184]1014
[782]1015                    if not jobid in self.jobs_to_store:
[87]1016
[782]1017                        self.jobs_to_store.append( jobid )
[84]1018
[782]1019                    debug_msg( 10, 'jobinfo for job %s has changed' %jobid )
1020            else:
1021                debug_msg( 1, 'new job %s' %jobid )
1022
1023                if not jobid in self.jobs_to_store:
1024
1025                    self.jobs_to_store.append( jobid )
1026
1027        debug_msg( 1, 'job_xml_thread(): Found %s updated jobs.' %len(self.jobs_to_store) )
1028
[792]1029        failed_store = [ ]
1030        succes_store = [ ]
1031
[782]1032        if len( self.jobs_to_store ) > 0:
1033
1034            debug_msg( 1, 'job_xml_thread(): Storing jobs to database..' )
1035
[792]1036            for n in range( 0, len(self.jobs_to_store ) ):
[782]1037
[792]1038                if len( self.jobs_to_store ) == 0:
1039                    break
1040
[782]1041                jobid = self.jobs_to_store.pop( 0 )
1042
[792]1043                db_ok = self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
[782]1044
[792]1045                if not db_ok:
1046
1047                    self.ds.doRollback()
1048                    failed_store.append( jobid )
1049                    continue
1050
1051                self.ds.doCommit()
1052                succes_store.append( jobid )
1053
[783]1054                if not jobid in jobs_finished:
[782]1055
[783]1056                    self.jobAttrsSaved[ jobid ] = self.jobAttrs[ jobid ]
1057
1058                elif self.jobAttrsSaved.has_key( jobid ):
1059
1060                    del self.jobAttrsSaved[ jobid ]
1061
[782]1062                if self.jobAttrs[ jobid ]['status'] == 'F':
1063
1064                    del self.jobAttrs[ jobid ]
1065
[792]1066            result_str = 'succesfully stored: %s jobs' %str(len(succes_store))
[782]1067
[792]1068            if len( failed_store ) > 0:
1069                result_str = result_str + ' - failed to store: %s jobs - deferred to next interval' %str(len(failed_store))
1070
1071            debug_msg( 1, 'job_xml_thread(): Done storing. %s' %result_str )
1072
[782]1073        else:
1074            debug_msg( 1, 'job_xml_thread(): No jobs to store.' )
1075
1076        self.jobs_processed = [ ]
1077
[783]1078        # TODO: once in while check database AND self.jobAttrsSaved for stale jobs
1079
[770]1080    def setJobAttrs( self, old, new ):
1081        """
1082        Set new job attributes in old, but not lose existing fields
1083        if old attributes doesn't have those
1084        """
[82]1085
[770]1086        for valname, value in new.items():
1087            old[ valname ] = value
[82]1088
[770]1089        return old
1090       
[82]1091
[782]1092    def jobinfoChanged( self, jobid, jobinfo ):
[770]1093        """
1094        Check if jobinfo has changed from jobattrs[jobid]
1095        if it's report time is bigger than previous one
1096        and it is report time is recent (equal to heartbeat)
1097        """
[72]1098
[770]1099        ignore_changes = [ 'reported' ]
[87]1100
[782]1101        if self.jobAttrsSaved.has_key( jobid ):
[73]1102
[770]1103            for valname, value in jobinfo.items():
[73]1104
[770]1105                if valname not in ignore_changes:
[73]1106
[782]1107                    if self.jobAttrsSaved[ jobid ].has_key( valname ):
[73]1108
[782]1109                        if value != self.jobAttrsSaved[ jobid ][ valname ]:
[73]1110
[782]1111                            if jobinfo['reported'] > self.jobAttrsSaved[ jobid ][ 'reported' ]:
1112
1113                                debug_msg( 1, "job %s field '%s' changed since saved from: %s to: %s" %( jobid, valname, value, self.jobAttrsSaved[ jobid ][ valname ] ) )
1114
[770]1115                                return True
[73]1116
[770]1117                    else:
[782]1118                        debug_msg( 1, "job %s did not have field '%s'" %( jobid, valname )  )
[770]1119                        return True
[87]1120
[770]1121        return False
[73]1122
[71]1123class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[770]1124    """Parse Ganglia's XML"""
[3]1125
[770]1126    def __init__( self, config, datastore ):
1127        """Setup initial variables and gather info on existing rrd archive"""
[63]1128
[857]1129        self.config          = config
1130        self.clusters        = { }
1131        self.ds              = datastore
[324]1132
[857]1133        debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' )
[770]1134        self.gatherClusters()
[857]1135        debug_msg( 1, 'Housekeeping: RRD check complete.' )
[33]1136
[770]1137    def gatherClusters( self ):
1138        """Find all existing clusters in archive dir"""
[44]1139
[770]1140        archive_dir    = check_dir(ARCHIVE_PATH)
[44]1141
[770]1142        hosts        = [ ]
[44]1143
[770]1144        if os.path.exists( archive_dir ):
[44]1145
[770]1146            dirlist    = os.listdir( archive_dir )
[44]1147
[770]1148            for cfgcluster in ARCHIVE_DATASOURCES:
[369]1149
[770]1150                if cfgcluster not in dirlist:
[369]1151
[770]1152                    # Autocreate a directory for this cluster
1153                    # assume it is new
1154                    #
1155                    cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
[369]1156
[770]1157                    os.mkdir( cluster_dir )
[369]1158
[770]1159                    dirlist.append( cfgcluster )
[370]1160
[770]1161            for item in dirlist:
[44]1162
[770]1163                clustername = item
[44]1164
[770]1165                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]1166
[770]1167                    self.clusters[ clustername ] = RRDHandler( self.config, clustername )
[44]1168
[770]1169        debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
[365]1170
[770]1171    def startElement( self, name, attrs ):
1172        """Memorize appropriate data from xml start tags"""
[3]1173
[770]1174        if name == 'GANGLIA_XML':
[32]1175
[782]1176            self.XMLSource      = str( attrs.get( 'SOURCE',  "" ) )
1177            self.gangliaVersion = str( attrs.get( 'VERSION', "" ) )
[32]1178
[770]1179            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]1180
[770]1181        elif name == 'GRID':
[32]1182
[770]1183            self.gridName    = str( attrs.get( 'NAME', "" ) )
[858]1184            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
[32]1185
[770]1186            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]1187
[770]1188        elif name == 'CLUSTER':
[32]1189
[782]1190            self.clusterName = str( attrs.get( 'NAME',      "" ) )
[770]1191            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
[32]1192
[770]1193            if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]1194
[770]1195                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]1196
[770]1197                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]1198
[770]1199        elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]1200
[782]1201            self.hostName     = str( attrs.get( 'NAME',     "" ) )
1202            self.hostIp       = str( attrs.get( 'IP',       "" ) )
1203            self.hostReported = str( attrs.get( 'REPORTED', "" ) )
[32]1204
[770]1205            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]1206
[770]1207        elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]1208
[770]1209            type = str( attrs.get( 'TYPE', "" ) )
1210           
1211            exclude_metric = False
1212           
1213            for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
[6]1214
[770]1215                orig_name = str( attrs.get( 'NAME', "" ) )
[3]1216
[770]1217                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1218               
1219                    exclude_metric = True
[198]1220
[770]1221                elif re.match( ex_metricstr, orig_name ):
[198]1222
[770]1223                    exclude_metric = True
[198]1224
[770]1225            if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
[198]1226
[782]1227                myMetric         = { }
1228                myMetric['name'] = str( attrs.get( 'NAME', "" ) )
1229                myMetric['val']  = str( attrs.get( 'VAL',  "" ) )
1230                myMetric['time'] = self.hostReported
[3]1231
[770]1232                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]1233
[770]1234                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]1235
[770]1236    def storeMetrics( self ):
1237        """Store metrics of each cluster rrd handler"""
[9]1238
[770]1239        for clustername, rrdh in self.clusters.items():
[16]1240
[770]1241            ret = rrdh.storeMetrics()
[9]1242
[770]1243            if ret:
1244                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1245                return 1
[38]1246
[770]1247        return 0
[38]1248
[71]1249class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1250
[770]1251    def error( self, exception ):
1252        """Recoverable error"""
[71]1253
[770]1254        debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]1255
[770]1256    def fatalError( self, exception ):
1257        """Non-recoverable error"""
[71]1258
[770]1259        exception_str = str( exception )
[71]1260
[770]1261        # Ignore 'no element found' errors
1262        if exception_str.find( 'no element found' ) != -1:
1263            debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1264            return 0
[71]1265
[770]1266        debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
1267        sys.exit( 1 )
[71]1268
[770]1269    def warning( self, exception ):
1270        """Warning"""
[71]1271
[770]1272        debug_msg( 0, 'Warning ' + str( exception ) )
[71]1273
[78]1274class XMLGatherer:
[770]1275    """Setup a connection and file object to Ganglia's XML"""
[3]1276
[858]1277    s           = None
1278    fd          = None
[770]1279    data        = None
1280    slot        = None
[8]1281
[770]1282    # Time since the last update
1283    #
1284    LAST_UPDATE    = 0
[287]1285
[770]1286    # Minimum interval between updates
1287    #
1288    MIN_UPDATE_INT    = 10
[287]1289
[770]1290    # Is a update occuring now
1291    #
1292    update_now    = False
[287]1293
[770]1294    def __init__( self, host, port ):
1295        """Store host and port for connection"""
[8]1296
[770]1297        self.host    = host
1298        self.port    = port
[782]1299        self.slot    = threading.Lock()
[3]1300
[770]1301        self.retrieveData()
[287]1302
[770]1303    def retrieveData( self ):
1304        """Setup connection to XML source"""
[8]1305
[858]1306        self.update_now = True
[287]1307
[770]1308        self.slot.acquire()
[293]1309
[858]1310        self.data       = None
[469]1311
[770]1312        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]1313
[770]1314            af, socktype, proto, canonname, sa = res
[32]1315
[770]1316            try:
[32]1317
[770]1318                self.s = socket.socket( af, socktype, proto )
[32]1319
[770]1320            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
[32]1321
[770]1322                self.s = None
1323                continue
[32]1324
[773]1325            try:
[32]1326
[770]1327                self.s.connect( sa )
[32]1328
[773]1329            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
[32]1330
[770]1331                self.disconnect()
1332                continue
[32]1333
[773]1334            break
[3]1335
[770]1336        if self.s is None:
[32]1337
[770]1338            debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1339            self.update_now    = False
1340            #sys.exit( 1 )
[5]1341
[770]1342        else:
1343            #self.s.send( '\n' )
[287]1344
[770]1345            my_fp            = self.s.makefile( 'r' )
[773]1346            my_data          = my_fp.readlines()
1347            my_data          = string.join( my_data, '' )
[287]1348
[770]1349            self.data        = my_data
[287]1350
[773]1351            self.LAST_UPDATE = time.time()
[287]1352
[770]1353        self.slot.release()
[293]1354
[770]1355        self.update_now    = False
[287]1356
[770]1357    def disconnect( self ):
1358        """Close socket"""
[33]1359
[770]1360        if self.s:
1361            #self.s.shutdown( 2 )
1362            self.s.close()
1363            self.s = None
[33]1364
[770]1365    def __del__( self ):
1366        """Kill the socket before we leave"""
[33]1367
[770]1368        self.disconnect()
[33]1369
[770]1370    def reGetData( self ):
1371        """Reconnect"""
[33]1372
[770]1373        while self.update_now:
[287]1374
[770]1375            # Must be another update in progress:
1376            # Wait until the update is complete
1377            #
1378            time.sleep( 1 )
[287]1379
[770]1380        if self.s:
1381            self.disconnect()
[33]1382
[770]1383        self.retrieveData()
[5]1384
[770]1385    def getData( self ):
[287]1386
[770]1387        """Return the XML data"""
[287]1388
[770]1389        # If more than MIN_UPDATE_INT seconds passed since last data update
1390        # update the XML first before returning it
1391        #
[287]1392
[770]1393        cur_time    = time.time()
[287]1394
[770]1395        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
[287]1396
[770]1397            self.reGetData()
[287]1398
[770]1399        while self.update_now:
[287]1400
[770]1401            # Must be another update in progress:
1402            # Wait until the update is complete
1403            #
1404            time.sleep( 1 )
1405           
1406        return self.data
[287]1407
[770]1408    def makeFileDescriptor( self ):
1409        """Make file descriptor that points to our socket connection"""
[70]1410
[770]1411        self.reconnect()
[70]1412
[770]1413        if self.s:
1414            self.fd = self.s.makefile( 'r' )
[70]1415
[770]1416    def getFileObject( self ):
1417        """Connect, and return a file object"""
[70]1418
[770]1419        self.makeFileDescriptor()
[78]1420
[770]1421        if self.fd:
1422            return self.fd
[70]1423
[78]1424class GangliaXMLProcessor( XMLProcessor ):
[770]1425    """Main class for processing XML and acting with it"""
[5]1426
[770]1427    def __init__( self, XMLSource, DataStore ):
1428        """Setup initial XML connection and handlers"""
[33]1429
[782]1430        self.config       = GangliaConfigParser( GMETAD_CONF )
1431        self.myXMLSource  = XMLSource
1432        self.ds           = DataStore
1433        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )
1434        self.myXMLError   = XMLErrorHandler()
[33]1435
[770]1436    def run( self ):
1437        """Main XML processing; start a xml and storethread"""
[8]1438
[782]1439        xml_thread   = threading.Thread( None, self.processXML,   'xmlthread' )
[770]1440        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]1441
[770]1442        while( 1 ):
[36]1443
[770]1444            if not xml_thread.isAlive():
1445                # Gather XML at the same interval as gmetad
[36]1446
[770]1447                # threaded call to: self.processXML()
1448                #
1449                try:
1450                    xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1451                    xml_thread.start()
1452                except thread.error, msg:
1453                    debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1454                    #return 1
[36]1455
[770]1456            if not store_thread.isAlive():
1457                # Store metrics every .. sec
[36]1458
[770]1459                # threaded call to: self.storeMetrics()
1460                #
1461                try:
1462                    store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1463                    store_thread.start()
1464                except thread.error, msg:
1465                    debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1466                    #return 1
1467       
1468            # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1469            time.sleep( 1 )   
[36]1470
[770]1471    def storeMetrics( self ):
1472        """Store metrics retained in memory to disk"""
[22]1473
[770]1474        global DEBUG_LEVEL
[365]1475
[770]1476        # Store metrics somewhere between every 360 and 640 seconds
1477        #
[774]1478        if DEBUG_LEVEL >= 1:
1479            STORE_INTERVAL = 60
[770]1480        else:
[857]1481            STORE_INTERVAL = random.randint( 300, 600 )
[22]1482
[770]1483        try:
1484            store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1485            store_metric_thread.start()
1486        except thread.error, msg:
1487            debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1488            return 1
[36]1489
[770]1490        debug_msg( 1, 'ganglia_store_thread(): started.' )
[169]1491
[770]1492        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
1493        time.sleep( STORE_INTERVAL )
1494        debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
[36]1495
[770]1496        if store_metric_thread.isAlive():
[36]1497
[782]1498            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
[770]1499            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[36]1500
[782]1501            if store_metric_thread.isAlive():
1502
1503                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' )
1504            else:
1505                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' )
1506
[770]1507        debug_msg( 1, 'ganglia_store_thread(): finished.' )
[36]1508
[770]1509        return 0
[36]1510
[770]1511    def storeThread( self ):
1512        """Actual metric storing thread"""
[39]1513
[770]1514        debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1515        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
[782]1516
[770]1517        ret = self.myXMLHandler.storeMetrics()
1518        if ret > 0:
1519            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
[782]1520
[770]1521        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1522        debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
1523       
1524        return 0
[39]1525
[770]1526    def processXML( self ):
1527        """Process XML"""
[8]1528
[770]1529        try:
1530            parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1531            parsethread.start()
1532        except thread.error, msg:
1533            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1534            return 1
[8]1535
[770]1536        debug_msg( 1, 'ganglia_xml_thread(): started.' )
[36]1537
[770]1538        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
1539        time.sleep( float( self.config.getLowestInterval() ) )   
1540        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
[36]1541
[770]1542        if parsethread.isAlive():
[36]1543
[782]1544            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
[770]1545            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[36]1546
[782]1547            if parsethread.isAlive():
1548                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' )
1549            else:
1550                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' )
1551
[770]1552        debug_msg( 1, 'ganglia_xml_thread(): finished.' )
[36]1553
[770]1554        return 0
[36]1555
[770]1556    def parseThread( self ):
1557        """Actual parsing thread"""
[39]1558
[770]1559        debug_msg( 1, 'ganglia_parse_thread(): started.' )
1560        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
1561       
1562        my_data    = self.myXMLSource.getData()
[176]1563
[782]1564        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) )
[293]1565
[770]1566        if my_data:
1567            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
1568            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1569            debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
[176]1570
[770]1571        debug_msg( 1, 'ganglia_parse_thread(): finished.' )
[39]1572
[770]1573        return 0
[39]1574
[9]1575class GangliaConfigParser:
1576
[770]1577    sources = [ ]
[9]1578
[770]1579    def __init__( self, config ):
1580        """Parse some stuff from our gmetad's config, such as polling interval"""
[32]1581
[770]1582        self.config = config
1583        self.parseValues()
[9]1584
[770]1585    def parseValues( self ):
1586        """Parse certain values from gmetad.conf"""
[9]1587
[770]1588        readcfg = open( self.config, 'r' )
[9]1589
[770]1590        for line in readcfg.readlines():
[9]1591
[770]1592            if line.count( '"' ) > 1:
[9]1593
[770]1594                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]1595
[855]1596                    source                = { }
1597                    source['name']        = line.split( '"' )[1]
1598                    source_value_words    = line.split( '"' )[2].split( ' ' )
[9]1599
[855]1600                    check_interval        = source_value_words[0]
[9]1601
[855]1602                    try:
[9]1603
[855]1604                        source['interval'] = int( check_interval )
1605                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], str( source['interval'] ) ) )
1606                    except ValueError:
[32]1607
[770]1608                        source['interval'] = 15
1609                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1610
[770]1611                    self.sources.append( source )
[9]1612
[855]1613        readcfg.close()
1614
1615    def clusterExists( self, source_name ):
1616
1617        for source in self.sources:
1618
1619            if source['name'] == source_name:
1620
1621                return True
1622
1623        return False
1624
[770]1625    def getInterval( self, source_name ):
1626        """Return interval for source_name"""
[32]1627
[770]1628        for source in self.sources:
[32]1629
[770]1630            if source['name'] == source_name:
[32]1631
[770]1632                return source['interval']
[32]1633
[770]1634        return None
[9]1635
[770]1636    def getLowestInterval( self ):
1637        """Return the lowest interval of all clusters"""
[34]1638
[770]1639        lowest_interval = 0
[34]1640
[770]1641        for source in self.sources:
[34]1642
[770]1643            if not lowest_interval or source['interval'] <= lowest_interval:
[34]1644
[770]1645                lowest_interval = source['interval']
[34]1646
[770]1647        # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1648        if lowest_interval:
1649            return lowest_interval
1650        else:
1651            return 15
[34]1652
[9]1653class RRDHandler:
[770]1654    """Class for handling RRD activity"""
[9]1655
[858]1656    myMetrics   = { }
1657    lastStored  = { }
[770]1658    timeserials = { }
1659    slot = None
[32]1660
[770]1661    def __init__( self, config, cluster ):
1662        """Setup initial variables"""
[78]1663
[770]1664        global MODRRDTOOL
[455]1665
[858]1666        self.block   = 0
1667        self.cluster = cluster
1668        self.config  = config
[770]1669        self.slot    = threading.Lock()
[292]1670
[770]1671        if MODRRDTOOL:
[455]1672
[770]1673            self.rrdm    = RRDMutator()
1674        else:
1675            self.rrdm    = RRDMutator( RRDTOOL )
[455]1676
[770]1677        global DEBUG_LEVEL
[9]1678
[770]1679        if DEBUG_LEVEL <= 2:
1680            self.gatherLastUpdates()
[365]1681
[770]1682    def gatherLastUpdates( self ):
1683        """Populate the lastStored list, containing timestamps of all last updates"""
[42]1684
[770]1685        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
[42]1686
[770]1687        hosts = [ ]
[42]1688
[770]1689        if os.path.exists( cluster_dir ):
[42]1690
[770]1691            dirlist = os.listdir( cluster_dir )
[42]1692
[770]1693            for dir in dirlist:
[42]1694
[770]1695                hosts.append( dir )
[42]1696
[770]1697        for host in hosts:
[42]1698
[770]1699            host_dir    = cluster_dir + '/' + host
[858]1700            dirlist     = os.listdir( host_dir )
[47]1701
[770]1702            for dir in dirlist:
[47]1703
[770]1704                if not self.timeserials.has_key( host ):
[47]1705
[770]1706                    self.timeserials[ host ] = [ ]
[47]1707
[770]1708                self.timeserials[ host ].append( dir )
[47]1709
[770]1710            last_serial = self.getLastRrdTimeSerial( host )
[292]1711
[770]1712            if last_serial:
[42]1713
[770]1714                metric_dir = cluster_dir + '/' + host + '/' + last_serial
[292]1715
[770]1716                if os.path.exists( metric_dir ):
[42]1717
[770]1718                    dirlist = os.listdir( metric_dir )
[42]1719
[770]1720                    for file in dirlist:
[42]1721
[770]1722                        metricname = file.split( '.rrd' )[0]
[42]1723
[770]1724                        if not self.lastStored.has_key( host ):
[42]1725
[770]1726                            self.lastStored[ host ] = { }
[42]1727
[770]1728                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1729
[770]1730    def getClusterName( self ):
1731        """Return clustername"""
[63]1732
[770]1733        return self.cluster
[32]1734
[770]1735    def memMetric( self, host, metric ):
1736        """Store metric from host in memory"""
[32]1737
[770]1738        # <ATOMIC>
1739        #
1740        self.slot.acquire()
1741       
1742        if self.myMetrics.has_key( host ):
[32]1743
[770]1744            if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1745
[770]1746                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1747
[770]1748                    if mymetric['time'] == metric['time']:
[32]1749
[770]1750                        # Allready have this metric, abort
1751                        self.slot.release()
1752                        return 1
1753            else:
1754                self.myMetrics[ host ][ metric['name'] ] = [ ]
1755        else:
[858]1756            self.myMetrics[ host ]                   = { }
1757            self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]1758
[770]1759        # Push new metric onto stack
1760        # atomic code; only 1 thread at a time may access the stack
[63]1761
[770]1762        self.myMetrics[ host ][ metric['name'] ].append( metric )
[32]1763
[770]1764        self.slot.release()
1765        #
1766        # </ATOMIC>
[40]1767
[770]1768    def makeUpdateList( self, host, metriclist ):
1769        """
1770        Make a list of update values for rrdupdate
1771        but only those that we didn't store before
1772        """
[37]1773
[770]1774        update_list    = [ ]
1775        metric        = None
[37]1776
[770]1777        while len( metriclist ) > 0:
[37]1778
[770]1779            metric = metriclist.pop( 0 )
[37]1780
[770]1781            if self.checkStoreMetric( host, metric ):
[292]1782
[770]1783                u_val    = str( metric['time'] ) + ':' + str( metric['val'] )
1784                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1785                update_list.append( u_val )
[40]1786
[770]1787        return update_list
[37]1788
[770]1789    def checkStoreMetric( self, host, metric ):
1790        """Check if supplied metric if newer than last one stored"""
[40]1791
[770]1792        if self.lastStored.has_key( host ):
[40]1793
[770]1794            if self.lastStored[ host ].has_key( metric['name'] ):
[40]1795
[770]1796                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1797
[770]1798                    # This is old
1799                    return 0
[40]1800
[770]1801        return 1
[50]1802
[770]1803    def memLastUpdate( self, host, metricname, metriclist ):
1804        """
1805        Memorize the time of the latest metric from metriclist
1806        but only if it wasn't allready memorized
1807        """
[50]1808
[770]1809        if not self.lastStored.has_key( host ):
1810            self.lastStored[ host ] = { }
[54]1811
[770]1812        last_update_time = 0
[50]1813
[770]1814        for metric in metriclist:
[50]1815
[770]1816            if metric['name'] == metricname:
[50]1817
[770]1818                if metric['time'] > last_update_time:
[50]1819
[770]1820                    last_update_time = metric['time']
[40]1821
[770]1822        if self.lastStored[ host ].has_key( metricname ):
1823           
1824            if last_update_time <= self.lastStored[ host ][ metricname ]:
1825                return 1
[40]1826
[770]1827        self.lastStored[ host ][ metricname ] = last_update_time
[52]1828
[770]1829    def storeMetrics( self ):
1830        """
1831        Store all metrics from memory to disk
1832        and do it to the RRD's in appropriate timeperiod directory
1833        """
[33]1834
[770]1835        debug_msg( 5, "Entering storeMetrics()")
[365]1836
[782]1837        count_values  = 0
1838        count_metrics = 0
[770]1839        count_bits    = 0
[365]1840
[770]1841        for hostname, mymetrics in self.myMetrics.items():   
[33]1842
[770]1843            for metricname, mymetric in mymetrics.items():
[33]1844
[770]1845                count_metrics += 1
[365]1846
[770]1847                for dmetric in mymetric:
[365]1848
[770]1849                    count_values += 1
[365]1850
[782]1851                    count_bits   += len( dmetric['time'] )
1852                    count_bits   += len( dmetric['val'] )
[365]1853
[770]1854        count_bytes    = count_bits / 8
[365]1855
[770]1856        debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1857            str( len( self.myMetrics.keys() ) ) + " hosts " + 
1858            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1859            str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
[365]1860
[770]1861        for hostname, mymetrics in self.myMetrics.items():   
[365]1862
[770]1863            for metricname, mymetric in mymetrics.items():
[365]1864
[770]1865                metrics_to_store = [ ]
[53]1866
[770]1867                # Pop metrics from stack for storing until none is left
1868                # atomic code: only 1 thread at a time may access myMetrics
[63]1869
[770]1870                # <ATOMIC>
1871                #
1872                self.slot.acquire() 
[33]1873
[770]1874                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1875
[770]1876                    if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1877
[770]1878                        try:
1879                            metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1880                        except IndexError, msg:
[176]1881
[770]1882                            # Somehow sometimes myMetrics[ hostname ][ metricname ]
1883                            # is still len 0 when the statement is executed.
1884                            # Just ignore indexerror's..
1885                            pass
[176]1886
[770]1887                self.slot.release()
1888                #
1889                # </ATOMIC>
[53]1890
[770]1891                # Create a mapping table, each metric to the period where it should be stored
1892                #
1893                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]1894
[770]1895                update_rets = [ ]
[50]1896
[770]1897                for period, pmetric in metric_serial_table.items():
[47]1898
[770]1899                    create_ret = self.createCheck( hostname, metricname, period )   
[47]1900
[770]1901                    update_ret = self.update( hostname, metricname, period, pmetric )
[47]1902
[770]1903                    if update_ret == 0:
[47]1904
[770]1905                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1906                    else:
1907                        debug_msg( 9, 'metric update failed' )
[47]1908
[770]1909                    update_rets.append( create_ret )
1910                    update_rets.append( update_ret )
[47]1911
[770]1912                # Lets ignore errors here for now, we need to make sure last update time
1913                # is correct!
1914                #
1915                #if not (1) in update_rets:
[50]1916
[770]1917                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]1918
[770]1919        debug_msg( 5, "Leaving storeMetrics()")
[365]1920
[770]1921    def makeTimeSerial( self ):
1922        """Generate a time serial. Seconds since epoch"""
[17]1923
[770]1924        # Seconds since epoch
1925        mytime = int( time.time() )
[17]1926
[770]1927        return mytime
[17]1928
[770]1929    def makeRrdPath( self, host, metricname, timeserial ):
1930        """Make a RRD location/path and filename"""
[17]1931
[858]1932        rrd_dir  = '%s/%s/%s/%s'    %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1933        rrd_file = '%s/%s.rrd'    %( rrd_dir, metricname )
[17]1934
[770]1935        return rrd_dir, rrd_file
[17]1936
[770]1937    def getLastRrdTimeSerial( self, host ):
1938        """Find the last timeserial (directory) for this host"""
[17]1939
[770]1940        newest_timeserial = 0
[19]1941
[770]1942        for dir in self.timeserials[ host ]:
[32]1943
[770]1944            valid_dir = 1
[17]1945
[770]1946            for letter in dir:
1947                if letter not in string.digits:
1948                    valid_dir = 0
[17]1949
[770]1950            if valid_dir:
1951                timeserial = dir
1952                if timeserial > newest_timeserial:
1953                    newest_timeserial = timeserial
[17]1954
[770]1955        if newest_timeserial:
1956            return newest_timeserial
1957        else:
1958            return 0
[17]1959
[770]1960    def determinePeriod( self, host, check_serial ):
1961        """Determine to which period (directory) this time(serial) belongs"""
[47]1962
[770]1963        period_serial = 0
[47]1964
[770]1965        if self.timeserials.has_key( host ):
[47]1966
[770]1967            for serial in self.timeserials[ host ]:
[47]1968
[770]1969                if check_serial >= serial and period_serial < serial:
[47]1970
[770]1971                    period_serial = serial
[56]1972
[770]1973        return period_serial
[47]1974
[770]1975    def determineSerials( self, host, metricname, metriclist ):
1976        """
1977        Determine the correct serial and corresponding rrd to store
1978        for a list of metrics
1979        """
[47]1980
[770]1981        metric_serial_table = { }
[47]1982
[770]1983        for metric in metriclist:
[47]1984
[770]1985            if metric['name'] == metricname:
[47]1986
[858]1987                period       = self.determinePeriod( host, metric['time'] )   
[47]1988
[858]1989                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[47]1990
[770]1991                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]1992
[770]1993                    # This one should get it's own new period
1994                    period = metric['time']
[57]1995
[770]1996                    if not self.timeserials.has_key( host ):
1997                        self.timeserials[ host ] = [ ]
[57]1998
[770]1999                    self.timeserials[ host ].append( period )
[47]2000
[770]2001                if not metric_serial_table.has_key( period ):
[47]2002
[770]2003                    metric_serial_table[ period ] = [ ]
[47]2004
[770]2005                metric_serial_table[ period ].append( metric )
[47]2006
[770]2007        return metric_serial_table
[47]2008
[770]2009    def createCheck( self, host, metricname, timeserial ):
2010        """Check if an rrd allready exists for this metric, create if not"""
[9]2011
[770]2012        debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
2013       
2014        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]2015
[770]2016        if not os.path.exists( rrd_dir ):
[58]2017
[770]2018            try:
2019                os.makedirs( rrd_dir )
[58]2020
[770]2021            except os.OSError, msg:
[58]2022
[770]2023                if msg.find( 'File exists' ) != -1:
[58]2024
[770]2025                    # Ignore exists errors
2026                    pass
[58]2027
[770]2028                else:
[58]2029
[770]2030                    print msg
2031                    return
[58]2032
[770]2033            debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]2034
[770]2035        if not os.path.exists( rrd_file ):
[9]2036
[855]2037            interval     = self.config.getInterval( self.cluster )
[770]2038            heartbeat    = 8 * int( interval )
[9]2039
[858]2040            params       = [ ]
[12]2041
[770]2042            params.append( '--step' )
2043            params.append( str( interval ) )
[12]2044
[770]2045            params.append( '--start' )
2046            params.append( str( int( timeserial ) - 1 ) )
[12]2047
[770]2048            params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
2049            params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]2050
[770]2051            self.rrdm.create( str(rrd_file), params )
[37]2052
[770]2053            debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
[14]2054
[770]2055    def update( self, host, metricname, timeserial, metriclist ):
2056        """
2057        Update rrd file for host with metricname
2058        in directory timeserial with metriclist
2059        """
[9]2060
[770]2061        debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]2062
[858]2063        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]2064
[858]2065        update_list       = self.makeUpdateList( host, metriclist )
[15]2066
[770]2067        if len( update_list ) > 0:
2068            ret = self.rrdm.update( str(rrd_file), update_list )
[32]2069
[770]2070            if ret:
2071                return 1
2072       
2073            debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]2074
[770]2075        return 0
[36]2076
[169]2077def daemon():
[770]2078    """daemonized threading"""
[8]2079
[770]2080    # Fork the first child
2081    #
2082    pid = os.fork()
[169]2083
[770]2084    if pid > 0:
[169]2085
[770]2086        sys.exit(0)  # end parent
[169]2087
[770]2088    # creates a session and sets the process group ID
2089    #
2090    os.setsid()
[169]2091
[770]2092    # Fork the second child
2093    #
2094    pid = os.fork()
[169]2095
[770]2096    if pid > 0:
[169]2097
[770]2098        sys.exit(0)  # end parent
[169]2099
[770]2100    write_pidfile()
[435]2101
[770]2102    # Go to the root directory and set the umask
2103    #
2104    os.chdir('/')
2105    os.umask(0)
[169]2106
[770]2107    sys.stdin.close()
2108    sys.stdout.close()
2109    sys.stderr.close()
[169]2110
[770]2111    os.open('/dev/null', os.O_RDWR)
2112    os.dup2(0, 1)
2113    os.dup2(0, 2)
[169]2114
[770]2115    run()
[169]2116
2117def run():
[770]2118    """Threading start"""
[169]2119
[855]2120    global ARCHIVE_DATASOURCES
2121
[782]2122    config             = GangliaConfigParser( GMETAD_CONF )
[855]2123
2124    for ds in ARCHIVE_DATASOURCES:
2125
2126        if not config.clusterExists( ds ):
2127
2128            print "FATAL ERROR: Data source with name '%s' not found in %s" %( ds, GMETAD_CONF )
2129            sys.exit( 1 )
2130
[782]2131    s_timeout          = int( config.getLowestInterval() - 1 )
[469]2132
[770]2133    socket.setdefaulttimeout( s_timeout )
[469]2134
[770]2135    myXMLSource        = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
2136    myDataStore        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[8]2137
[782]2138    myJobProcessor     = JobXMLProcessor( myXMLSource, myDataStore )
2139    myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore )
[287]2140
[770]2141    try:
[782]2142        job_xml_thread     = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
2143        ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
[22]2144
[782]2145        job_xml_thread.start()
[770]2146        ganglia_xml_thread.start()
2147       
2148    except thread.error, msg:
2149        debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
2150        syslog.closelog()
2151        sys.exit(1)
2152       
2153    debug_msg( 0, 'main threading started.' )
[78]2154
[169]2155def main():
[770]2156    """Program startup"""
[169]2157
[770]2158    global DAEMONIZE, USE_SYSLOG
[375]2159
[770]2160    if not processArgs( sys.argv[1:] ):
2161        sys.exit( 1 )
[214]2162
[770]2163    if( DAEMONIZE and USE_SYSLOG ):
2164        syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[169]2165
[770]2166    if DAEMONIZE:
2167        daemon()
2168    else:
2169        run()
[169]2170
2171#
[81]2172# Global functions
[169]2173#
[81]2174
[9]2175def check_dir( directory ):
[770]2176    """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]2177
[770]2178    if directory[-1] == '/':
2179        directory = directory[:-1]
[9]2180
[770]2181    return directory
[9]2182
[295]2183def reqtime2epoch( rtime ):
2184
[770]2185    (hours, minutes, seconds )    = rtime.split( ':' )
[295]2186
[770]2187    etime    = int(seconds)
2188    etime    = etime + ( int(minutes) * 60 )
2189    etime    = etime + ( int(hours) * 60 * 60 )
[295]2190
[770]2191    return etime
[295]2192
[12]2193def debug_msg( level, msg ):
[770]2194    """Only print msg if correct levels"""
[12]2195
[770]2196    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2197        sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2198   
2199    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2200        syslog.syslog( msg )
[12]2201
[46]2202def printTime( ):
[770]2203    """Print current time in human readable format"""
[46]2204
[770]2205    return time.strftime("%a %d %b %Y %H:%M:%S")
[46]2206
[435]2207def write_pidfile():
2208
[770]2209    # Write pidfile if PIDFILE exists
2210    if PIDFILE:
[435]2211
[770]2212        pid     = os.getpid()
[435]2213
[770]2214        pidfile = open(PIDFILE, 'w')
[435]2215
[770]2216        pidfile.write( str( pid ) )
2217        pidfile.close()
[435]2218
[63]2219# Ooohh, someone started me! Let's go..
[469]2220#
[9]2221if __name__ == '__main__':
[770]2222    main()
Note: See TracBrowser for help on using the repository browser.