source: branches/1.1/jobarchived/jobarchived.py @ 945

Last change on this file since 945 was 945, checked in by ramonb, 10 years ago

jobarchived/jobarchived.py:

  • see #173
  • changed threading: now each cluster gets it's own ganglia xml/store threads
  • added some yappi profiling functions for debugging
  • better debug statements
  • many performance improvements:
    • now use deque collections in stead of lists for storing metrics: faster appends and pops
    • remove some typecasts
    • replaced xml readlines() with read()
    • XMLDataGatherer now truly caches and prevents unnecessary data retrieval
    • replaced some if statements with catch/excepts: is faster
    • disabled thread locking
    • excluded metrics are now ignored while storing: not while parsing, and matched with compiled regexp
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 67.3 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2013  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 945 2014-01-16 15:24:00Z ramonb $
22#
23
24import getopt, syslog, ConfigParser, sys
25from collections import deque
26import time
27from pprint import pprint
28#import yappi
29
30#yappi.start()
31
32try:
33    from resource import getrusage, RUSAGE_SELF
34except ImportError:
35    RUSAGE_SELF = 0
36    def getrusage(who=0):
37        return [0.0, 0.0] # on non-UNIX platforms cpu_time always 0.0
38
39p_stats = None
40p_start_time = None
41
42def profiler(frame, event, arg):
43    if event not in ('call','return'): return profiler
44    #### gather stats ####
45    rusage = getrusage(RUSAGE_SELF)
46    t_cpu = rusage[0] + rusage[1] # user time + system time
47    code = frame.f_code
48    fun = (code.co_name, code.co_filename, code.co_firstlineno)
49    #### get stack with functions entry stats ####
50    ct = threading.currentThread()
51    try:
52        p_stack = ct.p_stack
53    except AttributeError:
54        ct.p_stack = deque()
55        p_stack = ct.p_stack
56    #### handle call and return ####
57    if event == 'call':
58        p_stack.append((time.time(), t_cpu, fun))
59    elif event == 'return':
60        try:
61            t,t_cpu_prev,f = p_stack.pop()
62            assert f == fun
63        except IndexError: # TODO investigate
64            t,t_cpu_prev,f = p_start_time, 0.0, None
65        call_cnt, t_sum, t_cpu_sum = p_stats.get(fun, (0, 0.0, 0.0))
66        p_stats[fun] = (call_cnt+1, t_sum+time.time()-t, t_cpu_sum+t_cpu-t_cpu_prev)
67    return profiler
68
69
70def profile_on():
71    global p_stats, p_start_time
72    p_stats = {}
73    p_start_time = time.time()
74    threading.setprofile(profiler)
75    sys.setprofile(profiler)
76
77    debug_msg( 1, 'profile_on(): profiling..' )
78
79def profile_off():
80    threading.setprofile(None)
81    sys.setprofile(None)
82    debug_msg( 1, 'profile_on(): profiling ended..' )
83
84def get_profile_stats():
85    """
86    returns dict[function_tuple] -> stats_tuple
87    where
88      function_tuple = (function_name, filename, lineno)
89      stats_tuple = (call_cnt, real_time, cpu_time)
90    """
91    debug_msg( 1, 'get_profile_stats(): dumping stats..' )
92    return p_stats
93
94VERSION='__VERSION__'
95
96def usage( ver ):
97
98    print 'jobarchived %s' %VERSION
99
100    if ver:
101        return 0
102
103    print
104    print 'Purpose:'
105    print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
106    print '  and node statistics in a RRD archive'
107    print
108    print 'Usage:    jobarchived [OPTIONS]'
109    print
110    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobarchived.conf)'
111    print '  -p, --pidfile=FILE    Use pid file to store the process id'
112    print '  -h, --help        Print help and exit'
113    print '  -v, --version        Print version and exit'
114    print
115
116def processArgs( args ):
117
118    SHORT_L   = 'p:hvc:'
119    LONG_L    = [ 'help', 'config=', 'pidfile=', 'version' ]
120
121    config_filename = '/etc/jobarchived.conf'
122
123    global PIDFILE
124
125    PIDFILE   = None
126
127    try:
128
129        opts, args = getopt.getopt( args, SHORT_L, LONG_L )
130
131    except getopt.error, detail:
132
133        print detail
134        sys.exit(1)
135
136    for opt, value in opts:
137
138        if opt in [ '--config', '-c' ]:
139
140            config_filename = value
141
142        if opt in [ '--pidfile', '-p' ]:
143
144            PIDFILE         = value
145
146        if opt in [ '--help', '-h' ]:
147
148            usage( False )
149            sys.exit( 0 )
150
151        if opt in [ '--version', '-v' ]:
152
153            usage( True )
154            sys.exit( 0 )
155
156    try:
157        return loadConfig( config_filename )
158
159    except ConfigParser.NoOptionError, detail:
160
161        print detail
162        sys.exit( 1 )
163
164def loadConfig( filename ):
165
166    def getlist( cfg_string ):
167
168        my_list = [ ]
169
170        for item_txt in cfg_string.split( ',' ):
171
172            sep_char = None
173
174            item_txt = item_txt.strip()
175
176            for s_char in [ "'", '"' ]:
177
178                if item_txt.find( s_char ) != -1:
179
180                    if item_txt.count( s_char ) != 2:
181
182                        print 'Missing quote: %s' %item_txt
183                        sys.exit( 1 )
184
185                    else:
186
187                        sep_char = s_char
188                        break
189
190            if sep_char:
191
192                item_txt = item_txt.split( sep_char )[1]
193
194            my_list.append( item_txt )
195
196        return my_list
197
198    cfg = ConfigParser.ConfigParser()
199
200    cfg.read( filename )
201
202    global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
203    global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
204    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL, JOB_SQL_PASSWORD, JOB_SQL_USER
205
206    ARCHIVE_PATH           = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
207
208    ARCHIVE_HOURS_PER_RRD  = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
209
210    DEBUG_LEVEL            = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
211
212    USE_SYSLOG             = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
213
214    SYSLOG_LEVEL           = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
215
216    MODRRDTOOL             = False
217
218    try:
219        global rrdtool
220        import rrdtool
221
222        MODRRDTOOL        = True
223
224    except ImportError:
225
226        MODRRDTOOL        = False
227
228        print "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"
229
230        RRDTOOL            = cfg.get( 'DEFAULT', 'RRDTOOL' )
231
232    try:
233
234        SYSLOG_FACILITY    = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
235
236    except AttributeError, detail:
237
238        print 'Unknown syslog facility'
239        sys.exit( 1 )
240
241    GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
242
243    ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
244
245    ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
246
247    ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
248
249    JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
250    JOB_SQL_USER            = cfg.get( 'DEFAULT', 'JOB_SQL_USER' )
251    JOB_SQL_PASSWORD        = cfg.get( 'DEFAULT', 'JOB_SQL_PASSWORD' )
252
253    JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
254
255    DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
256
257    return True
258
259# What XML data types not to store
260#
261UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
262
263# Maximum time (in seconds) a parsethread may run
264#
265PARSE_TIMEOUT = 60
266
267# Maximum time (in seconds) a storethread may run
268#
269STORE_TIMEOUT = 360
270
271"""
272The Job Archiving Daemon
273"""
274
275from types import *
276
277import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
278
279try:
280    import psycopg2
281
282except ImportError, details:
283
284    print "FATAL ERROR: psycopg2 python module not found"
285    sys.exit( 1 )
286
287class InitVars:
288        Vars = {}
289       
290        def __init__(self, **key_arg):
291                for (key, value) in key_arg.items():
292                        if value:
293                                self.Vars[key] = value
294                        else:   
295                                self.Vars[key] = None
296                               
297        def __call__(self, *key):
298                key = "%s" % key
299                return self.Vars[key]
300               
301        def __getitem__(self, key):
302                return self.Vars[key]
303               
304        def __repr__(self):
305                return repr(self.Vars)
306               
307        def keys(self):
308                barf =  map(None, self.Vars.keys())
309                return barf
310               
311        def values(self):
312                barf =  map(None, self.Vars.values())
313                return barf
314               
315        def has_key(self, key):
316                if self.Vars.has_key(key):
317                        return 1
318                else:   
319                        return 0
320                       
321class DBError(Exception):
322        def __init__(self, msg=''):
323                self.msg = msg
324                Exception.__init__(self, msg)
325        def __repr__(self):
326                return self.msg
327        __str__ = __repr__
328
329#
330# Class to connect to a database
331# and return the queury in a list or dictionairy.
332#
333class DB:
334    def __init__(self, db_vars):
335
336        self.dict = db_vars
337
338        if self.dict.has_key('User'):
339            self.user = self.dict['User']
340        else:
341            self.user = 'postgres'
342
343        if self.dict.has_key('Host'):
344            self.host = self.dict['Host']
345        else:
346            self.host = 'localhost'
347
348        if self.dict.has_key('Password'):
349            self.passwd = self.dict['Password']
350        else:
351            self.passwd = ''
352
353        if self.dict.has_key('DataBaseName'):
354            self.db = self.dict['DataBaseName']
355        else:
356            self.db = 'jobarchive'
357
358        # connect_string = 'host:port:database:user:password:
359        dsn = "host='%s' dbname='%s' user='%s' password='%s'" %(self.host, self.db, self.user, self.passwd)
360
361        try:
362            self.SQL = psycopg2.connect(dsn)
363        except psycopg2.Error, details:
364            str = "%s" %details
365            raise DBError(str)
366
367    def __repr__(self):
368        return repr(self.result)
369
370    def __nonzero__(self):
371        return not(self.result == None)
372
373    def __len__(self):
374        return len(self.result)
375
376    def __getitem__(self,i):
377        return self.result[i]
378
379    def __getslice__(self,i,j):
380        return self.result[i:j]
381
382    def Get(self, q_str):
383        c = self.SQL.cursor()
384        try:
385            c.execute(q_str)
386            result = c.fetchall()
387        except psycopg2.Error, details:
388            c.close()
389            str = "%s" %details
390            raise DBError(str)
391
392        c.close()
393        return result
394
395    def Set(self, q_str):
396        c = self.SQL.cursor()
397        try:
398            c.execute(q_str)
399
400        except psycopg2.Error, details:
401            c.close()
402            str = "%s" %details
403            raise DBError(str)
404
405        c.close()
406        return True
407
408    def Commit(self):
409
410        return self.SQL.commit()
411
412    def Rollback( self ):
413
414        return self.SQL.rollback()
415
416class DataSQLStore:
417
418    db_vars = None
419    dbc = None
420
421    def __init__( self, hostname, database ):
422
423        global JOB_SQL_USER, JOB_SQL_PASSWORD
424
425        self.db_vars = InitVars(DataBaseName=database,
426                User=JOB_SQL_USER,
427                Host=hostname,
428                Password=JOB_SQL_PASSWORD,
429                Dictionary='true')
430
431        try:
432            self.dbc     = DB(self.db_vars)
433        except DBError, details:
434            debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
435            sys.exit(1)
436
437    def setDatabase(self, statement):
438
439        ret = self.doDatabase('set', statement)
440        return ret
441       
442    def getDatabase(self, statement):
443
444        ret = self.doDatabase('get', statement)
445        return ret
446
447    def doCommit( self ):
448
449        return self.dbc.Commit()
450
451    def doRollback( self ):
452
453        return self.dbc.Rollback()
454
455    def doDatabase(self, type, statement):
456
457        debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
458        try:
459            if type == 'set':
460                result = self.dbc.Set( statement )
461            elif type == 'get':
462                result = self.dbc.Get( statement )
463               
464        except DBError, detail:
465            operation = statement.split(' ')[0]
466            debug_msg( 0, 'ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
467            return False
468
469        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
470        return result
471
472    def getJobNodeId( self, job_id, node_id ):
473
474        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
475        if not id:
476            return False
477
478        if len( id ) > 0:
479
480            if len( id[0] ) > 0 and id[0] != '':
481           
482                return True
483
484        return False
485
486    def getNodeId( self, hostname ):
487
488        id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
489
490        if len( id ) > 0:
491
492            id = id[0][0]
493
494            return id
495        else:
496            return None
497
498    def getNodeIds( self, hostnames ):
499
500        ids = [ ]
501
502        for node in hostnames:
503
504            id = self.getNodeId( node )
505
506            if id:
507                ids.append( id )
508
509        return ids
510
511    def getJobId( self, jobid ):
512
513        id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
514
515        if id:
516            id = id[0][0]
517
518            return id
519        else:
520            return None
521
522    def addJob( self, job_id, jobattrs ):
523
524        if not self.getJobId( job_id ):
525
526            return self.mutateJob( 'insert', job_id, jobattrs ) 
527        else:
528            return self.mutateJob( 'update', job_id, jobattrs )
529
530    def mutateJob( self, action, job_id, jobattrs ):
531
532        job_values     = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
533
534        insert_col_str = 'job_id'
535        insert_val_str = "'%s'" %job_id
536        update_str     = None
537
538        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
539
540        ids = [ ]
541
542        for valname, value in jobattrs.items():
543
544            if valname in job_values and value != '':
545
546                column_name = 'job_' + valname
547
548                if action == 'insert':
549
550                    if not insert_col_str:
551                        insert_col_str = column_name
552                    else:
553                        insert_col_str = insert_col_str + ',' + column_name
554
555                    if not insert_val_str:
556                        insert_val_str = value
557                    else:
558                        insert_val_str = insert_val_str + ",'%s'" %value
559
560                elif action == 'update':
561                   
562                    if not update_str:
563                        update_str = "%s='%s'" %(column_name, value)
564                    else:
565                        update_str = update_str + ",%s='%s'" %(column_name, value)
566
567            elif valname == 'nodes' and value:
568
569                node_valid = 1
570
571                if len(value) == 1:
572               
573                    if jobattrs['status'] == 'Q':
574
575                        node_valid = 0
576
577                    else:
578
579                        node_valid = 0
580
581                        for node_char in str(value[0]):
582
583                            if string.find( string.digits, node_char ) != -1 and not node_valid:
584
585                                node_valid = 1
586
587                if node_valid:
588
589                    ids = self.addNodes( value, jobattrs['domain'] )
590
591        if action == 'insert':
592
593            db_ret = self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
594
595        elif action == 'update':
596
597            db_ret = self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) )
598
599        if len( ids ) > 0:
600            self.addJobNodes( job_id, ids )
601
602        return db_ret
603
604    def addNodes( self, hostnames, domain ):
605
606        ids = [ ]
607
608        for node in hostnames:
609
610            node    = '%s.%s' %( node, domain )
611            id    = self.getNodeId( node )
612   
613            if not id:
614                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
615                id = self.getNodeId( node )
616
617            ids.append( id )
618
619        return ids
620
621    def addJobNodes( self, jobid, nodes ):
622
623        for node in nodes:
624
625            if not self.getJobNodeId( jobid, node ):
626
627                self.addJobNode( jobid, node )
628
629    def addJobNode( self, jobid, nodeid ):
630
631        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( '%s',%s )" %(jobid, nodeid) )
632
633    def storeJobInfo( self, jobid, jobattrs ):
634
635        return self.addJob( jobid, jobattrs )
636
637    def checkTimedoutJobs( self ):
638
639        debug_msg( 1, 'Housekeeping: checking database for timed out jobs..' )
640
641        # Locate all jobs in the database that are not set to finished
642        #
643        q = "SELECT * from jobs WHERE job_status != 'F'"
644
645        r = self.getDatabase( q )
646
647        if len( r ) == 0:
648
649            return None
650
651        timeoutjobs  = [ ]
652
653        jobtimeout_sec = JOB_TIMEOUT * (60 * 60)
654        cur_time       = time.time()
655
656        for row in r:
657
658            job_id              = row[0]
659            job_requested_time  = row[4]
660            job_status          = row[7]
661            job_start_timestamp = row[8]
662
663            # If it was set to queued and we didn't see it started
664            # there's not point in keeping it around
665            #
666            if job_status == 'R' and job_start_timestamp:
667
668                start_timestamp = int( job_start_timestamp )
669
670                # If it was set to running longer than JOB_TIMEOUT
671                # close the job: it probably finished while we were not running
672                #
673                if ( cur_time - start_timestamp ) > jobtimeout_sec:
674
675                    if job_requested_time:
676
677                        rtime_epoch    = reqtime2epoch( job_requested_time )
678                    else:
679                        rtime_epoch    = None
680                   
681                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
682
683        debug_msg( 1, 'Housekeeping: Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
684
685        ret_jobids_clean = [ ]
686
687        # Close these jobs in the database
688        # update the stop_timestamp to: start_timestamp + requested wallclock
689        # and set state: finished
690        #
691        for j in timeoutjobs:
692
693            ( i, s, r )        = j
694
695            if r:
696                new_end_timestamp    = int( s ) + r
697
698                q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
699                self.setDatabase( q )
700            else:
701
702                # Requested walltime unknown: cannot guess end time: sorry delete them
703                q = "DELETE FROM jobs WHERE job_id = '" + str( i ) + "'"
704                self.setDatabase( q )
705
706            ret_jobids_clean.append( i )
707
708        debug_msg( 1, 'Housekeeping: done.' )
709
710        return ret_jobids_clean
711
712    def checkStaleJobs( self ):
713
714        debug_msg( 1, 'Housekeeping: checking database for stale jobs..' )
715
716        # Locate all jobs in the database that are not set to finished
717        #
718        q = "SELECT * from jobs WHERE job_status != 'F'"
719
720        r = self.getDatabase( q )
721
722        if len( r ) == 0:
723
724            return None
725
726        cleanjobs      = [ ]
727
728        cur_time       = time.time()
729
730        for row in r:
731
732            job_id              = row[0]
733            job_requested_time  = row[4]
734            job_status          = row[7]
735            job_start_timestamp = row[8]
736
737            # If it was set to queued and we didn't see it started
738            # there's not point in keeping it around
739            #
740            if job_status == 'Q' or not job_start_timestamp:
741
742                cleanjobs.append( job_id )
743
744        debug_msg( 1, 'Housekeeping: Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
745
746        # Purge these from database
747        #
748        for j in cleanjobs:
749
750            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
751            self.setDatabase( q )
752
753        debug_msg( 1, 'Housekeeping: done.' )
754
755        return cleanjobs
756
757class RRDMutator:
758    """A class for performing RRD mutations"""
759
760    binary = None
761
762    def __init__( self, binary=None ):
763        """Set alternate binary if supplied"""
764
765        if binary:
766            self.binary = binary
767
768    def create( self, filename, args ):
769        """Create a new rrd with args"""
770
771        global MODRRDTOOL
772
773        if MODRRDTOOL:
774            return self.perform( 'create', filename, args )
775        else:
776            return self.perform( 'create', '"' + filename + '"', args )
777
778    def update( self, filename, args ):
779        """Update a rrd with args"""
780
781        global MODRRDTOOL
782
783        if MODRRDTOOL:
784            return self.perform( 'update', filename, args )
785        else:
786            return self.perform( 'update', '"' + filename + '"', args )
787
788    def grabLastUpdate( self, filename ):
789        """Determine the last update time of filename rrd"""
790
791        global MODRRDTOOL
792
793        last_update = 0
794
795        # Use the py-rrdtool module if it's available on this system
796        #
797        if MODRRDTOOL:
798
799            debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
800
801            rrd_header     = { }
802
803            try:
804                rrd_header    = rrdtool.info( filename )
805            except rrdtool.error, msg:
806                debug_msg( 8, str( msg ) )
807
808            if rrd_header.has_key( 'last_update' ):
809                return last_update
810            else:
811                return 0
812
813        # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
814        # DEPRECATED (slow!)
815        #
816        else:
817            debug_msg( 8, self.binary + ' info ' + filename )
818
819            my_pipe        = os.popen( self.binary + ' info "' + filename + '"' )
820
821            for line in my_pipe.readlines():
822
823                if line.find( 'last_update') != -1:
824
825                    last_update = line.split( ' = ' )[1]
826
827            if my_pipe:
828
829                my_pipe.close()
830
831            if last_update:
832                return last_update
833            else:
834                return 0
835
836
837    def perform( self, action, filename, args ):
838        """Perform action on rrd filename with args"""
839
840        global MODRRDTOOL
841
842        arg_string = None
843
844        if type( args ) is not ListType:
845            debug_msg( 8, 'Arguments needs to be of type List' )
846            return 1
847
848        for arg in args:
849
850            if not arg_string:
851
852                arg_string = arg
853            else:
854                arg_string = arg_string + ' ' + arg
855
856        if MODRRDTOOL:
857
858            debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
859
860            try:
861                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
862
863                if action == 'create':
864
865                    rrdtool.create( str( filename ), *args )
866
867                elif action == 'update':
868
869                    rrdtool.update( str( filename ), *args )
870
871            except rrdtool.error, msg:
872
873                error_msg = str( msg )
874                debug_msg( 8, error_msg )
875                return 1
876
877        else:
878
879            debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
880
881            cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
882            lines   = cmd.readlines()
883
884            cmd.close()
885
886            for line in lines:
887
888                if line.find( 'ERROR' ) != -1:
889
890                    error_msg = string.join( line.split( ' ' )[1:] )
891                    debug_msg( 8, error_msg )
892                    return 1
893
894        return 0
895
896class XMLProcessor:
897    """Skeleton class for XML processor's"""
898
899    def run( self ):
900        """Do main processing of XML here"""
901
902        pass
903
904class JobXMLProcessor( XMLProcessor ):
905    """Main class for processing XML and acting with it"""
906
907    def __init__( self, XMLSource, DataStore ):
908        """Setup initial XML connection and handlers"""
909
910        self.myXMLSource  = XMLSource
911        self.myXMLHandler = JobXMLHandler( DataStore )
912        self.myXMLError   = XMLErrorHandler()
913
914        self.config       = GangliaConfigParser( GMETAD_CONF )
915
916        self.kill_thread  = False
917
918    def killThread( self ):
919
920        self.kill_thread  = True
921
922    def run( self ):
923        """Main XML processing"""
924
925        debug_msg( 1, 'job_xml_thread(): started.' )
926
927        while( 1 ):
928
929            debug_msg( 1, 'job_xml_thread(): Retrieving XML data..' )
930
931            my_data    = self.myXMLSource.getData()
932
933            debug_msg( 1, 'job_xml_thread(): Done retrieving: data size %d' %len(my_data) )
934
935            if my_data:
936                debug_msg( 1, 'job_xml_thread(): Parsing XML..' )
937
938                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
939
940                if self.myXMLError.isFatal():
941
942                    sys.exit( 1 )
943
944                debug_msg( 1, 'job_xml_thread(): Done parsing.' )
945            else:
946                debug_msg( 1, 'job_xml_thread(): Got no data.' )
947
948            if self.kill_thread:
949
950                debug_msg( 1, 'job_xml_thread(): killed.' )
951                return None
952               
953            debug_msg( 1, 'job_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
954            time.sleep( self.config.getLowestInterval() )
955
956class JobXMLHandler( xml.sax.handler.ContentHandler ):
957    """Parse Job's jobinfo XML from our plugin"""
958
959    def __init__( self, datastore ):
960
961        self.ds              = datastore
962        self.jobs_processed  = [ ]
963        self.jobs_to_store   = [ ]
964        self.jobAttrs        = { }
965        self.jobAttrsSaved   = { }
966
967        self.iteration       = 0
968
969        self.ds.checkTimedoutJobs()
970        self.ds.checkStaleJobs()
971
972        debug_msg( 1, "XML: Handler created" )
973
974    def startDocument( self ):
975
976        self.jobs_processed = [ ]
977        self.heartbeat      = 0
978        self.elementct      = 0
979        self.iteration      = self.iteration + 1
980
981        if self.iteration > 20:
982
983            timedout_jobs = self.ds.checkTimedoutJobs()
984            self.iteration = 0
985
986            if timedout_jobs != None:
987
988                for j in timedout_jobs:
989
990                    try:
991                        del self.jobAttrs[ j ]
992                        del self.jobAttrsSaved[ j ]
993                    except KeyError:
994                        pass
995
996        debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
997
998    def startElement( self, name, attrs ):
999        """
1000        This XML will be all gmetric XML
1001        so there will be no specific start/end element
1002        just one XML statement with all info
1003        """
1004
1005        jobinfo = { }
1006
1007        self.elementct    += 1
1008
1009        if name == 'CLUSTER':
1010
1011            self.clustername = str( attrs.get( 'NAME', "" ) )
1012
1013        elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
1014
1015            metricname = str( attrs.get( 'NAME', "" ) )
1016
1017            if metricname == 'zplugin_monarch_heartbeat':
1018
1019                self.heartbeat = str( attrs.get( 'VAL', "" ) )
1020
1021            elif metricname.find( 'zplugin_monarch_job' ) != -1:
1022
1023                job_id  = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1]
1024                val     = str( attrs.get( 'VAL', "" ) )
1025
1026                valinfo = val.split( ' ' )
1027
1028                for myval in valinfo:
1029
1030                    if len( myval.split( '=' ) ) > 1:
1031
1032                        valname = myval.split( '=' )[0]
1033                        value   = myval.split( '=' )[1]
1034
1035                        if valname == 'nodes':
1036
1037                            value = value.split( ';' )
1038
1039                        jobinfo[ valname ] = value
1040
1041                self.jobAttrs[ job_id ] = jobinfo
1042
1043                self.jobs_processed.append( job_id )
1044
1045                   
1046    def endDocument( self ):
1047        """When all metrics have gone, check if any jobs have finished"""
1048
1049        jobs_finished = [ ]
1050
1051        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_processed))+" jobs" )
1052
1053        if self.heartbeat == 0:
1054            return None
1055
1056        for jobid, jobinfo in self.jobAttrs.items():
1057
1058            if jobinfo['reported'] != self.heartbeat:
1059
1060                if (jobinfo['status'] != 'R'):
1061                    debug_msg( 1, 'job %s report time %s does not match current heartbeat %s : ignoring job' %(jobid, jobinfo['reported'], self.heartbeat ) )
1062                    del self.jobAttrs[ jobid ]
1063
1064                    if jobid in self.jobs_to_store:
1065                        del self.jobs_to_store[ jobid ]
1066
1067                    continue
1068
1069                elif jobid not in self.jobs_processed:
1070
1071                    # Was running previous heartbeat but not anymore: must be finished
1072                    self.jobAttrs[ jobid ]['status'] = 'F'
1073                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
1074                    debug_msg( 1, 'job %s appears to have finished' %jobid )
1075
1076                    jobs_finished.append( jobid )
1077
1078                    if not jobid in self.jobs_to_store:
1079                        self.jobs_to_store.append( jobid )
1080
1081                    continue
1082
1083            elif self.jobAttrsSaved.has_key( jobid ):
1084
1085                # This should pretty much never happen, but hey let's be careful
1086                # Perhaps if someone altered their job while in queue with qalter
1087
1088                if self.jobinfoChanged( jobid, jobinfo ):
1089
1090                    self.jobAttrs[ jobid ]['stop_timestamp'] = ''
1091                    self.jobAttrs[ jobid ]                   = self.setJobAttrs( self.jobAttrs[ jobid ], jobinfo )
1092
1093                    if not jobid in self.jobs_to_store:
1094
1095                        self.jobs_to_store.append( jobid )
1096
1097                    debug_msg( 10, 'jobinfo for job %s has changed' %jobid )
1098            else:
1099                debug_msg( 10, 'new job %s' %jobid )
1100
1101                if not jobid in self.jobs_to_store:
1102
1103                    self.jobs_to_store.append( jobid )
1104
1105        debug_msg( 1, 'job_xml_thread(): Found %s updated jobs.' %len(self.jobs_to_store) )
1106
1107        failed_store = [ ]
1108        succes_store = [ ]
1109
1110        if len( self.jobs_to_store ) > 0:
1111
1112            debug_msg( 1, 'job_xml_thread(): Storing jobs to database..' )
1113
1114            for n in range( 0, len(self.jobs_to_store ) ):
1115
1116                if len( self.jobs_to_store ) == 0:
1117                    break
1118
1119                jobid = self.jobs_to_store.pop( 0 )
1120
1121                db_ok = self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
1122
1123                if not db_ok:
1124
1125                    self.ds.doRollback()
1126                    failed_store.append( jobid )
1127                    continue
1128
1129                self.ds.doCommit()
1130                succes_store.append( jobid )
1131
1132                if not jobid in jobs_finished:
1133
1134                    self.jobAttrsSaved[ jobid ] = self.jobAttrs[ jobid ]
1135
1136                elif self.jobAttrsSaved.has_key( jobid ):
1137
1138                    del self.jobAttrsSaved[ jobid ]
1139
1140                if self.jobAttrs[ jobid ]['status'] == 'F':
1141
1142                    del self.jobAttrs[ jobid ]
1143
1144            result_str = 'succesfully stored: %s jobs' %str(len(succes_store))
1145
1146            if len( failed_store ) > 0:
1147                result_str = result_str + ' - failed to store: %s jobs - deferred to next interval' %str(len(failed_store))
1148
1149            debug_msg( 1, 'job_xml_thread(): Done storing. %s' %result_str )
1150
1151        else:
1152            debug_msg( 1, 'job_xml_thread(): No jobs to store.' )
1153
1154        self.jobs_processed = [ ]
1155
1156        # TODO: once in while check database AND self.jobAttrsSaved for stale jobs
1157
1158    def setJobAttrs( self, old, new ):
1159        """
1160        Set new job attributes in old, but not lose existing fields
1161        if old attributes doesn't have those
1162        """
1163
1164        for valname, value in new.items():
1165            old[ valname ] = value
1166
1167        return old
1168       
1169
1170    def jobinfoChanged( self, jobid, jobinfo ):
1171        """
1172        Check if jobinfo has changed from jobattrs[jobid]
1173        if it's report time is bigger than previous one
1174        and it is report time is recent (equal to heartbeat)
1175        """
1176
1177        ignore_changes = [ 'reported' ]
1178
1179        if self.jobAttrsSaved.has_key( jobid ):
1180
1181            for valname, value in jobinfo.items():
1182
1183                if valname not in ignore_changes:
1184
1185                    if self.jobAttrsSaved[ jobid ].has_key( valname ):
1186
1187                        if value != self.jobAttrsSaved[ jobid ][ valname ]:
1188
1189                            if jobinfo['reported'] > self.jobAttrsSaved[ jobid ][ 'reported' ]:
1190
1191                                debug_msg( 1, "job %s field '%s' changed since saved from: %s to: %s" %( jobid, valname, value, self.jobAttrsSaved[ jobid ][ valname ] ) )
1192
1193                                return True
1194
1195                    else:
1196                        debug_msg( 1, "job %s did not have field '%s'" %( jobid, valname )  )
1197                        return True
1198
1199        return False
1200
1201class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
1202    """Parse Ganglia's XML"""
1203
1204    def __init__( self, config, datastore, cluster ):
1205        """Setup initial variables and gather info on existing rrd archive"""
1206
1207        self.config          = config
1208        self.clusterName     = cluster
1209        self.ds              = datastore
1210        self.rrd_handler     = None
1211        self.cluster_start   = False
1212
1213        debug_msg( 1, 'Housekeeping: checking RRD archive for cluster %s (may take a while)..' %self.clusterName )
1214        self.gatherCluster()
1215        debug_msg( 1, 'Housekeeping: RRD check complete for cluster %s.' %self.clusterName )
1216
1217    def gatherCluster( self ):
1218        """Find all existing clusters in archive dir"""
1219
1220        global ARCHIVE_DATASOURCES
1221
1222        archive_dir    = check_dir(ARCHIVE_PATH)
1223
1224        hosts        = [ ]
1225
1226        if os.path.exists( archive_dir ):
1227
1228            dirlist    = os.listdir( archive_dir )
1229
1230            if self.clusterName not in dirlist:
1231
1232                # Autocreate a directory for this cluster
1233                # assume it is new
1234                #
1235                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
1236
1237                os.mkdir( cluster_dir )
1238
1239                dirlist.append( cfgcluster )
1240
1241            for d in dirlist:
1242
1243                if not self.rrd_handler and d == self.clusterName:
1244
1245                    self.rrd_handler = RRDHandler( self.config, d )
1246
1247                    debug_msg( 9, 'Found cluster dir: %s' %( d ) )
1248
1249    def startElement( self, name, attrs ):
1250        """Memorize appropriate data from xml start tags"""
1251
1252        if name == 'GANGLIA_XML':
1253
1254            self.XMLSource      = attrs.get( 'SOURCE',  "" )
1255            self.gangliaVersion = attrs.get( 'VERSION', "" )
1256
1257            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
1258
1259            return 0
1260
1261        if name == 'GRID':
1262
1263            self.gridName    = attrs.get( 'NAME', "" )
1264            self.time        = attrs.get( 'LOCALTIME', "" )
1265
1266            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
1267
1268            return 0
1269
1270        if name == 'CLUSTER':
1271
1272            xmlClusterName   = attrs.get( 'NAME',      "" )
1273            self.time        = attrs.get( 'LOCALTIME', "" )
1274
1275            if self.clusterName == xmlClusterName:
1276
1277                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
1278
1279                self.cluster_start = True
1280
1281                if not self.rrd_handler:
1282
1283                    self.rrd_handler = RRDHandler( self.config, self.clusterName )
1284            else:
1285                self.cluster_start = False
1286
1287            return 0
1288
1289        if name == 'HOST' and self.cluster_start:
1290
1291            self.hostName     = attrs.get( 'NAME',     "" )
1292            self.hostIp       = attrs.get( 'IP',       "" )
1293            self.hostReported = attrs.get( 'REPORTED', "" )
1294
1295            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
1296
1297            return 0
1298
1299        if name == 'METRIC' and self.cluster_start:
1300
1301            #type = attrs.get( 'TYPE', "" )
1302            #orig_name = attrs.get( 'NAME', "" )
1303           
1304            if attrs.get( 'TYPE', "" ) != 'string':
1305
1306                #myMetric         = { }
1307                #myMetric['name'] = attrs.get( 'NAME', "" )
1308                #myMetric['val']  = attrs.get( 'VAL',  "" )
1309                #myMetric['time'] = self.hostReported
1310
1311                self.rrd_handler.memMetric( self.hostName, { 'name': attrs.get( 'NAME', "" ), 'val': attrs.get( 'VAL',  "" ), 'time': self.hostReported } )
1312
1313                #debug_msg( 9, 'added metric %s from host %s to cluster %s' %( myMetric['name'], self.hostName, self.clusterName ) )
1314                #debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
1315
1316
1317    def endElement( self, name ):
1318
1319        if name == 'CLUSTER' and self.cluster_start:
1320
1321            self.cluster_start = False
1322            debug_msg( 10, ' `-Cluster ended: %s' %( self.clusterName ) )
1323
1324    def storeMetrics( self ):
1325        """Store metrics of each cluster rrd handler"""
1326
1327        ret = self.rrd_handler.storeMetrics()
1328
1329        if ret:
1330            debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1331            return 1
1332
1333        return 0
1334
1335class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1336
1337    def __init__( self ):
1338
1339        self.me_fatal = False
1340
1341    def error( self, exception ):
1342        """Recoverable error"""
1343
1344        debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
1345
1346    def fatalError( self, exception ):
1347        """Non-recoverable error"""
1348
1349        exception_str = str( exception )
1350
1351        # Ignore 'no element found' errors
1352        if exception_str.find( 'no element found' ) != -1:
1353
1354            debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1355            return 0
1356
1357        self.me_fatal = True
1358
1359        debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
1360
1361    def isFatal( self ):
1362
1363        return self.me_fatal
1364
1365    def warning( self, exception ):
1366        """Warning"""
1367
1368        debug_msg( 0, 'Warning ' + str( exception ) )
1369
1370class XMLGatherer:
1371    """Setup a connection and file object to Ganglia's XML"""
1372
1373    s           = None
1374    fd          = None
1375    data        = None
1376    slot        = None
1377
1378    # Time since the last update
1379    #
1380    LAST_UPDATE    = 0
1381
1382    # Minimum interval between updates
1383    #
1384    MIN_UPDATE_INT    = 10
1385
1386    # Is a update occuring now
1387    #
1388    update_now    = False
1389
1390    def __init__( self, host, port ):
1391        """Store host and port for connection"""
1392
1393        self.host    = host
1394        self.port    = port
1395        self.slot    = threading.Lock()
1396
1397        self.retrieveData()
1398
1399    def retrieveData( self ):
1400        """Setup connection to XML source"""
1401
1402        if self.update_now:
1403            return 0
1404
1405        self.update_now = True
1406
1407        #self.slot.acquire()
1408
1409        self.data       = None
1410
1411        debug_msg( 1, "XMLGatherer.retrieveData(): actually retrieving data.." )
1412
1413        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
1414
1415            af, socktype, proto, canonname, sa = res
1416
1417            try:
1418
1419                self.s = socket.socket( af, socktype, proto )
1420
1421            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1422
1423                self.s = None
1424                continue
1425
1426            try:
1427
1428                self.s.connect( sa )
1429
1430            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1431
1432                self.disconnect()
1433                continue
1434
1435            break
1436
1437        if self.s is None:
1438
1439            debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1440            self.update_now    = False
1441            #sys.exit( 1 )
1442
1443        else:
1444            #self.s.send( '\n' )
1445
1446            my_fp            = self.s.makefile( 'r' )
1447            #my_data          = my_fp.readlines()
1448            #my_data          = string.join( my_data, '' )
1449
1450            #self.data        = my_data
1451            self.data        = my_fp.read()
1452
1453            self.LAST_UPDATE = time.time()
1454
1455            self.disconnect()
1456
1457        #self.slot.release()
1458
1459        debug_msg( 1, "XMLGatherer.retrieveData(): Done retrieving data." )
1460
1461        self.update_now    = False
1462
1463    def disconnect( self ):
1464        """Close socket"""
1465
1466        if self.s:
1467            #self.s.shutdown( 2 )
1468            self.s.close()
1469            self.s = None
1470
1471    def __del__( self ):
1472        """Kill the socket before we leave"""
1473
1474        self.disconnect()
1475
1476    def reGetData( self ):
1477        """Reconnect"""
1478
1479        if self.update_now:
1480            return 0
1481
1482        #while self.update_now:
1483
1484            # Must be another update in progress:
1485            # Wait until the update is complete
1486            #
1487        #    time.sleep( 1 )
1488
1489        if self.s:
1490            self.disconnect()
1491
1492        cur_time    = time.time()
1493
1494        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1495
1496            if not self.update_now:
1497
1498                self.retrieveData()
1499
1500    def getData( self ):
1501
1502        """Return the XML data"""
1503
1504        # If more than MIN_UPDATE_INT seconds passed since last data update
1505        # update the XML first before returning it
1506        #
1507
1508        cur_time    = time.time()
1509
1510        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1511
1512            if not self.update_now:
1513
1514                self.reGetData()
1515
1516        while self.update_now:
1517
1518            # Must be another update in progress:
1519            # Wait until the update is complete
1520            #
1521            time.sleep( 1 )
1522           
1523        return self.data
1524
1525    def makeFileDescriptor( self ):
1526        """Make file descriptor that points to our socket connection"""
1527
1528        self.reconnect()
1529
1530        if self.s:
1531            self.fd = self.s.makefile( 'r' )
1532
1533    def getFileObject( self ):
1534        """Connect, and return a file object"""
1535
1536        self.makeFileDescriptor()
1537
1538        if self.fd:
1539            return self.fd
1540
1541class GangliaXMLProcessor( XMLProcessor ):
1542    """Main class for processing XML and acting with it"""
1543
1544    def __init__( self, XMLSource, DataStore, cluster ):
1545        """Setup initial XML connection and handlers"""
1546
1547        self.config       = GangliaConfigParser( GMETAD_CONF )
1548        self.myXMLSource  = XMLSource
1549        self.ds           = DataStore
1550        self.myXMLError   = XMLErrorHandler()
1551        self.clusterName  = cluster
1552        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds, self.clusterName )
1553
1554
1555    def run( self ):
1556        """Main XML processing; start a xml and storethread"""
1557
1558        xml_thread   = threading.Thread( None, self.processXML,   'xmlthread' )
1559        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
1560
1561        while( 1 ):
1562
1563            if not xml_thread.isAlive():
1564                # Gather XML at the same interval as gmetad
1565
1566                # threaded call to: self.processXML()
1567                #
1568                try:
1569                    xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1570                    xml_thread.start()
1571                except thread.error, msg:
1572                    debug_msg( 0, 'ERROR: Unable to start xml_thread for cluster %s!: %s' %(self.clusterName, str(msg)) )
1573                    #return 1
1574
1575            if not store_thread.isAlive():
1576                # Store metrics every .. sec
1577
1578                # threaded call to: self.storeMetrics()
1579                #
1580                try:
1581                    store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1582                    store_thread.start()
1583                except thread.error, msg:
1584                    debug_msg( 0, 'ERROR: Unable to start store_thread for clsuter %s!: %s' %(self.clusterName, str(msg)) )
1585                    #return 1
1586       
1587            # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1588            time.sleep( 1 )   
1589
1590    def storeMetrics( self ):
1591        """Store metrics retained in memory to disk"""
1592
1593        global DEBUG_LEVEL
1594
1595        # Store metrics somewhere between every 360 and 640 seconds
1596        #
1597        if DEBUG_LEVEL >= 1:
1598            STORE_INTERVAL = 60
1599        else:
1600            STORE_INTERVAL = random.randint( 300, 600 )
1601
1602        try:
1603            store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1604            store_metric_thread.start()
1605        except thread.error, msg:
1606            debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1607            return 1
1608
1609        debug_msg( 1, 'ganglia_store_thread(): started: cluster %s' %self.clusterName )
1610
1611        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss): cluster %s' %(STORE_INTERVAL, self.clusterName ) )
1612        time.sleep( STORE_INTERVAL )
1613        debug_msg( 1, 'ganglia_store_thread(): Done sleeping: cluster %s' %self.clusterName )
1614
1615        if store_metric_thread.isAlive():
1616
1617            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() (cluster %s) still running, waiting to finish..' %self.clusterName )
1618            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
1619
1620            if store_metric_thread.isAlive():
1621
1622                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() (cluster %s) still running :( now what?' %self.clusterName )
1623            else:
1624                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished: cluster %s' %self.clusterName )
1625
1626        debug_msg( 1, 'ganglia_store_thread(): finished: cluster %s' %self.clusterName )
1627
1628        return 0
1629
1630    def storeThread( self ):
1631        """Actual metric storing thread"""
1632
1633        debug_msg( 1, 'ganglia_store_metric_thread(): started: cluster %s' %self.clusterName )
1634        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data.. cluster %s' %self.clusterName )
1635
1636        ret = self.myXMLHandler.storeMetrics()
1637        if ret > 0:
1638            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics! cluster %s' %(str(ret), self.clusterName) )
1639
1640        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing: cluster %s' %self.clusterName )
1641        debug_msg( 1, 'ganglia_store_metric_thread(): finished: cluster %s' %self.clusterName )
1642       
1643        return 0
1644
1645    def processXML( self ):
1646        """Process XML"""
1647
1648        try:
1649            parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1650            parsethread.start()
1651        except thread.error, msg:
1652            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()! cluster %s: ' %(self.clusterName, str(msg) ) )
1653            return 1
1654
1655        debug_msg( 1, 'ganglia_xml_thread(): started: cluster %s' %self.clusterName )
1656
1657        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss): cluster %s' %(self.config.getLowestInterval(), self.clusterName) )
1658        time.sleep( float( self.config.getLowestInterval() ) )   
1659        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping: cluster %s' %self.clusterName )
1660
1661        if parsethread.isAlive():
1662
1663            debug_msg( 1, 'ganglia_xml_thread(): parsethread() (cluster %s) still running, waiting (%ss) to finish..' %(self.clusterName, PARSE_TIMEOUT ) )
1664            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
1665
1666            if parsethread.isAlive():
1667                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) still running :( now what?' %self.clusterName )
1668            else:
1669                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() (cluster %s) finished' %self.clusterName )
1670
1671        debug_msg( 1, 'ganglia_xml_thread(): finished: cluster %s' %self.clusterName )
1672
1673        return 0
1674
1675    def parseThread( self ):
1676        """Actual parsing thread"""
1677
1678
1679        debug_msg( 1, 'ganglia_parse_thread(): started: cluster %s' %self.clusterName )
1680        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data.. cluster %s' %self.clusterName )
1681       
1682        my_data    = self.myXMLSource.getData()
1683
1684        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving (cluster %s): data size %d' %(self.clusterName, len(my_data) ) )
1685
1686        if my_data:
1687            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML.. cluster %s' %self.clusterName )
1688
1689            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1690
1691            debug_msg( 1, 'ganglia_parse_thread(): Done parsing: cluster %s' %self.clusterName )
1692            #yappi.print_stats()
1693
1694        debug_msg( 1, 'ganglia_parse_thread(): finished: %s' %self.clusterName )
1695
1696        return 0
1697
1698class GangliaConfigParser:
1699
1700    sources = [ ]
1701
1702    def __init__( self, config ):
1703        """Parse some stuff from our gmetad's config, such as polling interval"""
1704
1705        self.config = config
1706        self.parseValues()
1707
1708    def parseValues( self ):
1709        """Parse certain values from gmetad.conf"""
1710
1711        readcfg = open( self.config, 'r' )
1712
1713        for line in readcfg.readlines():
1714
1715            if line.count( '"' ) > 1:
1716
1717                if line.find( 'data_source' ) != -1 and line[0] != '#':
1718
1719                    source                = { }
1720                    source['name']        = line.split( '"' )[1]
1721                    source_value_words    = line.split( '"' )[2].split( ' ' )
1722
1723                    check_interval        = source_value_words[0]
1724
1725                    try:
1726
1727                        source['interval'] = int( check_interval )
1728                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], str( source['interval'] ) ) )
1729                    except ValueError:
1730
1731                        source['interval'] = 15
1732                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1733
1734                    self.sources.append( source )
1735
1736        readcfg.close()
1737
1738    def clusterExists( self, source_name ):
1739
1740        for source in self.sources:
1741
1742            if source['name'] == source_name:
1743
1744                return True
1745
1746        return False
1747
1748    def getInterval( self, source_name ):
1749        """Return interval for source_name"""
1750
1751        for source in self.sources:
1752
1753            if source['name'] == source_name:
1754
1755                return source['interval']
1756
1757        return None
1758
1759    def getLowestInterval( self ):
1760        """Return the lowest interval of all clusters"""
1761
1762        lowest_interval = 0
1763
1764        for source in self.sources:
1765
1766            if not lowest_interval or source['interval'] <= lowest_interval:
1767
1768                lowest_interval = source['interval']
1769
1770        # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1771        if lowest_interval:
1772            return lowest_interval
1773        else:
1774            return 15
1775
1776class RRDHandler:
1777    """Class for handling RRD activity"""
1778
1779    def __init__( self, config, cluster ):
1780        """Setup initial variables"""
1781
1782        global MODRRDTOOL, ARCHIVE_EXCLUDE_METRICS
1783
1784        self.block   = 0
1785        self.cluster = cluster
1786        self.config  = config
1787        self.slot    = threading.Lock()
1788        self.myMetrics   = { }
1789        self.lastStored  = { }
1790        self.timeserials = { }
1791
1792        if MODRRDTOOL:
1793
1794            self.rrdm    = RRDMutator()
1795        else:
1796            self.rrdm    = RRDMutator( RRDTOOL )
1797
1798        global DEBUG_LEVEL
1799
1800        if DEBUG_LEVEL <= 0:
1801            self.gatherLastUpdates()
1802
1803        self.excludes        = [ ]
1804
1805        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
1806
1807            self.excludes.append( re.compile( ex_metricstr ) )
1808
1809    def gatherLastUpdates( self ):
1810        """Populate the lastStored list, containing timestamps of all last updates"""
1811
1812        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1813
1814        hosts = [ ]
1815
1816        if os.path.exists( cluster_dir ):
1817
1818            dirlist = os.listdir( cluster_dir )
1819
1820            for dir in dirlist:
1821
1822                hosts.append( dir )
1823
1824        for host in hosts:
1825
1826            host_dir    = cluster_dir + '/' + host
1827            dirlist     = os.listdir( host_dir )
1828
1829            for dir in dirlist:
1830
1831                if not self.timeserials.has_key( host ):
1832
1833                    self.timeserials[ host ] = [ ]
1834
1835                self.timeserials[ host ].append( dir )
1836
1837            last_serial = self.getLastRrdTimeSerial( host )
1838
1839            if last_serial:
1840
1841                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1842
1843                if os.path.exists( metric_dir ):
1844
1845                    dirlist = os.listdir( metric_dir )
1846
1847                    for file in dirlist:
1848
1849                        metricname = file.split( '.rrd' )[0]
1850
1851                        if not self.lastStored.has_key( host ):
1852
1853                            self.lastStored[ host ] = { }
1854
1855                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1856
1857    def getClusterName( self ):
1858        """Return clustername"""
1859
1860        return self.cluster
1861
1862    def memMetric( self, host, metric ):
1863        """Store metric from host in memory"""
1864
1865        # <ATOMIC>
1866        #
1867       
1868        #if host in self.myMetrics:
1869
1870            #if self.myMetrics[ host ].has_key( metric['name'] ):
1871
1872            #    if len( self.myMetrics[ host ][ metric['name'] ] ) > 0:
1873     
1874            #        if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
1875
1876            #            return 1
1877                #for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1878
1879                #    if mymetric['time'] == metric['time']:
1880
1881                        # Allready have this metric, abort
1882                #        return 1
1883            #else:
1884            #if metric['name'] not in self.myMetrics[ host ]:
1885            #    self.myMetrics[ host ][ metric['name'] ] = deque()
1886        #else:
1887        #    self.myMetrics[ host ]                   = { }
1888        #    self.myMetrics[ host ][ metric['name'] ] = deque()
1889
1890        # Push new metric onto stack
1891        # atomic code; only 1 thread at a time may access the stack
1892        #self.slot.acquire()
1893
1894        try:
1895            host_metrics = self.myMetrics[ host ]
1896        except KeyError:
1897            self.myMetrics[ host ] = { }
1898            host_metrics = self.myMetrics[ host ]
1899
1900        try:
1901            metric_values = self.myMetrics[ host ][ metric['name'] ]
1902        except KeyError:
1903            self.myMetrics[ host ][ metric['name'] ] = deque()
1904            metric_values = self.myMetrics[ host ][ metric['name'] ]
1905
1906        try:
1907            if metric['time'] <= self.myMetrics[ host ][ metric['name'] ][-1]['time']:
1908                return 1
1909        except (IndexError, KeyError):
1910            pass
1911
1912        self.myMetrics[ host ][ metric['name'] ].append( metric )
1913
1914        #self.slot.release()
1915        #
1916        # </ATOMIC>
1917
1918    def makeUpdateList( self, host, metriclist ):
1919        """
1920        Make a list of update values for rrdupdate
1921        but only those that we didn't store before
1922        """
1923
1924        update_list    = [ ]
1925        metric        = None
1926
1927        while len( metriclist ) > 0:
1928
1929            metric = metriclist.pop( 0 )
1930
1931            if self.checkStoreMetric( host, metric ):
1932
1933                u_val    = str( metric['time'] ) + ':' + str( metric['val'] )
1934                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1935                update_list.append( u_val )
1936
1937        return update_list
1938
1939    def checkStoreMetric( self, host, metric ):
1940        """Check if supplied metric if newer than last one stored"""
1941
1942        if self.lastStored.has_key( host ):
1943
1944            if self.lastStored[ host ].has_key( metric['name'] ):
1945
1946                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1947
1948                    # This is old
1949                    return 0
1950
1951        return 1
1952
1953    def memLastUpdate( self, host, metricname, metriclist ):
1954        """
1955        Memorize the time of the latest metric from metriclist
1956        but only if it wasn't allready memorized
1957        """
1958
1959        if not self.lastStored.has_key( host ):
1960            self.lastStored[ host ] = { }
1961
1962        last_update_time = 0
1963
1964        for metric in metriclist:
1965
1966            if metric['name'] == metricname:
1967
1968                if metric['time'] > last_update_time:
1969
1970                    last_update_time = metric['time']
1971
1972        if self.lastStored[ host ].has_key( metricname ):
1973           
1974            if last_update_time <= self.lastStored[ host ][ metricname ]:
1975                return 1
1976
1977        self.lastStored[ host ][ metricname ] = last_update_time
1978
1979    def storeMetrics( self ):
1980        """
1981        Store all metrics from memory to disk
1982        and do it to the RRD's in appropriate timeperiod directory
1983        """
1984
1985        debug_msg( 5, "Entering storeMetrics(): cluster %s" %self.cluster )
1986
1987        count_values  = 0
1988        count_metrics = 0
1989        count_bits    = 0
1990
1991        for hostname, mymetrics in self.myMetrics.items():   
1992
1993            for metricname, mymetric in mymetrics.items():
1994
1995                count_metrics += 1
1996
1997                for dmetric in mymetric:
1998
1999                    count_values += 1
2000
2001                    count_bits   += len( dmetric['time'] )
2002                    count_bits   += len( dmetric['val'] )
2003
2004        count_bytes    = count_bits / 8
2005
2006        debug_msg( 1, "size of cluster '" + self.cluster + "': " + 
2007            str( len( self.myMetrics.keys() ) ) + " hosts " + 
2008            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
2009            str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
2010
2011        for hostname, mymetrics in self.myMetrics.items():   
2012
2013            for metricname, mymetric in mymetrics.items():
2014
2015                for e in self.excludes:
2016
2017                    if e.match( metricname ):
2018
2019                        del self.myMetrics[ hostname ][ metricname ]
2020                        continue
2021
2022                metrics_to_store = [ ]
2023
2024                # Pop metrics from stack for storing until none is left
2025                # atomic code: only 1 thread at a time may access myMetrics
2026
2027                # <ATOMIC>
2028                #
2029                #self.slot.acquire()
2030
2031                if metricname in self.myMetrics[ hostname ]:
2032
2033                    while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
2034
2035                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
2036
2037                            try:
2038                                metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].popleft() )
2039                            except IndexError, msg:
2040
2041                                # Somehow sometimes myMetrics[ hostname ][ metricname ]
2042                                # is still len 0 when the statement is executed.
2043                                # Just ignore indexerror's..
2044                                pass
2045
2046                #self.slot.release()
2047                #
2048                # </ATOMIC>
2049
2050                # Create a mapping table, each metric to the period where it should be stored
2051                #
2052                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
2053
2054                update_rets = [ ]
2055
2056                for period, pmetric in metric_serial_table.items():
2057
2058                    create_ret = self.createCheck( hostname, metricname, period )   
2059
2060                    update_ret = self.update( hostname, metricname, period, pmetric )
2061
2062                    if update_ret == 0:
2063
2064                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
2065                    else:
2066                        debug_msg( 9, 'metric update failed' )
2067
2068                    update_rets.append( create_ret )
2069                    update_rets.append( update_ret )
2070
2071                # Lets ignore errors here for now, we need to make sure last update time
2072                # is correct!
2073                #
2074                #if not (1) in update_rets:
2075
2076                self.memLastUpdate( hostname, metricname, metrics_to_store )
2077
2078        debug_msg( 5, "Leaving storeMetrics(): cluster %s" %self.cluster )
2079
2080    def makeTimeSerial( self ):
2081        """Generate a time serial. Seconds since epoch"""
2082
2083        # Seconds since epoch
2084        mytime = int( time.time() )
2085
2086        return mytime
2087
2088    def makeRrdPath( self, host, metricname, timeserial ):
2089        """Make a RRD location/path and filename"""
2090
2091        rrd_dir  = '%s/%s/%s/%s'    %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
2092        rrd_file = '%s/%s.rrd'    %( rrd_dir, metricname )
2093
2094        return rrd_dir, rrd_file
2095
2096    def getLastRrdTimeSerial( self, host ):
2097        """Find the last timeserial (directory) for this host"""
2098
2099        newest_timeserial = 0
2100
2101        for dir in self.timeserials[ host ]:
2102
2103            valid_dir = 1
2104
2105            for letter in dir:
2106                if letter not in string.digits:
2107                    valid_dir = 0
2108
2109            if valid_dir:
2110                timeserial = dir
2111                if timeserial > newest_timeserial:
2112                    newest_timeserial = timeserial
2113
2114        if newest_timeserial:
2115            return newest_timeserial
2116        else:
2117            return 0
2118
2119    def determinePeriod( self, host, check_serial ):
2120        """Determine to which period (directory) this time(serial) belongs"""
2121
2122        period_serial = 0
2123
2124        if self.timeserials.has_key( host ):
2125
2126            for serial in self.timeserials[ host ]:
2127
2128                if check_serial >= serial and period_serial < serial:
2129
2130                    period_serial = serial
2131
2132        return period_serial
2133
2134    def determineSerials( self, host, metricname, metriclist ):
2135        """
2136        Determine the correct serial and corresponding rrd to store
2137        for a list of metrics
2138        """
2139
2140        metric_serial_table = { }
2141
2142        for metric in metriclist:
2143
2144            if metric['name'] == metricname:
2145
2146                period       = self.determinePeriod( host, metric['time'] )   
2147
2148                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
2149
2150                if (int( metric['time'] ) - int( period ) ) > archive_secs:
2151
2152                    # This one should get it's own new period
2153                    period = metric['time']
2154
2155                    if not self.timeserials.has_key( host ):
2156                        self.timeserials[ host ] = [ ]
2157
2158                    self.timeserials[ host ].append( period )
2159
2160                if not metric_serial_table.has_key( period ):
2161
2162                    metric_serial_table[ period ] = [ ]
2163
2164                metric_serial_table[ period ].append( metric )
2165
2166        return metric_serial_table
2167
2168    def createCheck( self, host, metricname, timeserial ):
2169        """Check if an rrd allready exists for this metric, create if not"""
2170
2171        debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
2172       
2173        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
2174
2175        if not os.path.exists( rrd_dir ):
2176
2177            try:
2178                os.makedirs( rrd_dir )
2179
2180            except os.OSError, msg:
2181
2182                if msg.find( 'File exists' ) != -1:
2183
2184                    # Ignore exists errors
2185                    pass
2186
2187                else:
2188
2189                    print msg
2190                    return
2191
2192            debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
2193
2194        if not os.path.exists( rrd_file ):
2195
2196            interval     = self.config.getInterval( self.cluster )
2197            heartbeat    = 8 * int( interval )
2198
2199            params       = [ ]
2200
2201            params.append( '--step' )
2202            params.append( str( interval ) )
2203
2204            params.append( '--start' )
2205            params.append( str( int( timeserial ) - 1 ) )
2206
2207            params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
2208            params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
2209
2210            self.rrdm.create( str(rrd_file), params )
2211
2212            debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
2213
2214    def update( self, host, metricname, timeserial, metriclist ):
2215        """
2216        Update rrd file for host with metricname
2217        in directory timeserial with metriclist
2218        """
2219
2220        debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
2221
2222        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
2223
2224        update_list       = self.makeUpdateList( host, metriclist )
2225
2226        if len( update_list ) > 0:
2227            ret = self.rrdm.update( str(rrd_file), update_list )
2228
2229            if ret:
2230                return 1
2231       
2232            debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
2233
2234        return 0
2235
2236def daemon():
2237    """daemonized threading"""
2238
2239    # Fork the first child
2240    #
2241    pid = os.fork()
2242
2243    if pid > 0:
2244
2245        sys.exit(0)  # end parent
2246
2247    # creates a session and sets the process group ID
2248    #
2249    os.setsid()
2250
2251    # Fork the second child
2252    #
2253    pid = os.fork()
2254
2255    if pid > 0:
2256
2257        sys.exit(0)  # end parent
2258
2259    write_pidfile()
2260
2261    # Go to the root directory and set the umask
2262    #
2263    os.chdir('/')
2264    os.umask(0)
2265
2266    sys.stdin.close()
2267    sys.stdout.close()
2268    sys.stderr.close()
2269
2270    os.open('/dev/null', os.O_RDWR)
2271    os.dup2(0, 1)
2272    os.dup2(0, 2)
2273
2274    run()
2275
2276def run():
2277    """Threading start"""
2278
2279    global ARCHIVE_DATASOURCES
2280
2281    config             = GangliaConfigParser( GMETAD_CONF )
2282
2283    for ds in ARCHIVE_DATASOURCES:
2284
2285        if not config.clusterExists( ds ):
2286
2287            print "FATAL ERROR: Data source with name '%s' not found in %s" %( ds, GMETAD_CONF )
2288            sys.exit( 1 )
2289
2290    s_timeout          = int( config.getLowestInterval() - 1 )
2291
2292    socket.setdefaulttimeout( s_timeout )
2293
2294    myXMLSource        = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
2295    myDataStore        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
2296
2297    myJobProcessor     = JobXMLProcessor( myXMLSource, myDataStore )
2298
2299    myGangliaProcessors= [ ]
2300
2301    for archive_cluster in ARCHIVE_DATASOURCES:
2302
2303        myGangliaProcessors.append( GangliaXMLProcessor( myXMLSource, myDataStore, archive_cluster ) )
2304
2305    ganglia_xml_threads = [ ]
2306
2307    try:
2308        job_xml_thread     = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
2309
2310        t = 0
2311
2312        for ganglia_processor in myGangliaProcessors:
2313
2314            ganglia_xml_threads.append( threading.Thread( None, ganglia_processor.run, 'ganglia_proc_thread' + str(t) ) )
2315
2316            t = t + 1
2317
2318        job_xml_thread.start()
2319
2320        for t in ganglia_xml_threads:
2321
2322            t.start()
2323       
2324    except thread.error, msg:
2325        debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
2326        syslog.closelog()
2327        sys.exit(1)
2328       
2329    debug_msg( 0, 'main threading started.' )
2330
2331def main():
2332    """Program startup"""
2333
2334    global DAEMONIZE, USE_SYSLOG
2335
2336    if not processArgs( sys.argv[1:] ):
2337        sys.exit( 1 )
2338
2339    if( DAEMONIZE and USE_SYSLOG ):
2340        syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2341
2342    if DAEMONIZE:
2343        daemon()
2344    else:
2345        run()
2346
2347#
2348# Global functions
2349#
2350
2351def check_dir( directory ):
2352    """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
2353
2354    if directory[-1] == '/':
2355        directory = directory[:-1]
2356
2357    return directory
2358
2359def reqtime2epoch( rtime ):
2360
2361    (hours, minutes, seconds )    = rtime.split( ':' )
2362
2363    etime    = int(seconds)
2364    etime    = etime + ( int(minutes) * 60 )
2365    etime    = etime + ( int(hours) * 60 * 60 )
2366
2367    return etime
2368
2369def debug_msg( level, msg ):
2370    """Only print msg if correct levels"""
2371
2372    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2373        sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2374   
2375    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2376        syslog.syslog( msg )
2377
2378def printTime( ):
2379    """Print current time in human readable format"""
2380
2381    return time.strftime("%a %d %b %Y %H:%M:%S")
2382
2383def write_pidfile():
2384
2385    # Write pidfile if PIDFILE exists
2386    if PIDFILE:
2387
2388        pid     = os.getpid()
2389
2390        pidfile = open(PIDFILE, 'w')
2391
2392        pidfile.write( str( pid ) )
2393        pidfile.close()
2394
2395# Ooohh, someone started me! Let's go..
2396#
2397if __name__ == '__main__':
2398    main()
Note: See TracBrowser for help on using the repository browser.