source: branches/1.2/jobarchived/jobarchived.py @ 949

Last change on this file since 949 was 949, checked in by ramonb, 10 years ago

1.1/jobarchived/jobarchived.py:

  • reverted change that was supposed to go in 1.2

1.2/jobarchived/jobarchived.py:

  • see #173
  • changed threading: now each cluster gets it's own ganglia xml/store threads
  • added some yappi profiling functions for debugging
  • better debug statements
  • many performance improvements:
    • now use deque collections in stead of lists for storing metrics: faster appends and pops
    • remove some typecasts
    • replaced xml readlines() with read()
    • XMLDataGatherer now truly caches and prevents unnecessary data retrieval
    • replaced some if statements with catch/excepts: is faster
    • disabled thread locking
    • excluded metrics are now ignored while storing: not while parsing, and matched with compiled regexp
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 67.3 KB
RevLine 
[3]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
[771]5# Copyright (C) 2006-2013  Ramon Bastiaans
[225]6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[230]21# SVN $Id: jobarchived.py 949 2014-01-20 16:04:55Z ramonb $
22#
[3]23
[466]24import getopt, syslog, ConfigParser, sys
[949]25from collections import deque
26import time
27from pprint import pprint
28#import yappi
[3]29
[949]30#yappi.start()
31
32try:
33    from resource import getrusage, RUSAGE_SELF
34except ImportError:
35    RUSAGE_SELF = 0
36    def getrusage(who=0):
37        return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.0
38
39p_stats = None
40p_start_time = None
41
42def profiler(frame, event, arg):
43    if event not in ('call','return'): return profiler
44    #### gather stats ####
45    rusage = getrusage(RUSAGE_SELF)
46    t_cpu = rusage[0] + rusage[1] # user time + system time
47    code = frame.f_code
48    fun = (code.co_name, code.co_filename, code.co_firstlineno)
49    #### get stack with functions entry stats ####
50    ct = threading.currentThread()
51    try:
52        p_stack = ct.p_stack
53    except AttributeError:
54        ct.p_stack = deque()
55        p_stack = ct.p_stack
56    #### handle call and return ####
57    if event == 'call':
58        p_stack.append((time.time(), t_cpu, fun))
59    elif event == 'return':
60        try:
61            t,t_cpu_prev,f = p_stack.pop()
62            assert f == fun
63        except IndexError: # TODO investigate
64            t,t_cpu_prev,f = p_start_time, 0.0, None
65        call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0))
66        p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev)
67    return profiler
68
69
70def profile_on():
71    global p_stats, p_start_time
72    p_stats = {}
73    p_start_time = time.time()
74    threading.setprofile(profiler)
75    sys.setprofile(profiler)
76
77    debug_msg( 1, 'profile_on(): profiling..' )
78
79def profile_off():
80    threading.setprofile(None)
81    sys.setprofile(None)
82    debug_msg( 1, 'profile_on(): profiling ended..' )
83
84def get_profile_stats():
85    """
86    returns dict[function_tuple] -> stats_tuple
87    where
88      function_tuple = (function_name, filename, lineno)
89      stats_tuple = (call_cnt, real_time, cpu_time)
90    """
91    debug_msg( 1, 'get_profile_stats(): dumping stats..' )
92    return p_stats
93
[913]94VERSION='__VERSION__'
[284]95
[466]96def usage( ver ):
[284]97
[770]98    print 'jobarchived %s' %VERSION
[284]99
[770]100    if ver:
101        return 0
[284]102
[770]103    print
104    print 'Purpose:'
105    print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
106    print '  and node statistics in a RRD archive'
107    print
108    print 'Usage:    jobarchived [OPTIONS]'
109    print
110    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobarchived.conf)'
111    print '  -p, --pidfile=FILE    Use pid file to store the process id'
112    print '  -h, --help        Print help and exit'
113    print '  -v, --version        Print version and exit'
114    print
[435]115
[214]116def processArgs( args ):
[6]117
[858]118    SHORT_L   = 'p:hvc:'
[773]119    LONG_L    = [ 'help', 'config=', 'pidfile=', 'version' ]
[169]120
[773]121    config_filename = '/etc/jobarchived.conf'
[169]122
[770]123    global PIDFILE
[435]124
[858]125    PIDFILE   = None
[435]126
[773]127    try:
[169]128
[773]129        opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]130
[773]131    except getopt.error, detail:
[60]132
[773]133        print detail
134        sys.exit(1)
[9]135
[773]136    for opt, value in opts:
[60]137
[773]138        if opt in [ '--config', '-c' ]:
[13]139
[773]140            config_filename = value
[198]141
[770]142        if opt in [ '--pidfile', '-p' ]:
[435]143
[770]144            PIDFILE         = value
[435]145
[770]146        if opt in [ '--help', '-h' ]:
[435]147
[770]148            usage( False )
149            sys.exit( 0 )
[435]150
[770]151        if opt in [ '--version', '-v' ]:
[60]152
[770]153            usage( True )
154            sys.exit( 0 )
[22]155
[770]156    try:
157        return loadConfig( config_filename )
[13]158
[770]159    except ConfigParser.NoOptionError, detail:
[214]160
[770]161        print detail
162        sys.exit( 1 )
[214]163
164def loadConfig( filename ):
165
[770]166    def getlist( cfg_string ):
[214]167
[770]168        my_list = [ ]
[214]169
[770]170        for item_txt in cfg_string.split( ',' ):
[214]171
[770]172            sep_char = None
[214]173
[770]174            item_txt = item_txt.strip()
[214]175
[770]176            for s_char in [ "'", '"' ]:
[214]177
[770]178                if item_txt.find( s_char ) != -1:
[214]179
[770]180                    if item_txt.count( s_char ) != 2:
[214]181
[770]182                        print 'Missing quote: %s' %item_txt
183                        sys.exit( 1 )
[214]184
[770]185                    else:
[214]186
[770]187                        sep_char = s_char
188                        break
[214]189
[770]190            if sep_char:
[214]191
[770]192                item_txt = item_txt.split( sep_char )[1]
[214]193
[770]194            my_list.append( item_txt )
[214]195
[770]196        return my_list
[214]197
[770]198    cfg = ConfigParser.ConfigParser()
[214]199
[770]200    cfg.read( filename )
[214]201
[770]202    global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
203    global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
[774]204    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL, JOB_SQL_PASSWORD, JOB_SQL_USER
[214]205
[858]206    ARCHIVE_PATH           = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
[214]207
[858]208    ARCHIVE_HOURS_PER_RRD  = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
[214]209
[858]210    DEBUG_LEVEL            = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[214]211
[858]212    USE_SYSLOG             = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[214]213
[858]214    SYSLOG_LEVEL           = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[214]215
[858]216    MODRRDTOOL             = False
[375]217
[770]218    try:
219        global rrdtool
220        import rrdtool
[214]221
[770]222        MODRRDTOOL        = True
[375]223
[770]224    except ImportError:
[375]225
[770]226        MODRRDTOOL        = False
[375]227
[770]228        print "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"
[375]229
[770]230        RRDTOOL            = cfg.get( 'DEFAULT', 'RRDTOOL' )
[455]231
[770]232    try:
[375]233
[770]234        SYSLOG_FACILITY    = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[214]235
[770]236    except AttributeError, detail:
[214]237
[770]238        print 'Unknown syslog facility'
239        sys.exit( 1 )
[214]240
[858]241    GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
[214]242
[858]243    ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
[214]244
[858]245    ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
[214]246
[858]247    ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
[214]248
[858]249    JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
250    JOB_SQL_USER            = cfg.get( 'DEFAULT', 'JOB_SQL_USER' )
[774]251    JOB_SQL_PASSWORD        = cfg.get( 'DEFAULT', 'JOB_SQL_PASSWORD' )
[214]252
[858]253    JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
[295]254
[858]255    DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[214]256
[770]257    return True
[214]258
[17]259# What XML data types not to store
[13]260#
[17]261UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]262
[47]263# Maximum time (in seconds) a parsethread may run
264#
265PARSE_TIMEOUT = 60
266
267# Maximum time (in seconds) a storethread may run
268#
269STORE_TIMEOUT = 360
270
[8]271"""
[224]272The Job Archiving Daemon
[8]273"""
274
[214]275from types import *
276
277import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
278
[486]279try:
[773]280    import psycopg2
[486]281
282except ImportError, details:
283
[773]284    print "FATAL ERROR: psycopg2 python module not found"
[770]285    sys.exit( 1 )
[486]286
[379]287class InitVars:
288        Vars = {}
289       
290        def __init__(self, **key_arg):
291                for (key, value) in key_arg.items():
292                        if value:
293                                self.Vars[key] = value
294                        else:   
295                                self.Vars[key] = None
296                               
297        def __call__(self, *key):
298                key = "%s" % key
299                return self.Vars[key]
300               
301        def __getitem__(self, key):
302                return self.Vars[key]
303               
304        def __repr__(self):
305                return repr(self.Vars)
306               
307        def keys(self):
308                barf =  map(None, self.Vars.keys())
309                return barf
310               
311        def values(self):
312                barf =  map(None, self.Vars.values())
313                return barf
314               
315        def has_key(self, key):
316                if self.Vars.has_key(key):
317                        return 1
318                else:   
319                        return 0
320                       
321class DBError(Exception):
322        def __init__(self, msg=''):
323                self.msg = msg
324                Exception.__init__(self, msg)
325        def __repr__(self):
326                return self.msg
327        __str__ = __repr__
328
329#
330# Class to connect to a database
331# and return the queury in a list or dictionairy.
332#
333class DB:
[773]334    def __init__(self, db_vars):
[379]335
[773]336        self.dict = db_vars
[379]337
[773]338        if self.dict.has_key('User'):
339            self.user = self.dict['User']
340        else:
341            self.user = 'postgres'
[379]342
[773]343        if self.dict.has_key('Host'):
344            self.host = self.dict['Host']
345        else:
346            self.host = 'localhost'
[379]347
[773]348        if self.dict.has_key('Password'):
349            self.passwd = self.dict['Password']
350        else:
351            self.passwd = ''
[379]352
[773]353        if self.dict.has_key('DataBaseName'):
354            self.db = self.dict['DataBaseName']
355        else:
356            self.db = 'jobarchive'
[379]357
[773]358        # connect_string = 'host:port:database:user:password:
359        dsn = "host='%s' dbname='%s' user='%s' password='%s'" %(self.host, self.db, self.user, self.passwd)
[379]360
[773]361        try:
362            self.SQL = psycopg2.connect(dsn)
363        except psycopg2.Error, details:
364            str = "%s" %details
365            raise DBError(str)
[379]366
[773]367    def __repr__(self):
368        return repr(self.result)
[379]369
[773]370    def __nonzero__(self):
371        return not(self.result == None)
[379]372
[773]373    def __len__(self):
374        return len(self.result)
[379]375
[773]376    def __getitem__(self,i):
377        return self.result[i]
[379]378
[773]379    def __getslice__(self,i,j):
380        return self.result[i:j]
[379]381
[773]382    def Get(self, q_str):
383        c = self.SQL.cursor()
384        try:
385            c.execute(q_str)
386            result = c.fetchall()
387        except psycopg2.Error, details:
388            c.close()
389            str = "%s" %details
390            raise DBError(str)
[379]391
[773]392        c.close()
393        return result
[379]394
[773]395    def Set(self, q_str):
396        c = self.SQL.cursor()
397        try:
398            c.execute(q_str)
[379]399
[773]400        except psycopg2.Error, details:
401            c.close()
402            str = "%s" %details
403            raise DBError(str)
[379]404
[773]405        c.close()
[774]406        return True
[379]407
[773]408    def Commit(self):
[379]409
[792]410        return self.SQL.commit()
411
412    def Rollback( self ):
413
414        return self.SQL.rollback()
415
[84]416class DataSQLStore:
417
[770]418    db_vars = None
419    dbc = None
[84]420
[770]421    def __init__( self, hostname, database ):
[84]422
[774]423        global JOB_SQL_USER, JOB_SQL_PASSWORD
424
[770]425        self.db_vars = InitVars(DataBaseName=database,
[774]426                User=JOB_SQL_USER,
[770]427                Host=hostname,
[774]428                Password=JOB_SQL_PASSWORD,
[770]429                Dictionary='true')
[84]430
[770]431        try:
432            self.dbc     = DB(self.db_vars)
433        except DBError, details:
434            debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
435            sys.exit(1)
[84]436
[770]437    def setDatabase(self, statement):
[792]438
[770]439        ret = self.doDatabase('set', statement)
440        return ret
441       
442    def getDatabase(self, statement):
[792]443
[770]444        ret = self.doDatabase('get', statement)
445        return ret
[84]446
[792]447    def doCommit( self ):
448
449        return self.dbc.Commit()
450
451    def doRollback( self ):
452
453        return self.dbc.Rollback()
454
[770]455    def doDatabase(self, type, statement):
[84]456
[770]457        debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
458        try:
459            if type == 'set':
460                result = self.dbc.Set( statement )
461            elif type == 'get':
462                result = self.dbc.Get( statement )
463               
464        except DBError, detail:
465            operation = statement.split(' ')[0]
[792]466            debug_msg( 0, 'ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
467            return False
[84]468
[770]469        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
470        return result
[84]471
[770]472    def getJobNodeId( self, job_id, node_id ):
[191]473
[770]474        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
[792]475        if not id:
476            return False
477
[770]478        if len( id ) > 0:
[191]479
[770]480            if len( id[0] ) > 0 and id[0] != '':
481           
[792]482                return True
[191]483
[792]484        return False
[191]485
[770]486    def getNodeId( self, hostname ):
[84]487
[770]488        id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]489
[770]490        if len( id ) > 0:
[84]491
[770]492            id = id[0][0]
[89]493
[770]494            return id
495        else:
496            return None
[84]497
[770]498    def getNodeIds( self, hostnames ):
[84]499
[770]500        ids = [ ]
[84]501
[770]502        for node in hostnames:
[84]503
[770]504            id = self.getNodeId( node )
[84]505
[770]506            if id:
507                ids.append( id )
[84]508
[770]509        return ids
[84]510
[770]511    def getJobId( self, jobid ):
[84]512
[770]513        id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
[84]514
[770]515        if id:
516            id = id[0][0]
[84]517
[770]518            return id
519        else:
520            return None
[84]521
[770]522    def addJob( self, job_id, jobattrs ):
[84]523
[770]524        if not self.getJobId( job_id ):
[84]525
[792]526            return self.mutateJob( 'insert', job_id, jobattrs ) 
[770]527        else:
[792]528            return self.mutateJob( 'update', job_id, jobattrs )
[84]529
[770]530    def mutateJob( self, action, job_id, jobattrs ):
[84]531
[782]532        job_values     = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
[84]533
[782]534        insert_col_str = 'job_id'
535        insert_val_str = "'%s'" %job_id
536        update_str     = None
[84]537
[770]538        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
[84]539
[770]540        ids = [ ]
[99]541
[770]542        for valname, value in jobattrs.items():
[84]543
[770]544            if valname in job_values and value != '':
[84]545
[770]546                column_name = 'job_' + valname
[84]547
[770]548                if action == 'insert':
[84]549
[770]550                    if not insert_col_str:
551                        insert_col_str = column_name
552                    else:
553                        insert_col_str = insert_col_str + ',' + column_name
[84]554
[770]555                    if not insert_val_str:
556                        insert_val_str = value
557                    else:
558                        insert_val_str = insert_val_str + ",'%s'" %value
[84]559
[770]560                elif action == 'update':
561                   
562                    if not update_str:
563                        update_str = "%s='%s'" %(column_name, value)
564                    else:
565                        update_str = update_str + ",%s='%s'" %(column_name, value)
[84]566
[770]567            elif valname == 'nodes' and value:
[84]568
[770]569                node_valid = 1
[190]570
[770]571                if len(value) == 1:
572               
573                    if jobattrs['status'] == 'Q':
[190]574
[770]575                        node_valid = 0
[190]576
[770]577                    else:
[190]578
[770]579                        node_valid = 0
[190]580
[770]581                        for node_char in str(value[0]):
[190]582
[770]583                            if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]584
[770]585                                node_valid = 1
[84]586
[770]587                if node_valid:
[191]588
[770]589                    ids = self.addNodes( value, jobattrs['domain'] )
[191]590
[770]591        if action == 'insert':
[84]592
[792]593            db_ret = self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]594
[770]595        elif action == 'update':
[84]596
[792]597            db_ret = self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) )
[84]598
[770]599        if len( ids ) > 0:
600            self.addJobNodes( job_id, ids )
[190]601
[792]602        return db_ret
603
[770]604    def addNodes( self, hostnames, domain ):
[84]605
[770]606        ids = [ ]
[98]607
[770]608        for node in hostnames:
[84]609
[770]610            node    = '%s.%s' %( node, domain )
611            id    = self.getNodeId( node )
612   
613            if not id:
614                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
615                id = self.getNodeId( node )
[84]616
[770]617            ids.append( id )
[98]618
[770]619        return ids
[98]620
[770]621    def addJobNodes( self, jobid, nodes ):
[86]622
[770]623        for node in nodes:
[86]624
[770]625            if not self.getJobNodeId( jobid, node ):
[191]626
[770]627                self.addJobNode( jobid, node )
[191]628
[770]629    def addJobNode( self, jobid, nodeid ):
[84]630
[774]631        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( '%s',%s )" %(jobid, nodeid) )
[84]632
[770]633    def storeJobInfo( self, jobid, jobattrs ):
[84]634
[792]635        return self.addJob( jobid, jobattrs )
[84]636
[857]637    def checkTimedoutJobs( self ):
[295]638
[857]639        debug_msg( 1, 'Housekeeping: checking database for timed out jobs..' )
640
[770]641        # Locate all jobs in the database that are not set to finished
642        #
643        q = "SELECT * from jobs WHERE job_status != 'F'"
[295]644
[770]645        r = self.getDatabase( q )
[295]646
[770]647        if len( r ) == 0:
[295]648
[770]649            return None
[295]650
[857]651        timeoutjobs  = [ ]
[295]652
[857]653        jobtimeout_sec = JOB_TIMEOUT * (60 * 60)
654        cur_time       = time.time()
[295]655
[770]656        for row in r:
[295]657
[857]658            job_id              = row[0]
659            job_requested_time  = row[4]
660            job_status          = row[7]
661            job_start_timestamp = row[8]
[295]662
[770]663            # If it was set to queued and we didn't see it started
664            # there's not point in keeping it around
665            #
[857]666            if job_status == 'R' and job_start_timestamp:
[295]667
[770]668                start_timestamp = int( job_start_timestamp )
[295]669
[770]670                # If it was set to running longer than JOB_TIMEOUT
671                # close the job: it probably finished while we were not running
672                #
673                if ( cur_time - start_timestamp ) > jobtimeout_sec:
[295]674
[770]675                    if job_requested_time:
[295]676
[770]677                        rtime_epoch    = reqtime2epoch( job_requested_time )
678                    else:
679                        rtime_epoch    = None
680                   
681                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
[295]682
[857]683        debug_msg( 1, 'Housekeeping: Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
[295]684
[857]685        ret_jobids_clean = [ ]
[295]686
[770]687        # Close these jobs in the database
688        # update the stop_timestamp to: start_timestamp + requested wallclock
689        # and set state: finished
690        #
691        for j in timeoutjobs:
[295]692
[770]693            ( i, s, r )        = j
[295]694
[770]695            if r:
696                new_end_timestamp    = int( s ) + r
[295]697
[857]698                q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
699                self.setDatabase( q )
700            else:
701
702                # Requested walltime unknown: cannot guess end time: sorry delete them
703                q = "DELETE FROM jobs WHERE job_id = '" + str( i ) + "'"
704                self.setDatabase( q )
705
706            ret_jobids_clean.append( i )
707
708        debug_msg( 1, 'Housekeeping: done.' )
709
710        return ret_jobids_clean
711
712    def checkStaleJobs( self ):
713
714        debug_msg( 1, 'Housekeeping: checking database for stale jobs..' )
715
716        # Locate all jobs in the database that are not set to finished
717        #
718        q = "SELECT * from jobs WHERE job_status != 'F'"
719
720        r = self.getDatabase( q )
721
722        if len( r ) == 0:
723
724            return None
725
726        cleanjobs      = [ ]
727
728        cur_time       = time.time()
729
730        for row in r:
731
732            job_id              = row[0]
733            job_requested_time  = row[4]
734            job_status          = row[7]
735            job_start_timestamp = row[8]
736
737            # If it was set to queued and we didn't see it started
738            # there's not point in keeping it around
739            #
740            if job_status == 'Q' or not job_start_timestamp:
741
742                cleanjobs.append( job_id )
743
744        debug_msg( 1, 'Housekeeping: Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
745
746        # Purge these from database
747        #
748        for j in cleanjobs:
749
750            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
[770]751            self.setDatabase( q )
[295]752
[857]753        debug_msg( 1, 'Housekeeping: done.' )
754
755        return cleanjobs
756
[37]757class RRDMutator:
[770]758    """A class for performing RRD mutations"""
[37]759
[770]760    binary = None
[37]761
[770]762    def __init__( self, binary=None ):
763        """Set alternate binary if supplied"""
[37]764
[770]765        if binary:
766            self.binary = binary
[37]767
[770]768    def create( self, filename, args ):
769        """Create a new rrd with args"""
[63]770
[770]771        global MODRRDTOOL
[37]772
[770]773        if MODRRDTOOL:
774            return self.perform( 'create', filename, args )
775        else:
776            return self.perform( 'create', '"' + filename + '"', args )
[375]777
[770]778    def update( self, filename, args ):
779        """Update a rrd with args"""
[63]780
[770]781        global MODRRDTOOL
[37]782
[770]783        if MODRRDTOOL:
784            return self.perform( 'update', filename, args )
785        else:
786            return self.perform( 'update', '"' + filename + '"', args )
[375]787
[770]788    def grabLastUpdate( self, filename ):
789        """Determine the last update time of filename rrd"""
[42]790
[770]791        global MODRRDTOOL
[375]792
[770]793        last_update = 0
[42]794
[770]795        # Use the py-rrdtool module if it's available on this system
796        #
797        if MODRRDTOOL:
[53]798
[770]799            debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
[42]800
[770]801            rrd_header     = { }
[292]802
[770]803            try:
804                rrd_header    = rrdtool.info( filename )
805            except rrdtool.error, msg:
806                debug_msg( 8, str( msg ) )
[375]807
[770]808            if rrd_header.has_key( 'last_update' ):
809                return last_update
810            else:
811                return 0
[375]812
[770]813        # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
814        # DEPRECATED (slow!)
815        #
816        else:
817            debug_msg( 8, self.binary + ' info ' + filename )
[42]818
[770]819            my_pipe        = os.popen( self.binary + ' info "' + filename + '"' )
[375]820
[770]821            for line in my_pipe.readlines():
[375]822
[770]823                if line.find( 'last_update') != -1:
[375]824
[770]825                    last_update = line.split( ' = ' )[1]
[375]826
[770]827            if my_pipe:
[375]828
[770]829                my_pipe.close()
[375]830
[770]831            if last_update:
832                return last_update
833            else:
834                return 0
[375]835
836
[770]837    def perform( self, action, filename, args ):
838        """Perform action on rrd filename with args"""
[37]839
[770]840        global MODRRDTOOL
[375]841
[770]842        arg_string = None
[37]843
[770]844        if type( args ) is not ListType:
845            debug_msg( 8, 'Arguments needs to be of type List' )
846            return 1
[40]847
[770]848        for arg in args:
[37]849
[770]850            if not arg_string:
[37]851
[770]852                arg_string = arg
853            else:
854                arg_string = arg_string + ' ' + arg
[37]855
[770]856        if MODRRDTOOL:
[37]857
[770]858            debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
[292]859
[770]860            try:
861                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
[37]862
[770]863                if action == 'create':
[146]864
[770]865                    rrdtool.create( str( filename ), *args )
[37]866
[770]867                elif action == 'update':
[37]868
[770]869                    rrdtool.update( str( filename ), *args )
[365]870
[770]871            except rrdtool.error, msg:
[365]872
[770]873                error_msg = str( msg )
874                debug_msg( 8, error_msg )
875                return 1
[375]876
[770]877        else:
[375]878
[770]879            debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[375]880
[770]881            cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
882            lines   = cmd.readlines()
[375]883
[770]884            cmd.close()
[375]885
[770]886            for line in lines:
[375]887
[770]888                if line.find( 'ERROR' ) != -1:
[375]889
[770]890                    error_msg = string.join( line.split( ' ' )[1:] )
891                    debug_msg( 8, error_msg )
892                    return 1
[375]893
[770]894        return 0
[37]895
[78]896class XMLProcessor:
[770]897    """Skeleton class for XML processor's"""
[78]898
[770]899    def run( self ):
900        """Do main processing of XML here"""
[78]901
[770]902        pass
[78]903
[782]904class JobXMLProcessor( XMLProcessor ):
[770]905    """Main class for processing XML and acting with it"""
[78]906
[770]907    def __init__( self, XMLSource, DataStore ):
908        """Setup initial XML connection and handlers"""
[78]909
[782]910        self.myXMLSource  = XMLSource
911        self.myXMLHandler = JobXMLHandler( DataStore )
912        self.myXMLError   = XMLErrorHandler()
[78]913
[782]914        self.config       = GangliaConfigParser( GMETAD_CONF )
[287]915
[782]916        self.kill_thread  = False
917
918    def killThread( self ):
919
920        self.kill_thread  = True
921
[770]922    def run( self ):
923        """Main XML processing"""
[78]924
[782]925        debug_msg( 1, 'job_xml_thread(): started.' )
[87]926
[770]927        while( 1 ):
[78]928
[782]929            debug_msg( 1, 'job_xml_thread(): Retrieving XML data..' )
[176]930
[770]931            my_data    = self.myXMLSource.getData()
[287]932
[782]933            debug_msg( 1, 'job_xml_thread(): Done retrieving: data size %d' %len(my_data) )
[469]934
[770]935            if my_data:
[782]936                debug_msg( 1, 'job_xml_thread(): Parsing XML..' )
[469]937
[770]938                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[469]939
[926]940                if self.myXMLError.isFatal():
941
942                    sys.exit( 1 )
943
[782]944                debug_msg( 1, 'job_xml_thread(): Done parsing.' )
[774]945            else:
[782]946                debug_msg( 1, 'job_xml_thread(): Got no data.' )
[774]947
[782]948            if self.kill_thread:
949
950                debug_msg( 1, 'job_xml_thread(): killed.' )
951                return None
[770]952               
[782]953            debug_msg( 1, 'job_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[770]954            time.sleep( self.config.getLowestInterval() )
[78]955
[782]956class JobXMLHandler( xml.sax.handler.ContentHandler ):
957    """Parse Job's jobinfo XML from our plugin"""
[63]958
[770]959    def __init__( self, datastore ):
[84]960
[782]961        self.ds              = datastore
962        self.jobs_processed  = [ ]
963        self.jobs_to_store   = [ ]
964        self.jobAttrs        = { }
965        self.jobAttrsSaved   = { }
966
[857]967        self.iteration       = 0
968
969        self.ds.checkTimedoutJobs()
970        self.ds.checkStaleJobs()
971
[774]972        debug_msg( 1, "XML: Handler created" )
[84]973
[770]974    def startDocument( self ):
[183]975
[782]976        self.jobs_processed = [ ]
977        self.heartbeat      = 0
978        self.elementct      = 0
[857]979        self.iteration      = self.iteration + 1
[782]980
[857]981        if self.iteration > 20:
[183]982
[857]983            timedout_jobs = self.ds.checkTimedoutJobs()
984            self.iteration = 0
985
[925]986            if timedout_jobs != None:
[857]987
[925]988                for j in timedout_jobs:
[857]989
[949]990                    try:
991                        del self.jobAttrs[ j ]
992                        del self.jobAttrsSaved[ j ]
993                    except KeyError:
994                        pass
[925]995
[857]996        debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
997
[770]998    def startElement( self, name, attrs ):
999        """
1000        This XML will be all gmetric XML
1001        so there will be no specific start/end element
1002        just one XML statement with all info
1003        """
[782]1004
[770]1005        jobinfo = { }
[63]1006
[770]1007        self.elementct    += 1
[365]1008
[770]1009        if name == 'CLUSTER':
[63]1010
[770]1011            self.clustername = str( attrs.get( 'NAME', "" ) )
[199]1012
[770]1013        elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
[199]1014
[770]1015            metricname = str( attrs.get( 'NAME', "" ) )
[63]1016
[772]1017            if metricname == 'zplugin_monarch_heartbeat':
[782]1018
[770]1019                self.heartbeat = str( attrs.get( 'VAL', "" ) )
[63]1020
[772]1021            elif metricname.find( 'zplugin_monarch_job' ) != -1:
[63]1022
[782]1023                job_id  = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1]
1024                val     = str( attrs.get( 'VAL', "" ) )
[63]1025
[770]1026                valinfo = val.split( ' ' )
[63]1027
[770]1028                for myval in valinfo:
[63]1029
[770]1030                    if len( myval.split( '=' ) ) > 1:
[63]1031
[782]1032                        valname = myval.split( '=' )[0]
1033                        value   = myval.split( '=' )[1]
[70]1034
[770]1035                        if valname == 'nodes':
[782]1036
[770]1037                            value = value.split( ';' )
[72]1038
[770]1039                        jobinfo[ valname ] = value
[84]1040
[782]1041                self.jobAttrs[ job_id ] = jobinfo
[84]1042
[782]1043                self.jobs_processed.append( job_id )
[949]1044
[770]1045                   
1046    def endDocument( self ):
1047        """When all metrics have gone, check if any jobs have finished"""
[72]1048
[783]1049        jobs_finished = [ ]
1050
[782]1051        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_processed))+" jobs" )
[365]1052
[782]1053        if self.heartbeat == 0:
1054            return None
[74]1055
[782]1056        for jobid, jobinfo in self.jobAttrs.items():
[102]1057
[782]1058            if jobinfo['reported'] != self.heartbeat:
[74]1059
[782]1060                if (jobinfo['status'] != 'R'):
1061                    debug_msg( 1, 'job %s report time %s does not match current heartbeat %s : ignoring job' %(jobid, jobinfo['reported'], self.heartbeat ) )
1062                    del self.jobAttrs[ jobid ]
[96]1063
[782]1064                    if jobid in self.jobs_to_store:
1065                        del self.jobs_to_store[ jobid ]
1066
1067                    continue
1068
1069                elif jobid not in self.jobs_processed:
[783]1070
[782]1071                    # Was running previous heartbeat but not anymore: must be finished
[770]1072                    self.jobAttrs[ jobid ]['status'] = 'F'
1073                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
[782]1074                    debug_msg( 1, 'job %s appears to have finished' %jobid )
[96]1075
[783]1076                    jobs_finished.append( jobid )
1077
[770]1078                    if not jobid in self.jobs_to_store:
1079                        self.jobs_to_store.append( jobid )
[74]1080
[782]1081                    continue
[87]1082
[782]1083            elif self.jobAttrsSaved.has_key( jobid ):
[84]1084
[783]1085                # This should pretty much never happen, but hey let's be careful
1086                # Perhaps if someone altered their job while in queue with qalter
1087
[782]1088                if self.jobinfoChanged( jobid, jobinfo ):
[184]1089
[782]1090                    self.jobAttrs[ jobid ]['stop_timestamp'] = ''
1091                    self.jobAttrs[ jobid ]                   = self.setJobAttrs( self.jobAttrs[ jobid ], jobinfo )
[184]1092
[782]1093                    if not jobid in self.jobs_to_store:
[87]1094
[782]1095                        self.jobs_to_store.append( jobid )
[84]1096
[782]1097                    debug_msg( 10, 'jobinfo for job %s has changed' %jobid )
1098            else:
[930]1099                debug_msg( 10, 'new job %s' %jobid )
[782]1100
1101                if not jobid in self.jobs_to_store:
1102
1103                    self.jobs_to_store.append( jobid )
1104
1105        debug_msg( 1, 'job_xml_thread(): Found %s updated jobs.' %len(self.jobs_to_store) )
1106
[792]1107        failed_store = [ ]
1108        succes_store = [ ]
1109
[782]1110        if len( self.jobs_to_store ) > 0:
1111
1112            debug_msg( 1, 'job_xml_thread(): Storing jobs to database..' )
1113
[792]1114            for n in range( 0, len(self.jobs_to_store ) ):
[782]1115
[792]1116                if len( self.jobs_to_store ) == 0:
1117                    break
1118
[782]1119                jobid = self.jobs_to_store.pop( 0 )
1120
[792]1121                db_ok = self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
[782]1122
[792]1123                if not db_ok:
1124
1125                    self.ds.doRollback()
1126                    failed_store.append( jobid )
1127                    continue
1128
1129                self.ds.doCommit()
1130                succes_store.append( jobid )
1131
[783]1132                if not jobid in jobs_finished:
[782]1133
[783]1134                    self.jobAttrsSaved[ jobid ] = self.jobAttrs[ jobid ]
1135
1136                elif self.jobAttrsSaved.has_key( jobid ):
1137
1138                    del self.jobAttrsSaved[ jobid ]
1139
[782]1140                if self.jobAttrs[ jobid ]['status'] == 'F':
1141
1142                    del self.jobAttrs[ jobid ]
1143
[792]1144            result_str = 'succesfully stored: %s jobs' %str(len(succes_store))
[782]1145
[792]1146            if len( failed_store ) > 0:
1147                result_str = result_str + ' - failed to store: %s jobs - deferred to next interval' %str(len(failed_store))
1148
1149            debug_msg( 1, 'job_xml_thread(): Done storing. %s' %result_str )
1150
[782]1151        else:
1152            debug_msg( 1, 'job_xml_thread(): No jobs to store.' )
1153
1154        self.jobs_processed = [ ]
1155
[783]1156        # TODO: once in while check database AND self.jobAttrsSaved for stale jobs
1157
[770]1158    def setJobAttrs( self, old, new ):
1159        """
1160        Set new job attributes in old, but not lose existing fields
1161        if old attributes doesn't have those
1162        """
[82]1163
[770]1164        for valname, value in new.items():
1165            old[ valname ] = value
[82]1166
[770]1167        return old
1168       
[82]1169
[782]1170    def jobinfoChanged( self, jobid, jobinfo ):
[770]1171        """
1172        Check if jobinfo has changed from jobattrs[jobid]
1173        if it's report time is bigger than previous one
1174        and it is report time is recent (equal to heartbeat)
1175        """
[72]1176
[770]1177        ignore_changes = [ 'reported' ]
[87]1178
[782]1179        if self.jobAttrsSaved.has_key( jobid ):
[73]1180
[770]1181            for valname, value in jobinfo.items():
[73]1182
[770]1183                if valname not in ignore_changes:
[73]1184
[782]1185                    if self.jobAttrsSaved[ jobid ].has_key( valname ):
[73]1186
[782]1187                        if value != self.jobAttrsSaved[ jobid ][ valname ]:
[73]1188
[782]1189                            if jobinfo['reported'] > self.jobAttrsSaved[ jobid ][ 'reported' ]:
1190
1191                                debug_msg( 1, "job %s field '%s' changed since saved from: %s to: %s" %( jobid, valname, value, self.jobAttrsSaved[ jobid ][ valname ] ) )
1192
[770]1193                                return True
[73]1194
[770]1195                    else:
[782]1196                        debug_msg( 1, "job %s did not have field '%s'" %( jobid, valname )  )
[770]1197                        return True
[87]1198
[770]1199        return False
[73]1200
[71]1201class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[770]1202    """Parse Ganglia's XML"""
[3]1203
[949]1204    def __init__( self, config, datastore, cluster ):
[770]1205        """Setup initial variables and gather info on existing rrd archive"""
[63]1206
[857]1207        self.config          = config
[949]1208        self.clusterName     = cluster
[857]1209        self.ds              = datastore
[949]1210        self.rrd_handler     = None
1211        self.cluster_start   = False
[324]1212
[949]1213        debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName )
1214        self.gatherCluster()
1215        debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName )
[33]1216
[949]1217    def gatherCluster( self ):
[770]1218        """Find all existing clusters in archive dir"""
[44]1219
[930]1220        global ARCHIVE_DATASOURCES
1221
[770]1222        archive_dir    = check_dir(ARCHIVE_PATH)
[44]1223
[770]1224        hosts        = [ ]
[44]1225
[770]1226        if os.path.exists( archive_dir ):
[44]1227
[770]1228            dirlist    = os.listdir( archive_dir )
[44]1229
[949]1230            if self.clusterName not in dirlist:
[369]1231
[949]1232                # Autocreate a directory for this cluster
1233                # assume it is new
1234                #
1235                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
[369]1236
[949]1237                os.mkdir( cluster_dir )
[369]1238
[949]1239                dirlist.append( cfgcluster )
[369]1240
[949]1241            for d in dirlist:
[370]1242
[949]1243                if not self.rrd_handler and d == self.clusterName:
[44]1244
[949]1245                    self.rrd_handler = RRDHandler( self.config, d )
[44]1246
[949]1247                    debug_msg( 9, 'Found cluster dir: %s' %( d ) )
[44]1248
[770]1249    def startElement( self, name, attrs ):
1250        """Memorize appropriate data from xml start tags"""
[3]1251
[770]1252        if name == 'GANGLIA_XML':
[32]1253
[949]1254            self.XMLSource      = attrs.get( 'SOURCE',  "" )
1255            self.gangliaVersion = attrs.get( 'VERSION', "" )
[32]1256
[770]1257            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]1258
[949]1259            return 0
[32]1260
[949]1261        if name == 'GRID':
[32]1262
[949]1263            self.gridName    = attrs.get( 'NAME', "" )
1264            self.time        = attrs.get( 'LOCALTIME', "" )
1265
[770]1266            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]1267
[949]1268            return 0
[32]1269
[949]1270        if name == 'CLUSTER':
[32]1271
[949]1272            xmlClusterName   = attrs.get( 'NAME',      "" )
1273            self.time        = attrs.get( 'LOCALTIME', "" )
[32]1274
[949]1275            if self.clusterName == xmlClusterName:
[33]1276
[949]1277                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]1278
[949]1279                self.cluster_start = True
[32]1280
[949]1281                if not self.rrd_handler:
[32]1282
[949]1283                    self.rrd_handler = RRDHandler( self.config, self.clusterName )
1284            else:
1285                self.cluster_start = False
1286
1287            return 0
1288
1289        if name == 'HOST' and self.cluster_start:
1290
1291            self.hostName     = attrs.get( 'NAME',     "" )
1292            self.hostIp       = attrs.get( 'IP',       "" )
1293            self.hostReported = attrs.get( 'REPORTED', "" )
1294
[770]1295            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]1296
[949]1297            return 0
[6]1298
[949]1299        if name == 'METRIC' and self.cluster_start:
1300
1301            #type = attrs.get( 'TYPE', "" )
1302            #orig_name = attrs.get( 'NAME', "" )
[770]1303           
[949]1304            if attrs.get( 'TYPE', "" ) != 'string':
[6]1305
[949]1306                #myMetric         = { }
1307                #myMetric['name'] = attrs.get( 'NAME', "" )
1308                #myMetric['val']  = attrs.get( 'VAL',  "" )
1309                #myMetric['time'] = self.hostReported
[3]1310
[949]1311                self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL',  "" ), 'time': self.hostReported } )
[198]1312
[949]1313                #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
1314                #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[198]1315
1316
[949]1317    def endElement( self, name ):
[198]1318
[949]1319        if name == 'CLUSTER' and self.cluster_start:
[3]1320
[949]1321            self.cluster_start = False
1322            debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) )
[3]1323
[770]1324    def storeMetrics( self ):
1325        """Store metrics of each cluster rrd handler"""
[9]1326
[949]1327        ret = self.rrd_handler.storeMetrics()
[16]1328
[949]1329        if ret:
1330            debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1331            return 1
[9]1332
[770]1333        return 0
[38]1334
[71]1335class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1336
[926]1337    def __init__( self ):
1338
1339        self.me_fatal = False
1340
[770]1341    def error( self, exception ):
1342        """Recoverable error"""
[71]1343
[770]1344        debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]1345
[770]1346    def fatalError( self, exception ):
1347        """Non-recoverable error"""
[71]1348
[770]1349        exception_str = str( exception )
[71]1350
[770]1351        # Ignore 'no element found' errors
1352        if exception_str.find( 'no element found' ) != -1:
[926]1353
[770]1354            debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1355            return 0
[71]1356
[926]1357        self.me_fatal = True
1358
[770]1359        debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
[71]1360
[926]1361    def isFatal( self ):
1362
1363        return self.me_fatal
1364
[770]1365    def warning( self, exception ):
1366        """Warning"""
[71]1367
[770]1368        debug_msg( 0, 'Warning ' + str( exception ) )
[71]1369
[78]1370class XMLGatherer:
[770]1371    """Setup a connection and file object to Ganglia's XML"""
[3]1372
[858]1373    s           = None
1374    fd          = None
[770]1375    data        = None
1376    slot        = None
[8]1377
[770]1378    # Time since the last update
1379    #
1380    LAST_UPDATE    = 0
[287]1381
[770]1382    # Minimum interval between updates
1383    #
1384    MIN_UPDATE_INT    = 10
[287]1385
[770]1386    # Is a update occuring now
1387    #
1388    update_now    = False
[287]1389
[770]1390    def __init__( self, host, port ):
1391        """Store host and port for connection"""
[8]1392
[770]1393        self.host    = host
1394        self.port    = port
[782]1395        self.slot    = threading.Lock()
[3]1396
[770]1397        self.retrieveData()
[287]1398
[770]1399    def retrieveData( self ):
1400        """Setup connection to XML source"""
[8]1401
[949]1402        if self.update_now:
1403            return 0
1404
[858]1405        self.update_now = True
[287]1406
[949]1407        #self.slot.acquire()
[293]1408
[858]1409        self.data       = None
[469]1410
[949]1411        debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." )
1412
[770]1413        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]1414
[770]1415            af, socktype, proto, canonname, sa = res
[32]1416
[770]1417            try:
[32]1418
[770]1419                self.s = socket.socket( af, socktype, proto )
[32]1420
[770]1421            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
[32]1422
[770]1423                self.s = None
1424                continue
[32]1425
[773]1426            try:
[32]1427
[770]1428                self.s.connect( sa )
[32]1429
[773]1430            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
[32]1431
[770]1432                self.disconnect()
1433                continue
[32]1434
[773]1435            break
[3]1436
[770]1437        if self.s is None:
[32]1438
[770]1439            debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1440            self.update_now    = False
1441            #sys.exit( 1 )
[5]1442
[770]1443        else:
1444            #self.s.send( '\n' )
[287]1445
[770]1446            my_fp            = self.s.makefile( 'r' )
[949]1447            #my_data          = my_fp.readlines()
1448            #my_data          = string.join( my_data, '' )
[287]1449
[949]1450            #self.data        = my_data
1451            self.data        = my_fp.read()
[287]1452
[773]1453            self.LAST_UPDATE = time.time()
[287]1454
[949]1455            self.disconnect()
[293]1456
[949]1457        #self.slot.release()
1458
1459        debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." )
1460
[770]1461        self.update_now    = False
[287]1462
[770]1463    def disconnect( self ):
1464        """Close socket"""
[33]1465
[770]1466        if self.s:
1467            #self.s.shutdown( 2 )
1468            self.s.close()
1469            self.s = None
[33]1470
[770]1471    def __del__( self ):
1472        """Kill the socket before we leave"""
[33]1473
[770]1474        self.disconnect()
[33]1475
[770]1476    def reGetData( self ):
1477        """Reconnect"""
[33]1478
[949]1479        if self.update_now:
1480            return 0
[287]1481
[949]1482        #while self.update_now:
1483
[770]1484            # Must be another update in progress:
1485            # Wait until the update is complete
1486            #
[949]1487        #    time.sleep( 1 )
[287]1488
[770]1489        if self.s:
1490            self.disconnect()
[33]1491
[949]1492        cur_time    = time.time()
[5]1493
[949]1494        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1495
1496            if not self.update_now:
1497
1498                self.retrieveData()
1499
[770]1500    def getData( self ):
[287]1501
[770]1502        """Return the XML data"""
[287]1503
[770]1504        # If more than MIN_UPDATE_INT seconds passed since last data update
1505        # update the XML first before returning it
1506        #
[287]1507
[770]1508        cur_time    = time.time()
[287]1509
[770]1510        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
[287]1511
[949]1512            if not self.update_now:
[287]1513
[949]1514                self.reGetData()
1515
[770]1516        while self.update_now:
[287]1517
[770]1518            # Must be another update in progress:
1519            # Wait until the update is complete
1520            #
1521            time.sleep( 1 )
1522           
1523        return self.data
[287]1524
[770]1525    def makeFileDescriptor( self ):
1526        """Make file descriptor that points to our socket connection"""
[70]1527
[770]1528        self.reconnect()
[70]1529
[770]1530        if self.s:
1531            self.fd = self.s.makefile( 'r' )
[70]1532
[770]1533    def getFileObject( self ):
1534        """Connect, and return a file object"""
[70]1535
[770]1536        self.makeFileDescriptor()
[78]1537
[770]1538        if self.fd:
1539            return self.fd
[70]1540
[78]1541class GangliaXMLProcessor( XMLProcessor ):
[770]1542    """Main class for processing XML and acting with it"""
[5]1543
[949]1544    def __init__( self, XMLSource, DataStore, cluster ):
[770]1545        """Setup initial XML connection and handlers"""
[33]1546
[782]1547        self.config       = GangliaConfigParser( GMETAD_CONF )
1548        self.myXMLSource  = XMLSource
1549        self.ds           = DataStore
1550        self.myXMLError   = XMLErrorHandler()
[949]1551        self.clusterName  = cluster
1552        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName )
[33]1553
[949]1554
[770]1555    def run( self ):
1556        """Main XML processing; start a xml and storethread"""
[8]1557
[782]1558        xml_thread   = threading.Thread( None, self.processXML,   'xmlthread' )
[770]1559        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]1560
[770]1561        while( 1 ):
[36]1562
[770]1563            if not xml_thread.isAlive():
1564                # Gather XML at the same interval as gmetad
[36]1565
[770]1566                # threaded call to: self.processXML()
1567                #
1568                try:
1569                    xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1570                    xml_thread.start()
1571                except thread.error, msg:
[949]1572                    debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)) )
[770]1573                    #return 1
[36]1574
[770]1575            if not store_thread.isAlive():
1576                # Store metrics every .. sec
[36]1577
[770]1578                # threaded call to: self.storeMetrics()
1579                #
1580                try:
1581                    store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1582                    store_thread.start()
1583                except thread.error, msg:
[949]1584                    debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)) )
[770]1585                    #return 1
1586       
1587            # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1588            time.sleep( 1 )   
[36]1589
[770]1590    def storeMetrics( self ):
1591        """Store metrics retained in memory to disk"""
[22]1592
[770]1593        global DEBUG_LEVEL
[365]1594
[770]1595        # Store metrics somewhere between every 360 and 640 seconds
1596        #
[774]1597        if DEBUG_LEVEL >= 1:
1598            STORE_INTERVAL = 60
[770]1599        else:
[857]1600            STORE_INTERVAL = random.randint( 300, 600 )
[22]1601
[770]1602        try:
1603            store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1604            store_metric_thread.start()
1605        except thread.error, msg:
1606            debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1607            return 1
[36]1608
[949]1609        debug_msg( 1, 'ganglia_store_thread(): started: cluster %s' %self.clusterName )
[169]1610
[949]1611        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss): cluster %s' %(STORE_INTERVAL, self.clusterName ) )
[770]1612        time.sleep( STORE_INTERVAL )
[949]1613        debug_msg( 1, 'ganglia_store_thread(): Done sleeping: cluster %s' %self.clusterName )
[36]1614
[770]1615        if store_metric_thread.isAlive():
[36]1616
[949]1617            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName )
[770]1618            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[36]1619
[782]1620            if store_metric_thread.isAlive():
1621
[949]1622                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName )
[782]1623            else:
[949]1624                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished: cluster %s' %self.clusterName )
[782]1625
[949]1626        debug_msg( 1, 'ganglia_store_thread(): finished: cluster %s' %self.clusterName )
[36]1627
[770]1628        return 0
[36]1629
[770]1630    def storeThread( self ):
1631        """Actual metric storing thread"""
[39]1632
[949]1633        debug_msg( 1, 'ganglia_store_metric_thread(): started: cluster %s' %self.clusterName )
1634        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName )
[782]1635
[770]1636        ret = self.myXMLHandler.storeMetrics()
1637        if ret > 0:
[949]1638            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) )
[782]1639
[949]1640        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing: cluster %s' %self.clusterName )
1641        debug_msg( 1, 'ganglia_store_metric_thread(): finished: cluster %s' %self.clusterName )
[770]1642       
1643        return 0
[39]1644
[770]1645    def processXML( self ):
1646        """Process XML"""
[8]1647
[770]1648        try:
1649            parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1650            parsethread.start()
1651        except thread.error, msg:
[949]1652            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg) ) )
[770]1653            return 1
[8]1654
[949]1655        debug_msg( 1, 'ganglia_xml_thread(): started: cluster %s' %self.clusterName )
[36]1656
[949]1657        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss): cluster %s' %(self.config.getLowestInterval(), self.clusterName) )
[770]1658        time.sleep( float( self.config.getLowestInterval() ) )   
[949]1659        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping: cluster %s' %self.clusterName )
[36]1660
[770]1661        if parsethread.isAlive():
[36]1662
[949]1663            debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ) )
[770]1664            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[36]1665
[782]1666            if parsethread.isAlive():
[949]1667                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName )
[782]1668            else:
[949]1669                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName )
[782]1670
[949]1671        debug_msg( 1, 'ganglia_xml_thread(): finished: cluster %s' %self.clusterName )
[36]1672
[770]1673        return 0
[36]1674
[770]1675    def parseThread( self ):
1676        """Actual parsing thread"""
[39]1677
[949]1678
1679        debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName )
1680        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName )
[770]1681       
1682        my_data    = self.myXMLSource.getData()
[176]1683
[949]1684        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data) ) )
[293]1685
[770]1686        if my_data:
[949]1687            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName )
1688
[770]1689            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[176]1690
[949]1691            debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName )
1692            #yappi.print_stats()
[39]1693
[949]1694        debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName )
1695
[770]1696        return 0
[39]1697
[9]1698class GangliaConfigParser:
1699
[770]1700    sources = [ ]
[9]1701
[770]1702    def __init__( self, config ):
1703        """Parse some stuff from our gmetad's config, such as polling interval"""
[32]1704
[770]1705        self.config = config
1706        self.parseValues()
[9]1707
[770]1708    def parseValues( self ):
1709        """Parse certain values from gmetad.conf"""
[9]1710
[770]1711        readcfg = open( self.config, 'r' )
[9]1712
[770]1713        for line in readcfg.readlines():
[9]1714
[770]1715            if line.count( '"' ) > 1:
[9]1716
[770]1717                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]1718
[855]1719                    source                = { }
1720                    source['name']        = line.split( '"' )[1]
1721                    source_value_words    = line.split( '"' )[2].split( ' ' )
[9]1722
[855]1723                    check_interval        = source_value_words[0]
[9]1724
[855]1725                    try:
[9]1726
[855]1727                        source['interval'] = int( check_interval )
1728                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], str( source['interval'] ) ) )
1729                    except ValueError:
[32]1730
[770]1731                        source['interval'] = 15
1732                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1733
[770]1734                    self.sources.append( source )
[9]1735
[855]1736        readcfg.close()
1737
1738    def clusterExists( self, source_name ):
1739
1740        for source in self.sources:
1741
1742            if source['name'] == source_name:
1743
1744                return True
1745
1746        return False
1747
[770]1748    def getInterval( self, source_name ):
1749        """Return interval for source_name"""
[32]1750
[770]1751        for source in self.sources:
[32]1752
[770]1753            if source['name'] == source_name:
[32]1754
[770]1755                return source['interval']
[32]1756
[770]1757        return None
[9]1758
[770]1759    def getLowestInterval( self ):
1760        """Return the lowest interval of all clusters"""
[34]1761
[770]1762        lowest_interval = 0
[34]1763
[770]1764        for source in self.sources:
[34]1765
[770]1766            if not lowest_interval or source['interval'] <= lowest_interval:
[34]1767
[770]1768                lowest_interval = source['interval']
[34]1769
[770]1770        # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1771        if lowest_interval:
1772            return lowest_interval
1773        else:
1774            return 15
[34]1775
[9]1776class RRDHandler:
[770]1777    """Class for handling RRD activity"""
[9]1778
[770]1779    def __init__( self, config, cluster ):
1780        """Setup initial variables"""
[78]1781
[949]1782        global MODRRDTOOL, ARCHIVE_EXCLUDE_METRICS
[455]1783
[858]1784        self.block   = 0
1785        self.cluster = cluster
1786        self.config  = config
[770]1787        self.slot    = threading.Lock()
[930]1788        self.myMetrics   = { }
1789        self.lastStored  = { }
1790        self.timeserials = { }
[292]1791
[770]1792        if MODRRDTOOL:
[455]1793
[770]1794            self.rrdm    = RRDMutator()
1795        else:
1796            self.rrdm    = RRDMutator( RRDTOOL )
[455]1797
[770]1798        global DEBUG_LEVEL
[9]1799
[949]1800        if DEBUG_LEVEL <= 0:
[770]1801            self.gatherLastUpdates()
[365]1802
[949]1803        self.excludes        = [ ]
1804
1805        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
1806
1807            self.excludes.append( re.compile( ex_metricstr ) )
1808
[770]1809    def gatherLastUpdates( self ):
1810        """Populate the lastStored list, containing timestamps of all last updates"""
[42]1811
[770]1812        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
[42]1813
[770]1814        hosts = [ ]
[42]1815
[770]1816        if os.path.exists( cluster_dir ):
[42]1817
[770]1818            dirlist = os.listdir( cluster_dir )
[42]1819
[770]1820            for dir in dirlist:
[42]1821
[770]1822                hosts.append( dir )
[42]1823
[770]1824        for host in hosts:
[42]1825
[770]1826            host_dir    = cluster_dir + '/' + host
[858]1827            dirlist     = os.listdir( host_dir )
[47]1828
[770]1829            for dir in dirlist:
[47]1830
[770]1831                if not self.timeserials.has_key( host ):
[47]1832
[770]1833                    self.timeserials[ host ] = [ ]
[47]1834
[770]1835                self.timeserials[ host ].append( dir )
[47]1836
[770]1837            last_serial = self.getLastRrdTimeSerial( host )
[292]1838
[770]1839            if last_serial:
[42]1840
[770]1841                metric_dir = cluster_dir + '/' + host + '/' + last_serial
[292]1842
[770]1843                if os.path.exists( metric_dir ):
[42]1844
[770]1845                    dirlist = os.listdir( metric_dir )
[42]1846
[770]1847                    for file in dirlist:
[42]1848
[770]1849                        metricname = file.split( '.rrd' )[0]
[42]1850
[770]1851                        if not self.lastStored.has_key( host ):
[42]1852
[770]1853                            self.lastStored[ host ] = { }
[42]1854
[770]1855                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1856
[770]1857    def getClusterName( self ):
1858        """Return clustername"""
[63]1859
[770]1860        return self.cluster
[32]1861
[770]1862    def memMetric( self, host, metric ):
1863        """Store metric from host in memory"""
[32]1864
[770]1865        # <ATOMIC>
1866        #
1867       
[949]1868        #if host in self.myMetrics:
[32]1869
[949]1870            #if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1871
[949]1872            #    if len( self.myMetrics[ host ][ metric['name'] ] ) > 0:
1873     
1874            #        if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
[32]1875
[949]1876            #            return 1
1877                #for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1878
[949]1879                #    if mymetric['time'] == metric['time']:
1880
[770]1881                        # Allready have this metric, abort
[949]1882                #        return 1
1883            #else:
1884            #if metric['name'] not in self.myMetrics[ host ]:
1885            #    self.myMetrics[ host ][ metric['name'] ] = deque()
1886        #else:
1887        #    self.myMetrics[ host ]                   = { }
1888        #    self.myMetrics[ host ][ metric['name'] ] = deque()
[32]1889
[770]1890        # Push new metric onto stack
1891        # atomic code; only 1 thread at a time may access the stack
[949]1892        #self.slot.acquire()
[63]1893
[949]1894        try:
1895            host_metrics = self.myMetrics[ host ]
1896        except KeyError:
1897            self.myMetrics[ host ] = { }
1898            host_metrics = self.myMetrics[ host ]
1899
1900        try:
1901            metric_values = self.myMetrics[ host ][ metric['name'] ]
1902        except KeyError:
1903            self.myMetrics[ host ][ metric['name'] ] = deque()
1904            metric_values = self.myMetrics[ host ][ metric['name'] ]
1905
1906        try:
1907            if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
1908                return 1
1909        except (IndexError, KeyError):
1910            pass
1911
[770]1912        self.myMetrics[ host ][ metric['name'] ].append( metric )
[32]1913
[949]1914        #self.slot.release()
[770]1915        #
1916        # </ATOMIC>
[40]1917
[770]1918    def makeUpdateList( self, host, metriclist ):
1919        """
1920        Make a list of update values for rrdupdate
1921        but only those that we didn't store before
1922        """
[37]1923
[770]1924        update_list    = [ ]
1925        metric        = None
[37]1926
[770]1927        while len( metriclist ) > 0:
[37]1928
[770]1929            metric = metriclist.pop( 0 )
[37]1930
[770]1931            if self.checkStoreMetric( host, metric ):
[292]1932
[770]1933                u_val    = str( metric['time'] ) + ':' + str( metric['val'] )
1934                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1935                update_list.append( u_val )
[40]1936
[770]1937        return update_list
[37]1938
[770]1939    def checkStoreMetric( self, host, metric ):
1940        """Check if supplied metric if newer than last one stored"""
[40]1941
[770]1942        if self.lastStored.has_key( host ):
[40]1943
[770]1944            if self.lastStored[ host ].has_key( metric['name'] ):
[40]1945
[770]1946                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1947
[770]1948                    # This is old
1949                    return 0
[40]1950
[770]1951        return 1
[50]1952
[770]1953    def memLastUpdate( self, host, metricname, metriclist ):
1954        """
1955        Memorize the time of the latest metric from metriclist
1956        but only if it wasn't allready memorized
1957        """
[50]1958
[770]1959        if not self.lastStored.has_key( host ):
1960            self.lastStored[ host ] = { }
[54]1961
[770]1962        last_update_time = 0
[50]1963
[770]1964        for metric in metriclist:
[50]1965
[770]1966            if metric['name'] == metricname:
[50]1967
[770]1968                if metric['time'] > last_update_time:
[50]1969
[770]1970                    last_update_time = metric['time']
[40]1971
[770]1972        if self.lastStored[ host ].has_key( metricname ):
1973           
1974            if last_update_time <= self.lastStored[ host ][ metricname ]:
1975                return 1
[40]1976
[770]1977        self.lastStored[ host ][ metricname ] = last_update_time
[52]1978
[770]1979    def storeMetrics( self ):
1980        """
1981        Store all metrics from memory to disk
1982        and do it to the RRD's in appropriate timeperiod directory
1983        """
[33]1984
[949]1985        debug_msg( 5, "Entering storeMetrics(): cluster %s" %self.cluster )
[365]1986
[782]1987        count_values  = 0
1988        count_metrics = 0
[770]1989        count_bits    = 0
[365]1990
[770]1991        for hostname, mymetrics in self.myMetrics.items():   
[33]1992
[770]1993            for metricname, mymetric in mymetrics.items():
[33]1994
[770]1995                count_metrics += 1
[365]1996
[770]1997                for dmetric in mymetric:
[365]1998
[770]1999                    count_values += 1
[365]2000
[782]2001                    count_bits   += len( dmetric['time'] )
2002                    count_bits   += len( dmetric['val'] )
[365]2003
[770]2004        count_bytes    = count_bits / 8
[365]2005
[949]2006        debug_msg( 1, "size of cluster '" + self.cluster + "': " + 
[770]2007            str( len( self.myMetrics.keys() ) ) + " hosts " + 
2008            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
2009            str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
[365]2010
[770]2011        for hostname, mymetrics in self.myMetrics.items():   
[365]2012
[770]2013            for metricname, mymetric in mymetrics.items():
[365]2014
[949]2015                for e in self.excludes:
2016
2017                    if e.match( metricname ):
2018
2019                        del self.myMetrics[ hostname ][ metricname ]
2020                        continue
2021
[770]2022                metrics_to_store = [ ]
[53]2023
[770]2024                # Pop metrics from stack for storing until none is left
2025                # atomic code: only 1 thread at a time may access myMetrics
[63]2026
[770]2027                # <ATOMIC>
2028                #
[949]2029                #self.slot.acquire()
[33]2030
[949]2031                if metricname in self.myMetrics[ hostname ]:
[53]2032
[949]2033                    while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]2034
[949]2035                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[176]2036
[949]2037                            try:
2038                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() )
2039                            except IndexError, msg:
[176]2040
[949]2041                                # Somehow sometimes myMetrics[ hostname ][ metricname ]
2042                                # is still len 0 when the statement is executed.
2043                                # Just ignore indexerror's..
2044                                pass
2045
2046                #self.slot.release()
[770]2047                #
2048                # </ATOMIC>
[53]2049
[770]2050                # Create a mapping table, each metric to the period where it should be stored
2051                #
2052                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]2053
[770]2054                update_rets = [ ]
[50]2055
[770]2056                for period, pmetric in metric_serial_table.items():
[47]2057
[770]2058                    create_ret = self.createCheck( hostname, metricname, period )   
[47]2059
[770]2060                    update_ret = self.update( hostname, metricname, period, pmetric )
[47]2061
[770]2062                    if update_ret == 0:
[47]2063
[770]2064                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
2065                    else:
2066                        debug_msg( 9, 'metric update failed' )
[47]2067
[770]2068                    update_rets.append( create_ret )
2069                    update_rets.append( update_ret )
[47]2070
[770]2071                # Lets ignore errors here for now, we need to make sure last update time
2072                # is correct!
2073                #
2074                #if not (1) in update_rets:
[50]2075
[770]2076                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]2077
[949]2078        debug_msg( 5, "Leaving storeMetrics(): cluster %s" %self.cluster )
[365]2079
[770]2080    def makeTimeSerial( self ):
2081        """Generate a time serial. Seconds since epoch"""
[17]2082
[770]2083        # Seconds since epoch
2084        mytime = int( time.time() )
[17]2085
[770]2086        return mytime
[17]2087
[770]2088    def makeRrdPath( self, host, metricname, timeserial ):
2089        """Make a RRD location/path and filename"""
[17]2090
[858]2091        rrd_dir  = '%s/%s/%s/%s'    %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
2092        rrd_file = '%s/%s.rrd'    %( rrd_dir, metricname )
[17]2093
[770]2094        return rrd_dir, rrd_file
[17]2095
[770]2096    def getLastRrdTimeSerial( self, host ):
2097        """Find the last timeserial (directory) for this host"""
[17]2098
[770]2099        newest_timeserial = 0
[19]2100
[770]2101        for dir in self.timeserials[ host ]:
[32]2102
[770]2103            valid_dir = 1
[17]2104
[770]2105            for letter in dir:
2106                if letter not in string.digits:
2107                    valid_dir = 0
[17]2108
[770]2109            if valid_dir:
2110                timeserial = dir
2111                if timeserial > newest_timeserial:
2112                    newest_timeserial = timeserial
[17]2113
[770]2114        if newest_timeserial:
2115            return newest_timeserial
2116        else:
2117            return 0
[17]2118
[770]2119    def determinePeriod( self, host, check_serial ):
2120        """Determine to which period (directory) this time(serial) belongs"""
[47]2121
[770]2122        period_serial = 0
[47]2123
[770]2124        if self.timeserials.has_key( host ):
[47]2125
[770]2126            for serial in self.timeserials[ host ]:
[47]2127
[770]2128                if check_serial >= serial and period_serial < serial:
[47]2129
[770]2130                    period_serial = serial
[56]2131
[770]2132        return period_serial
[47]2133
[770]2134    def determineSerials( self, host, metricname, metriclist ):
2135        """
2136        Determine the correct serial and corresponding rrd to store
2137        for a list of metrics
2138        """
[47]2139
[770]2140        metric_serial_table = { }
[47]2141
[770]2142        for metric in metriclist:
[47]2143
[770]2144            if metric['name'] == metricname:
[47]2145
[858]2146                period       = self.determinePeriod( host, metric['time'] )   
[47]2147
[858]2148                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[47]2149
[770]2150                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]2151
[770]2152                    # This one should get it's own new period
2153                    period = metric['time']
[57]2154
[770]2155                    if not self.timeserials.has_key( host ):
2156                        self.timeserials[ host ] = [ ]
[57]2157
[770]2158                    self.timeserials[ host ].append( period )
[47]2159
[770]2160                if not metric_serial_table.has_key( period ):
[47]2161
[770]2162                    metric_serial_table[ period ] = [ ]
[47]2163
[770]2164                metric_serial_table[ period ].append( metric )
[47]2165
[770]2166        return metric_serial_table
[47]2167
[770]2168    def createCheck( self, host, metricname, timeserial ):
2169        """Check if an rrd allready exists for this metric, create if not"""
[9]2170
[770]2171        debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
2172       
2173        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]2174
[770]2175        if not os.path.exists( rrd_dir ):
[58]2176
[770]2177            try:
2178                os.makedirs( rrd_dir )
[58]2179
[770]2180            except os.OSError, msg:
[58]2181
[770]2182                if msg.find( 'File exists' ) != -1:
[58]2183
[770]2184                    # Ignore exists errors
2185                    pass
[58]2186
[770]2187                else:
[58]2188
[770]2189                    print msg
2190                    return
[58]2191
[770]2192            debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]2193
[770]2194        if not os.path.exists( rrd_file ):
[9]2195
[855]2196            interval     = self.config.getInterval( self.cluster )
[770]2197            heartbeat    = 8 * int( interval )
[9]2198
[858]2199            params       = [ ]
[12]2200
[770]2201            params.append( '--step' )
2202            params.append( str( interval ) )
[12]2203
[770]2204            params.append( '--start' )
2205            params.append( str( int( timeserial ) - 1 ) )
[12]2206
[770]2207            params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
2208            params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]2209
[770]2210            self.rrdm.create( str(rrd_file), params )
[37]2211
[770]2212            debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
[14]2213
[770]2214    def update( self, host, metricname, timeserial, metriclist ):
2215        """
2216        Update rrd file for host with metricname
2217        in directory timeserial with metriclist
2218        """
[9]2219
[770]2220        debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]2221
[858]2222        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]2223
[858]2224        update_list       = self.makeUpdateList( host, metriclist )
[15]2225
[770]2226        if len( update_list ) > 0:
2227            ret = self.rrdm.update( str(rrd_file), update_list )
[32]2228
[770]2229            if ret:
2230                return 1
2231       
2232            debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]2233
[770]2234        return 0
[36]2235
[169]2236def daemon():
[770]2237    """daemonized threading"""
[8]2238
[770]2239    # Fork the first child
2240    #
2241    pid = os.fork()
[169]2242
[770]2243    if pid > 0:
[169]2244
[770]2245        sys.exit(0)  # end parent
[169]2246
[770]2247    # creates a session and sets the process group ID
2248    #
2249    os.setsid()
[169]2250
[770]2251    # Fork the second child
2252    #
2253    pid = os.fork()
[169]2254
[770]2255    if pid > 0:
[169]2256
[770]2257        sys.exit(0)  # end parent
[169]2258
[770]2259    write_pidfile()
[435]2260
[770]2261    # Go to the root directory and set the umask
2262    #
2263    os.chdir('/')
2264    os.umask(0)
[169]2265
[770]2266    sys.stdin.close()
2267    sys.stdout.close()
2268    sys.stderr.close()
[169]2269
[770]2270    os.open('/dev/null', os.O_RDWR)
2271    os.dup2(0, 1)
2272    os.dup2(0, 2)
[169]2273
[770]2274    run()
[169]2275
2276def run():
[770]2277    """Threading start"""
[169]2278
[855]2279    global ARCHIVE_DATASOURCES
2280
[782]2281    config             = GangliaConfigParser( GMETAD_CONF )
[855]2282
2283    for ds in ARCHIVE_DATASOURCES:
2284
2285        if not config.clusterExists( ds ):
2286
2287            print "FATAL ERROR: Data source with name '%s' not found in %s" %( ds, GMETAD_CONF )
2288            sys.exit( 1 )
2289
[782]2290    s_timeout          = int( config.getLowestInterval() - 1 )
[469]2291
[770]2292    socket.setdefaulttimeout( s_timeout )
[469]2293
[770]2294    myXMLSource        = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
2295    myDataStore        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[8]2296
[782]2297    myJobProcessor     = JobXMLProcessor( myXMLSource, myDataStore )
[287]2298
[949]2299    myGangliaProcessors= [ ]
2300
2301    for archive_cluster in ARCHIVE_DATASOURCES:
2302
2303        myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) )
2304
2305    ganglia_xml_threads = [ ]
2306
[770]2307    try:
[782]2308        job_xml_thread     = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
[22]2309
[949]2310        t = 0
2311
2312        for ganglia_processor in myGangliaProcessors:
2313
2314            ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) )
2315
2316            t = t + 1
2317
[782]2318        job_xml_thread.start()
[949]2319
2320        for t in ganglia_xml_threads:
2321
2322            t.start()
[770]2323       
2324    except thread.error, msg:
2325        debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
2326        syslog.closelog()
2327        sys.exit(1)
2328       
2329    debug_msg( 0, 'main threading started.' )
[78]2330
[169]2331def main():
[770]2332    """Program startup"""
[169]2333
[770]2334    global DAEMONIZE, USE_SYSLOG
[375]2335
[770]2336    if not processArgs( sys.argv[1:] ):
2337        sys.exit( 1 )
[214]2338
[770]2339    if( DAEMONIZE and USE_SYSLOG ):
2340        syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
[169]2341
[770]2342    if DAEMONIZE:
2343        daemon()
2344    else:
2345        run()
[169]2346
2347#
[81]2348# Global functions
[169]2349#
[81]2350
[9]2351def check_dir( directory ):
[770]2352    """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]2353
[770]2354    if directory[-1] == '/':
2355        directory = directory[:-1]
[9]2356
[770]2357    return directory
[9]2358
[295]2359def reqtime2epoch( rtime ):
2360
[770]2361    (hours, minutes, seconds )    = rtime.split( ':' )
[295]2362
[770]2363    etime    = int(seconds)
2364    etime    = etime + ( int(minutes) * 60 )
2365    etime    = etime + ( int(hours) * 60 * 60 )
[295]2366
[770]2367    return etime
[295]2368
[12]2369def debug_msg( level, msg ):
[770]2370    """Only print msg if correct levels"""
[12]2371
[770]2372    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2373        sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2374   
2375    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2376        syslog.syslog( msg )
[12]2377
[46]2378def printTime( ):
[770]2379    """Print current time in human readable format"""
[46]2380
[770]2381    return time.strftime("%a %d %b %Y %H:%M:%S")
[46]2382
[435]2383def write_pidfile():
2384
[770]2385    # Write pidfile if PIDFILE exists
2386    if PIDFILE:
[435]2387
[770]2388        pid     = os.getpid()
[435]2389
[770]2390        pidfile = open(PIDFILE, 'w')
[435]2391
[770]2392        pidfile.write( str( pid ) )
2393        pidfile.close()
[435]2394
[63]2395# Ooohh, someone started me! Let's go..
[469]2396#
[9]2397if __name__ == '__main__':
[770]2398    main()
Note: See TracBrowser for help on using the repository browser.