source: branches/1.0/jobarchived/jobarchived.py

Last change on this file was 913, checked in by olahaye, 9 years ago

[rpm&deb packaging] Now fixes the VERSION outside current directory (can be SVN)
This avoids .in files and let generate tarballs and packages (binary and sources) without any VERSION values.
make deb or rpm or install even from svn is now safe from "sed -i -e"

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 61.9 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2013  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 913 2013-05-22 17:00:22Z olahaye $
22#
23
24import getopt, syslog, ConfigParser, sys
25
26VERSION='__VERSION__'
27
28def usage( ver ):
29
30    print 'jobarchived %s' %VERSION
31
32    if ver:
33        return 0
34
35    print
36    print 'Purpose:'
37    print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
38    print '  and node statistics in a RRD archive'
39    print
40    print 'Usage:    jobarchived [OPTIONS]'
41    print
42    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobarchived.conf)'
43    print '  -p, --pidfile=FILE    Use pid file to store the process id'
44    print '  -h, --help        Print help and exit'
45    print '  -v, --version        Print version and exit'
46    print
47
48def processArgs( args ):
49
50    SHORT_L   = 'p:hvc:'
51    LONG_L    = [ 'help', 'config=', 'pidfile=', 'version' ]
52
53    config_filename = '/etc/jobarchived.conf'
54
55    global PIDFILE
56
57    PIDFILE   = None
58
59    try:
60
61        opts, args = getopt.getopt( args, SHORT_L, LONG_L )
62
63    except getopt.error, detail:
64
65        print detail
66        sys.exit(1)
67
68    for opt, value in opts:
69
70        if opt in [ '--config', '-c' ]:
71
72            config_filename = value
73
74        if opt in [ '--pidfile', '-p' ]:
75
76            PIDFILE         = value
77
78        if opt in [ '--help', '-h' ]:
79
80            usage( False )
81            sys.exit( 0 )
82
83        if opt in [ '--version', '-v' ]:
84
85            usage( True )
86            sys.exit( 0 )
87
88    try:
89        return loadConfig( config_filename )
90
91    except ConfigParser.NoOptionError, detail:
92
93        print detail
94        sys.exit( 1 )
95
96def loadConfig( filename ):
97
98    def getlist( cfg_string ):
99
100        my_list = [ ]
101
102        for item_txt in cfg_string.split( ',' ):
103
104            sep_char = None
105
106            item_txt = item_txt.strip()
107
108            for s_char in [ "'", '"' ]:
109
110                if item_txt.find( s_char ) != -1:
111
112                    if item_txt.count( s_char ) != 2:
113
114                        print 'Missing quote: %s' %item_txt
115                        sys.exit( 1 )
116
117                    else:
118
119                        sep_char = s_char
120                        break
121
122            if sep_char:
123
124                item_txt = item_txt.split( sep_char )[1]
125
126            my_list.append( item_txt )
127
128        return my_list
129
130    cfg = ConfigParser.ConfigParser()
131
132    cfg.read( filename )
133
134    global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
135    global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
136    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL, JOB_SQL_PASSWORD, JOB_SQL_USER
137
138    ARCHIVE_PATH           = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
139
140    ARCHIVE_HOURS_PER_RRD  = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
141
142    DEBUG_LEVEL            = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
143
144    USE_SYSLOG             = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
145
146    SYSLOG_LEVEL           = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
147
148    MODRRDTOOL             = False
149
150    try:
151        global rrdtool
152        import rrdtool
153
154        MODRRDTOOL        = True
155
156    except ImportError:
157
158        MODRRDTOOL        = False
159
160        print "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"
161
162        RRDTOOL            = cfg.get( 'DEFAULT', 'RRDTOOL' )
163
164    try:
165
166        SYSLOG_FACILITY    = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
167
168    except AttributeError, detail:
169
170        print 'Unknown syslog facility'
171        sys.exit( 1 )
172
173    GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
174
175    ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
176
177    ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
178
179    ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
180
181    JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
182    JOB_SQL_USER            = cfg.get( 'DEFAULT', 'JOB_SQL_USER' )
183    JOB_SQL_PASSWORD        = cfg.get( 'DEFAULT', 'JOB_SQL_PASSWORD' )
184
185    JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
186
187    DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
188
189    return True
190
191# What XML data types not to store
192#
193UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
194
195# Maximum time (in seconds) a parsethread may run
196#
197PARSE_TIMEOUT = 60
198
199# Maximum time (in seconds) a storethread may run
200#
201STORE_TIMEOUT = 360
202
203"""
204The Job Archiving Daemon
205"""
206
207from types import *
208
209import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
210
211try:
212    import psycopg2
213
214except ImportError, details:
215
216    print "FATAL ERROR: psycopg2 python module not found"
217    sys.exit( 1 )
218
219class InitVars:
220        Vars = {}
221       
222        def __init__(self, **key_arg):
223                for (key, value) in key_arg.items():
224                        if value:
225                                self.Vars[key] = value
226                        else:   
227                                self.Vars[key] = None
228                               
229        def __call__(self, *key):
230                key = "%s" % key
231                return self.Vars[key]
232               
233        def __getitem__(self, key):
234                return self.Vars[key]
235               
236        def __repr__(self):
237                return repr(self.Vars)
238               
239        def keys(self):
240                barf =  map(None, self.Vars.keys())
241                return barf
242               
243        def values(self):
244                barf =  map(None, self.Vars.values())
245                return barf
246               
247        def has_key(self, key):
248                if self.Vars.has_key(key):
249                        return 1
250                else:   
251                        return 0
252                       
253class DBError(Exception):
254        def __init__(self, msg=''):
255                self.msg = msg
256                Exception.__init__(self, msg)
257        def __repr__(self):
258                return self.msg
259        __str__ = __repr__
260
261#
262# Class to connect to a database
263# and return the queury in a list or dictionairy.
264#
265class DB:
266    def __init__(self, db_vars):
267
268        self.dict = db_vars
269
270        if self.dict.has_key('User'):
271            self.user = self.dict['User']
272        else:
273            self.user = 'postgres'
274
275        if self.dict.has_key('Host'):
276            self.host = self.dict['Host']
277        else:
278            self.host = 'localhost'
279
280        if self.dict.has_key('Password'):
281            self.passwd = self.dict['Password']
282        else:
283            self.passwd = ''
284
285        if self.dict.has_key('DataBaseName'):
286            self.db = self.dict['DataBaseName']
287        else:
288            self.db = 'jobarchive'
289
290        # connect_string = 'host:port:database:user:password:
291        dsn = "host='%s' dbname='%s' user='%s' password='%s'" %(self.host, self.db, self.user, self.passwd)
292
293        try:
294            self.SQL = psycopg2.connect(dsn)
295        except psycopg2.Error, details:
296            str = "%s" %details
297            raise DBError(str)
298
299    def __repr__(self):
300        return repr(self.result)
301
302    def __nonzero__(self):
303        return not(self.result == None)
304
305    def __len__(self):
306        return len(self.result)
307
308    def __getitem__(self,i):
309        return self.result[i]
310
311    def __getslice__(self,i,j):
312        return self.result[i:j]
313
314    def Get(self, q_str):
315        c = self.SQL.cursor()
316        try:
317            c.execute(q_str)
318            result = c.fetchall()
319        except psycopg2.Error, details:
320            c.close()
321            str = "%s" %details
322            raise DBError(str)
323
324        c.close()
325        return result
326
327    def Set(self, q_str):
328        c = self.SQL.cursor()
329        try:
330            c.execute(q_str)
331
332        except psycopg2.Error, details:
333            c.close()
334            str = "%s" %details
335            raise DBError(str)
336
337        c.close()
338        return True
339
340    def Commit(self):
341
342        return self.SQL.commit()
343
344    def Rollback( self ):
345
346        return self.SQL.rollback()
347
348class DataSQLStore:
349
350    db_vars = None
351    dbc = None
352
353    def __init__( self, hostname, database ):
354
355        global JOB_SQL_USER, JOB_SQL_PASSWORD
356
357        self.db_vars = InitVars(DataBaseName=database,
358                User=JOB_SQL_USER,
359                Host=hostname,
360                Password=JOB_SQL_PASSWORD,
361                Dictionary='true')
362
363        try:
364            self.dbc     = DB(self.db_vars)
365        except DBError, details:
366            debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
367            sys.exit(1)
368
369    def setDatabase(self, statement):
370
371        ret = self.doDatabase('set', statement)
372        return ret
373       
374    def getDatabase(self, statement):
375
376        ret = self.doDatabase('get', statement)
377        return ret
378
379    def doCommit( self ):
380
381        return self.dbc.Commit()
382
383    def doRollback( self ):
384
385        return self.dbc.Rollback()
386
387    def doDatabase(self, type, statement):
388
389        debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
390        try:
391            if type == 'set':
392                result = self.dbc.Set( statement )
393            elif type == 'get':
394                result = self.dbc.Get( statement )
395               
396        except DBError, detail:
397            operation = statement.split(' ')[0]
398            debug_msg( 0, 'ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
399            return False
400
401        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
402        return result
403
404    def getJobNodeId( self, job_id, node_id ):
405
406        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
407        if not id:
408            return False
409
410        if len( id ) > 0:
411
412            if len( id[0] ) > 0 and id[0] != '':
413           
414                return True
415
416        return False
417
418    def getNodeId( self, hostname ):
419
420        id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
421
422        if len( id ) > 0:
423
424            id = id[0][0]
425
426            return id
427        else:
428            return None
429
430    def getNodeIds( self, hostnames ):
431
432        ids = [ ]
433
434        for node in hostnames:
435
436            id = self.getNodeId( node )
437
438            if id:
439                ids.append( id )
440
441        return ids
442
443    def getJobId( self, jobid ):
444
445        id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
446
447        if id:
448            id = id[0][0]
449
450            return id
451        else:
452            return None
453
454    def addJob( self, job_id, jobattrs ):
455
456        if not self.getJobId( job_id ):
457
458            return self.mutateJob( 'insert', job_id, jobattrs ) 
459        else:
460            return self.mutateJob( 'update', job_id, jobattrs )
461
462    def mutateJob( self, action, job_id, jobattrs ):
463
464        job_values     = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
465
466        insert_col_str = 'job_id'
467        insert_val_str = "'%s'" %job_id
468        update_str     = None
469
470        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
471
472        ids = [ ]
473
474        for valname, value in jobattrs.items():
475
476            if valname in job_values and value != '':
477
478                column_name = 'job_' + valname
479
480                if action == 'insert':
481
482                    if not insert_col_str:
483                        insert_col_str = column_name
484                    else:
485                        insert_col_str = insert_col_str + ',' + column_name
486
487                    if not insert_val_str:
488                        insert_val_str = value
489                    else:
490                        insert_val_str = insert_val_str + ",'%s'" %value
491
492                elif action == 'update':
493                   
494                    if not update_str:
495                        update_str = "%s='%s'" %(column_name, value)
496                    else:
497                        update_str = update_str + ",%s='%s'" %(column_name, value)
498
499            elif valname == 'nodes' and value:
500
501                node_valid = 1
502
503                if len(value) == 1:
504               
505                    if jobattrs['status'] == 'Q':
506
507                        node_valid = 0
508
509                    else:
510
511                        node_valid = 0
512
513                        for node_char in str(value[0]):
514
515                            if string.find( string.digits, node_char ) != -1 and not node_valid:
516
517                                node_valid = 1
518
519                if node_valid:
520
521                    ids = self.addNodes( value, jobattrs['domain'] )
522
523        if action == 'insert':
524
525            db_ret = self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
526
527        elif action == 'update':
528
529            db_ret = self.setDatabase( "UPDATE jobs SET %s WHERE job_id='%s'" %(update_str, job_id) )
530
531        if len( ids ) > 0:
532            self.addJobNodes( job_id, ids )
533
534        return db_ret
535
536    def addNodes( self, hostnames, domain ):
537
538        ids = [ ]
539
540        for node in hostnames:
541
542            node    = '%s.%s' %( node, domain )
543            id    = self.getNodeId( node )
544   
545            if not id:
546                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
547                id = self.getNodeId( node )
548
549            ids.append( id )
550
551        return ids
552
553    def addJobNodes( self, jobid, nodes ):
554
555        for node in nodes:
556
557            if not self.getJobNodeId( jobid, node ):
558
559                self.addJobNode( jobid, node )
560
561    def addJobNode( self, jobid, nodeid ):
562
563        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( '%s',%s )" %(jobid, nodeid) )
564
565    def storeJobInfo( self, jobid, jobattrs ):
566
567        return self.addJob( jobid, jobattrs )
568
569    def checkTimedoutJobs( self ):
570
571        debug_msg( 1, 'Housekeeping: checking database for timed out jobs..' )
572
573        # Locate all jobs in the database that are not set to finished
574        #
575        q = "SELECT * from jobs WHERE job_status != 'F'"
576
577        r = self.getDatabase( q )
578
579        if len( r ) == 0:
580
581            return None
582
583        timeoutjobs  = [ ]
584
585        jobtimeout_sec = JOB_TIMEOUT * (60 * 60)
586        cur_time       = time.time()
587
588        for row in r:
589
590            job_id              = row[0]
591            job_requested_time  = row[4]
592            job_status          = row[7]
593            job_start_timestamp = row[8]
594
595            # If it was set to queued and we didn't see it started
596            # there's not point in keeping it around
597            #
598            if job_status == 'R' and job_start_timestamp:
599
600                start_timestamp = int( job_start_timestamp )
601
602                # If it was set to running longer than JOB_TIMEOUT
603                # close the job: it probably finished while we were not running
604                #
605                if ( cur_time - start_timestamp ) > jobtimeout_sec:
606
607                    if job_requested_time:
608
609                        rtime_epoch    = reqtime2epoch( job_requested_time )
610                    else:
611                        rtime_epoch    = None
612                   
613                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
614
615        debug_msg( 1, 'Housekeeping: Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
616
617        ret_jobids_clean = [ ]
618
619        # Close these jobs in the database
620        # update the stop_timestamp to: start_timestamp + requested wallclock
621        # and set state: finished
622        #
623        for j in timeoutjobs:
624
625            ( i, s, r )        = j
626
627            if r:
628                new_end_timestamp    = int( s ) + r
629
630                q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
631                self.setDatabase( q )
632            else:
633
634                # Requested walltime unknown: cannot guess end time: sorry delete them
635                q = "DELETE FROM jobs WHERE job_id = '" + str( i ) + "'"
636                self.setDatabase( q )
637
638            ret_jobids_clean.append( i )
639
640        debug_msg( 1, 'Housekeeping: done.' )
641
642        return ret_jobids_clean
643
644    def checkStaleJobs( self ):
645
646        debug_msg( 1, 'Housekeeping: checking database for stale jobs..' )
647
648        # Locate all jobs in the database that are not set to finished
649        #
650        q = "SELECT * from jobs WHERE job_status != 'F'"
651
652        r = self.getDatabase( q )
653
654        if len( r ) == 0:
655
656            return None
657
658        cleanjobs      = [ ]
659
660        cur_time       = time.time()
661
662        for row in r:
663
664            job_id              = row[0]
665            job_requested_time  = row[4]
666            job_status          = row[7]
667            job_start_timestamp = row[8]
668
669            # If it was set to queued and we didn't see it started
670            # there's not point in keeping it around
671            #
672            if job_status == 'Q' or not job_start_timestamp:
673
674                cleanjobs.append( job_id )
675
676        debug_msg( 1, 'Housekeeping: Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
677
678        # Purge these from database
679        #
680        for j in cleanjobs:
681
682            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
683            self.setDatabase( q )
684
685        debug_msg( 1, 'Housekeeping: done.' )
686
687        return cleanjobs
688
689class RRDMutator:
690    """A class for performing RRD mutations"""
691
692    binary = None
693
694    def __init__( self, binary=None ):
695        """Set alternate binary if supplied"""
696
697        if binary:
698            self.binary = binary
699
700    def create( self, filename, args ):
701        """Create a new rrd with args"""
702
703        global MODRRDTOOL
704
705        if MODRRDTOOL:
706            return self.perform( 'create', filename, args )
707        else:
708            return self.perform( 'create', '"' + filename + '"', args )
709
710    def update( self, filename, args ):
711        """Update a rrd with args"""
712
713        global MODRRDTOOL
714
715        if MODRRDTOOL:
716            return self.perform( 'update', filename, args )
717        else:
718            return self.perform( 'update', '"' + filename + '"', args )
719
720    def grabLastUpdate( self, filename ):
721        """Determine the last update time of filename rrd"""
722
723        global MODRRDTOOL
724
725        last_update = 0
726
727        # Use the py-rrdtool module if it's available on this system
728        #
729        if MODRRDTOOL:
730
731            debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
732
733            rrd_header     = { }
734
735            try:
736                rrd_header    = rrdtool.info( filename )
737            except rrdtool.error, msg:
738                debug_msg( 8, str( msg ) )
739
740            if rrd_header.has_key( 'last_update' ):
741                return last_update
742            else:
743                return 0
744
745        # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
746        # DEPRECATED (slow!)
747        #
748        else:
749            debug_msg( 8, self.binary + ' info ' + filename )
750
751            my_pipe        = os.popen( self.binary + ' info "' + filename + '"' )
752
753            for line in my_pipe.readlines():
754
755                if line.find( 'last_update') != -1:
756
757                    last_update = line.split( ' = ' )[1]
758
759            if my_pipe:
760
761                my_pipe.close()
762
763            if last_update:
764                return last_update
765            else:
766                return 0
767
768
769    def perform( self, action, filename, args ):
770        """Perform action on rrd filename with args"""
771
772        global MODRRDTOOL
773
774        arg_string = None
775
776        if type( args ) is not ListType:
777            debug_msg( 8, 'Arguments needs to be of type List' )
778            return 1
779
780        for arg in args:
781
782            if not arg_string:
783
784                arg_string = arg
785            else:
786                arg_string = arg_string + ' ' + arg
787
788        if MODRRDTOOL:
789
790            debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
791
792            try:
793                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
794
795                if action == 'create':
796
797                    rrdtool.create( str( filename ), *args )
798
799                elif action == 'update':
800
801                    rrdtool.update( str( filename ), *args )
802
803            except rrdtool.error, msg:
804
805                error_msg = str( msg )
806                debug_msg( 8, error_msg )
807                return 1
808
809        else:
810
811            debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
812
813            cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
814            lines   = cmd.readlines()
815
816            cmd.close()
817
818            for line in lines:
819
820                if line.find( 'ERROR' ) != -1:
821
822                    error_msg = string.join( line.split( ' ' )[1:] )
823                    debug_msg( 8, error_msg )
824                    return 1
825
826        return 0
827
828class XMLProcessor:
829    """Skeleton class for XML processor's"""
830
831    def run( self ):
832        """Do main processing of XML here"""
833
834        pass
835
836class JobXMLProcessor( XMLProcessor ):
837    """Main class for processing XML and acting with it"""
838
839    def __init__( self, XMLSource, DataStore ):
840        """Setup initial XML connection and handlers"""
841
842        self.myXMLSource  = XMLSource
843        self.myXMLHandler = JobXMLHandler( DataStore )
844        self.myXMLError   = XMLErrorHandler()
845
846        self.config       = GangliaConfigParser( GMETAD_CONF )
847
848        self.kill_thread  = False
849
850    def killThread( self ):
851
852        self.kill_thread  = True
853
854    def run( self ):
855        """Main XML processing"""
856
857        debug_msg( 1, 'job_xml_thread(): started.' )
858
859        while( 1 ):
860
861            debug_msg( 1, 'job_xml_thread(): Retrieving XML data..' )
862
863            my_data    = self.myXMLSource.getData()
864
865            debug_msg( 1, 'job_xml_thread(): Done retrieving: data size %d' %len(my_data) )
866
867            if my_data:
868                debug_msg( 1, 'job_xml_thread(): Parsing XML..' )
869
870                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
871
872                debug_msg( 1, 'job_xml_thread(): Done parsing.' )
873            else:
874                debug_msg( 1, 'job_xml_thread(): Got no data.' )
875
876            if self.kill_thread:
877
878                debug_msg( 1, 'job_xml_thread(): killed.' )
879                return None
880               
881            debug_msg( 1, 'job_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
882            time.sleep( self.config.getLowestInterval() )
883
884class JobXMLHandler( xml.sax.handler.ContentHandler ):
885    """Parse Job's jobinfo XML from our plugin"""
886
887    def __init__( self, datastore ):
888
889        self.ds              = datastore
890        self.jobs_processed  = [ ]
891        self.jobs_to_store   = [ ]
892        self.jobAttrs        = { }
893        self.jobAttrsSaved   = { }
894
895        self.iteration       = 0
896
897        self.ds.checkTimedoutJobs()
898        self.ds.checkStaleJobs()
899
900        debug_msg( 1, "XML: Handler created" )
901
902    def startDocument( self ):
903
904        self.jobs_processed = [ ]
905        self.heartbeat      = 0
906        self.elementct      = 0
907        self.iteration      = self.iteration + 1
908
909        if self.iteration > 20:
910
911            timedout_jobs = self.ds.checkTimedoutJobs()
912            self.iteration = 0
913
914            for j in timedout_jobs:
915
916                del self.jobAttrs[ j ]
917                del self.jobAttrsSaved[ j ]
918
919        debug_msg( 1, "XML: Start document: iteration %s" %str(self.iteration) )
920
921    def startElement( self, name, attrs ):
922        """
923        This XML will be all gmetric XML
924        so there will be no specific start/end element
925        just one XML statement with all info
926        """
927
928        jobinfo = { }
929
930        self.elementct    += 1
931
932        if name == 'CLUSTER':
933
934            self.clustername = str( attrs.get( 'NAME', "" ) )
935
936        elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
937
938            metricname = str( attrs.get( 'NAME', "" ) )
939
940            if metricname == 'zplugin_monarch_heartbeat':
941
942                self.heartbeat = str( attrs.get( 'VAL', "" ) )
943
944            elif metricname.find( 'zplugin_monarch_job' ) != -1:
945
946                job_id  = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[1]
947                val     = str( attrs.get( 'VAL', "" ) )
948
949                valinfo = val.split( ' ' )
950
951                for myval in valinfo:
952
953                    if len( myval.split( '=' ) ) > 1:
954
955                        valname = myval.split( '=' )[0]
956                        value   = myval.split( '=' )[1]
957
958                        if valname == 'nodes':
959
960                            value = value.split( ';' )
961
962                        jobinfo[ valname ] = value
963
964                self.jobAttrs[ job_id ] = jobinfo
965
966                self.jobs_processed.append( job_id )
967                   
968    def endDocument( self ):
969        """When all metrics have gone, check if any jobs have finished"""
970
971        jobs_finished = [ ]
972
973        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_processed))+" jobs" )
974
975        if self.heartbeat == 0:
976            return None
977
978        for jobid, jobinfo in self.jobAttrs.items():
979
980            if jobinfo['reported'] != self.heartbeat:
981
982                if (jobinfo['status'] != 'R'):
983                    debug_msg( 1, 'job %s report time %s does not match current heartbeat %s : ignoring job' %(jobid, jobinfo['reported'], self.heartbeat ) )
984                    del self.jobAttrs[ jobid ]
985
986                    if jobid in self.jobs_to_store:
987                        del self.jobs_to_store[ jobid ]
988
989                    continue
990
991                elif jobid not in self.jobs_processed:
992
993                    # Was running previous heartbeat but not anymore: must be finished
994                    self.jobAttrs[ jobid ]['status'] = 'F'
995                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
996                    debug_msg( 1, 'job %s appears to have finished' %jobid )
997
998                    jobs_finished.append( jobid )
999
1000                    if not jobid in self.jobs_to_store:
1001                        self.jobs_to_store.append( jobid )
1002
1003                    continue
1004
1005            elif self.jobAttrsSaved.has_key( jobid ):
1006
1007                # This should pretty much never happen, but hey let's be careful
1008                # Perhaps if someone altered their job while in queue with qalter
1009
1010                if self.jobinfoChanged( jobid, jobinfo ):
1011
1012                    self.jobAttrs[ jobid ]['stop_timestamp'] = ''
1013                    self.jobAttrs[ jobid ]                   = self.setJobAttrs( self.jobAttrs[ jobid ], jobinfo )
1014
1015                    if not jobid in self.jobs_to_store:
1016
1017                        self.jobs_to_store.append( jobid )
1018
1019                    debug_msg( 10, 'jobinfo for job %s has changed' %jobid )
1020            else:
1021                debug_msg( 1, 'new job %s' %jobid )
1022
1023                if not jobid in self.jobs_to_store:
1024
1025                    self.jobs_to_store.append( jobid )
1026
1027        debug_msg( 1, 'job_xml_thread(): Found %s updated jobs.' %len(self.jobs_to_store) )
1028
1029        failed_store = [ ]
1030        succes_store = [ ]
1031
1032        if len( self.jobs_to_store ) > 0:
1033
1034            debug_msg( 1, 'job_xml_thread(): Storing jobs to database..' )
1035
1036            for n in range( 0, len(self.jobs_to_store ) ):
1037
1038                if len( self.jobs_to_store ) == 0:
1039                    break
1040
1041                jobid = self.jobs_to_store.pop( 0 )
1042
1043                db_ok = self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
1044
1045                if not db_ok:
1046
1047                    self.ds.doRollback()
1048                    failed_store.append( jobid )
1049                    continue
1050
1051                self.ds.doCommit()
1052                succes_store.append( jobid )
1053
1054                if not jobid in jobs_finished:
1055
1056                    self.jobAttrsSaved[ jobid ] = self.jobAttrs[ jobid ]
1057
1058                elif self.jobAttrsSaved.has_key( jobid ):
1059
1060                    del self.jobAttrsSaved[ jobid ]
1061
1062                if self.jobAttrs[ jobid ]['status'] == 'F':
1063
1064                    del self.jobAttrs[ jobid ]
1065
1066            result_str = 'succesfully stored: %s jobs' %str(len(succes_store))
1067
1068            if len( failed_store ) > 0:
1069                result_str = result_str + ' - failed to store: %s jobs - deferred to next interval' %str(len(failed_store))
1070
1071            debug_msg( 1, 'job_xml_thread(): Done storing. %s' %result_str )
1072
1073        else:
1074            debug_msg( 1, 'job_xml_thread(): No jobs to store.' )
1075
1076        self.jobs_processed = [ ]
1077
1078        # TODO: once in while check database AND self.jobAttrsSaved for stale jobs
1079
1080    def setJobAttrs( self, old, new ):
1081        """
1082        Set new job attributes in old, but not lose existing fields
1083        if old attributes doesn't have those
1084        """
1085
1086        for valname, value in new.items():
1087            old[ valname ] = value
1088
1089        return old
1090       
1091
1092    def jobinfoChanged( self, jobid, jobinfo ):
1093        """
1094        Check if jobinfo has changed from jobattrs[jobid]
1095        if it's report time is bigger than previous one
1096        and it is report time is recent (equal to heartbeat)
1097        """
1098
1099        ignore_changes = [ 'reported' ]
1100
1101        if self.jobAttrsSaved.has_key( jobid ):
1102
1103            for valname, value in jobinfo.items():
1104
1105                if valname not in ignore_changes:
1106
1107                    if self.jobAttrsSaved[ jobid ].has_key( valname ):
1108
1109                        if value != self.jobAttrsSaved[ jobid ][ valname ]:
1110
1111                            if jobinfo['reported'] > self.jobAttrsSaved[ jobid ][ 'reported' ]:
1112
1113                                debug_msg( 1, "job %s field '%s' changed since saved from: %s to: %s" %( jobid, valname, value, self.jobAttrsSaved[ jobid ][ valname ] ) )
1114
1115                                return True
1116
1117                    else:
1118                        debug_msg( 1, "job %s did not have field '%s'" %( jobid, valname )  )
1119                        return True
1120
1121        return False
1122
1123class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
1124    """Parse Ganglia's XML"""
1125
1126    def __init__( self, config, datastore ):
1127        """Setup initial variables and gather info on existing rrd archive"""
1128
1129        self.config          = config
1130        self.clusters        = { }
1131        self.ds              = datastore
1132
1133        debug_msg( 1, 'Housekeeping: checking RRD archive (may take a while)..' )
1134        self.gatherClusters()
1135        debug_msg( 1, 'Housekeeping: RRD check complete.' )
1136
1137    def gatherClusters( self ):
1138        """Find all existing clusters in archive dir"""
1139
1140        archive_dir    = check_dir(ARCHIVE_PATH)
1141
1142        hosts        = [ ]
1143
1144        if os.path.exists( archive_dir ):
1145
1146            dirlist    = os.listdir( archive_dir )
1147
1148            for cfgcluster in ARCHIVE_DATASOURCES:
1149
1150                if cfgcluster not in dirlist:
1151
1152                    # Autocreate a directory for this cluster
1153                    # assume it is new
1154                    #
1155                    cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
1156
1157                    os.mkdir( cluster_dir )
1158
1159                    dirlist.append( cfgcluster )
1160
1161            for item in dirlist:
1162
1163                clustername = item
1164
1165                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
1166
1167                    self.clusters[ clustername ] = RRDHandler( self.config, clustername )
1168
1169        debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
1170
1171    def startElement( self, name, attrs ):
1172        """Memorize appropriate data from xml start tags"""
1173
1174        if name == 'GANGLIA_XML':
1175
1176            self.XMLSource      = str( attrs.get( 'SOURCE',  "" ) )
1177            self.gangliaVersion = str( attrs.get( 'VERSION', "" ) )
1178
1179            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
1180
1181        elif name == 'GRID':
1182
1183            self.gridName    = str( attrs.get( 'NAME', "" ) )
1184            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
1185
1186            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
1187
1188        elif name == 'CLUSTER':
1189
1190            self.clusterName = str( attrs.get( 'NAME',      "" ) )
1191            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
1192
1193            if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
1194
1195                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
1196
1197                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
1198
1199        elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
1200
1201            self.hostName     = str( attrs.get( 'NAME',     "" ) )
1202            self.hostIp       = str( attrs.get( 'IP',       "" ) )
1203            self.hostReported = str( attrs.get( 'REPORTED', "" ) )
1204
1205            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
1206
1207        elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
1208
1209            type = str( attrs.get( 'TYPE', "" ) )
1210           
1211            exclude_metric = False
1212           
1213            for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
1214
1215                orig_name = str( attrs.get( 'NAME', "" ) )
1216
1217                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1218               
1219                    exclude_metric = True
1220
1221                elif re.match( ex_metricstr, orig_name ):
1222
1223                    exclude_metric = True
1224
1225            if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1226
1227                myMetric         = { }
1228                myMetric['name'] = str( attrs.get( 'NAME', "" ) )
1229                myMetric['val']  = str( attrs.get( 'VAL',  "" ) )
1230                myMetric['time'] = self.hostReported
1231
1232                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
1233
1234                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
1235
1236    def storeMetrics( self ):
1237        """Store metrics of each cluster rrd handler"""
1238
1239        for clustername, rrdh in self.clusters.items():
1240
1241            ret = rrdh.storeMetrics()
1242
1243            if ret:
1244                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1245                return 1
1246
1247        return 0
1248
1249class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1250
1251    def error( self, exception ):
1252        """Recoverable error"""
1253
1254        debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
1255
1256    def fatalError( self, exception ):
1257        """Non-recoverable error"""
1258
1259        exception_str = str( exception )
1260
1261        # Ignore 'no element found' errors
1262        if exception_str.find( 'no element found' ) != -1:
1263            debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1264            return 0
1265
1266        debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
1267        sys.exit( 1 )
1268
1269    def warning( self, exception ):
1270        """Warning"""
1271
1272        debug_msg( 0, 'Warning ' + str( exception ) )
1273
1274class XMLGatherer:
1275    """Setup a connection and file object to Ganglia's XML"""
1276
1277    s           = None
1278    fd          = None
1279    data        = None
1280    slot        = None
1281
1282    # Time since the last update
1283    #
1284    LAST_UPDATE    = 0
1285
1286    # Minimum interval between updates
1287    #
1288    MIN_UPDATE_INT    = 10
1289
1290    # Is a update occuring now
1291    #
1292    update_now    = False
1293
1294    def __init__( self, host, port ):
1295        """Store host and port for connection"""
1296
1297        self.host    = host
1298        self.port    = port
1299        self.slot    = threading.Lock()
1300
1301        self.retrieveData()
1302
1303    def retrieveData( self ):
1304        """Setup connection to XML source"""
1305
1306        self.update_now = True
1307
1308        self.slot.acquire()
1309
1310        self.data       = None
1311
1312        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
1313
1314            af, socktype, proto, canonname, sa = res
1315
1316            try:
1317
1318                self.s = socket.socket( af, socktype, proto )
1319
1320            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1321
1322                self.s = None
1323                continue
1324
1325            try:
1326
1327                self.s.connect( sa )
1328
1329            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1330
1331                self.disconnect()
1332                continue
1333
1334            break
1335
1336        if self.s is None:
1337
1338            debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1339            self.update_now    = False
1340            #sys.exit( 1 )
1341
1342        else:
1343            #self.s.send( '\n' )
1344
1345            my_fp            = self.s.makefile( 'r' )
1346            my_data          = my_fp.readlines()
1347            my_data          = string.join( my_data, '' )
1348
1349            self.data        = my_data
1350
1351            self.LAST_UPDATE = time.time()
1352
1353        self.slot.release()
1354
1355        self.update_now    = False
1356
1357    def disconnect( self ):
1358        """Close socket"""
1359
1360        if self.s:
1361            #self.s.shutdown( 2 )
1362            self.s.close()
1363            self.s = None
1364
1365    def __del__( self ):
1366        """Kill the socket before we leave"""
1367
1368        self.disconnect()
1369
1370    def reGetData( self ):
1371        """Reconnect"""
1372
1373        while self.update_now:
1374
1375            # Must be another update in progress:
1376            # Wait until the update is complete
1377            #
1378            time.sleep( 1 )
1379
1380        if self.s:
1381            self.disconnect()
1382
1383        self.retrieveData()
1384
1385    def getData( self ):
1386
1387        """Return the XML data"""
1388
1389        # If more than MIN_UPDATE_INT seconds passed since last data update
1390        # update the XML first before returning it
1391        #
1392
1393        cur_time    = time.time()
1394
1395        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1396
1397            self.reGetData()
1398
1399        while self.update_now:
1400
1401            # Must be another update in progress:
1402            # Wait until the update is complete
1403            #
1404            time.sleep( 1 )
1405           
1406        return self.data
1407
1408    def makeFileDescriptor( self ):
1409        """Make file descriptor that points to our socket connection"""
1410
1411        self.reconnect()
1412
1413        if self.s:
1414            self.fd = self.s.makefile( 'r' )
1415
1416    def getFileObject( self ):
1417        """Connect, and return a file object"""
1418
1419        self.makeFileDescriptor()
1420
1421        if self.fd:
1422            return self.fd
1423
1424class GangliaXMLProcessor( XMLProcessor ):
1425    """Main class for processing XML and acting with it"""
1426
1427    def __init__( self, XMLSource, DataStore ):
1428        """Setup initial XML connection and handlers"""
1429
1430        self.config       = GangliaConfigParser( GMETAD_CONF )
1431        self.myXMLSource  = XMLSource
1432        self.ds           = DataStore
1433        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )
1434        self.myXMLError   = XMLErrorHandler()
1435
1436    def run( self ):
1437        """Main XML processing; start a xml and storethread"""
1438
1439        xml_thread   = threading.Thread( None, self.processXML,   'xmlthread' )
1440        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
1441
1442        while( 1 ):
1443
1444            if not xml_thread.isAlive():
1445                # Gather XML at the same interval as gmetad
1446
1447                # threaded call to: self.processXML()
1448                #
1449                try:
1450                    xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1451                    xml_thread.start()
1452                except thread.error, msg:
1453                    debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1454                    #return 1
1455
1456            if not store_thread.isAlive():
1457                # Store metrics every .. sec
1458
1459                # threaded call to: self.storeMetrics()
1460                #
1461                try:
1462                    store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1463                    store_thread.start()
1464                except thread.error, msg:
1465                    debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1466                    #return 1
1467       
1468            # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1469            time.sleep( 1 )   
1470
1471    def storeMetrics( self ):
1472        """Store metrics retained in memory to disk"""
1473
1474        global DEBUG_LEVEL
1475
1476        # Store metrics somewhere between every 360 and 640 seconds
1477        #
1478        if DEBUG_LEVEL >= 1:
1479            STORE_INTERVAL = 60
1480        else:
1481            STORE_INTERVAL = random.randint( 300, 600 )
1482
1483        try:
1484            store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1485            store_metric_thread.start()
1486        except thread.error, msg:
1487            debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1488            return 1
1489
1490        debug_msg( 1, 'ganglia_store_thread(): started.' )
1491
1492        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
1493        time.sleep( STORE_INTERVAL )
1494        debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
1495
1496        if store_metric_thread.isAlive():
1497
1498            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
1499            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
1500
1501            if store_metric_thread.isAlive():
1502
1503                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() still running :( now what?' )
1504            else:
1505                debug_msg( 1, 'ganglia_store_thread(): Done waiting: storemetricthread() has finished' )
1506
1507        debug_msg( 1, 'ganglia_store_thread(): finished.' )
1508
1509        return 0
1510
1511    def storeThread( self ):
1512        """Actual metric storing thread"""
1513
1514        debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1515        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
1516
1517        ret = self.myXMLHandler.storeMetrics()
1518        if ret > 0:
1519            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
1520
1521        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1522        debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
1523       
1524        return 0
1525
1526    def processXML( self ):
1527        """Process XML"""
1528
1529        try:
1530            parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1531            parsethread.start()
1532        except thread.error, msg:
1533            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1534            return 1
1535
1536        debug_msg( 1, 'ganglia_xml_thread(): started.' )
1537
1538        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
1539        time.sleep( float( self.config.getLowestInterval() ) )   
1540        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
1541
1542        if parsethread.isAlive():
1543
1544            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
1545            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
1546
1547            if parsethread.isAlive():
1548                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() still running :( now what?' )
1549            else:
1550                debug_msg( 1, 'ganglia_xml_thread(): Done waiting: parsethread() finished' )
1551
1552        debug_msg( 1, 'ganglia_xml_thread(): finished.' )
1553
1554        return 0
1555
1556    def parseThread( self ):
1557        """Actual parsing thread"""
1558
1559        debug_msg( 1, 'ganglia_parse_thread(): started.' )
1560        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
1561       
1562        my_data    = self.myXMLSource.getData()
1563
1564        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving: data size %d' %len(my_data) )
1565
1566        if my_data:
1567            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
1568            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1569            debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1570
1571        debug_msg( 1, 'ganglia_parse_thread(): finished.' )
1572
1573        return 0
1574
1575class GangliaConfigParser:
1576
1577    sources = [ ]
1578
1579    def __init__( self, config ):
1580        """Parse some stuff from our gmetad's config, such as polling interval"""
1581
1582        self.config = config
1583        self.parseValues()
1584
1585    def parseValues( self ):
1586        """Parse certain values from gmetad.conf"""
1587
1588        readcfg = open( self.config, 'r' )
1589
1590        for line in readcfg.readlines():
1591
1592            if line.count( '"' ) > 1:
1593
1594                if line.find( 'data_source' ) != -1 and line[0] != '#':
1595
1596                    source                = { }
1597                    source['name']        = line.split( '"' )[1]
1598                    source_value_words    = line.split( '"' )[2].split( ' ' )
1599
1600                    check_interval        = source_value_words[0]
1601
1602                    try:
1603
1604                        source['interval'] = int( check_interval )
1605                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], str( source['interval'] ) ) )
1606                    except ValueError:
1607
1608                        source['interval'] = 15
1609                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1610
1611                    self.sources.append( source )
1612
1613        readcfg.close()
1614
1615    def clusterExists( self, source_name ):
1616
1617        for source in self.sources:
1618
1619            if source['name'] == source_name:
1620
1621                return True
1622
1623        return False
1624
1625    def getInterval( self, source_name ):
1626        """Return interval for source_name"""
1627
1628        for source in self.sources:
1629
1630            if source['name'] == source_name:
1631
1632                return source['interval']
1633
1634        return None
1635
1636    def getLowestInterval( self ):
1637        """Return the lowest interval of all clusters"""
1638
1639        lowest_interval = 0
1640
1641        for source in self.sources:
1642
1643            if not lowest_interval or source['interval'] <= lowest_interval:
1644
1645                lowest_interval = source['interval']
1646
1647        # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1648        if lowest_interval:
1649            return lowest_interval
1650        else:
1651            return 15
1652
1653class RRDHandler:
1654    """Class for handling RRD activity"""
1655
1656    myMetrics   = { }
1657    lastStored  = { }
1658    timeserials = { }
1659    slot = None
1660
1661    def __init__( self, config, cluster ):
1662        """Setup initial variables"""
1663
1664        global MODRRDTOOL
1665
1666        self.block   = 0
1667        self.cluster = cluster
1668        self.config  = config
1669        self.slot    = threading.Lock()
1670
1671        if MODRRDTOOL:
1672
1673            self.rrdm    = RRDMutator()
1674        else:
1675            self.rrdm    = RRDMutator( RRDTOOL )
1676
1677        global DEBUG_LEVEL
1678
1679        if DEBUG_LEVEL <= 2:
1680            self.gatherLastUpdates()
1681
1682    def gatherLastUpdates( self ):
1683        """Populate the lastStored list, containing timestamps of all last updates"""
1684
1685        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1686
1687        hosts = [ ]
1688
1689        if os.path.exists( cluster_dir ):
1690
1691            dirlist = os.listdir( cluster_dir )
1692
1693            for dir in dirlist:
1694
1695                hosts.append( dir )
1696
1697        for host in hosts:
1698
1699            host_dir    = cluster_dir + '/' + host
1700            dirlist     = os.listdir( host_dir )
1701
1702            for dir in dirlist:
1703
1704                if not self.timeserials.has_key( host ):
1705
1706                    self.timeserials[ host ] = [ ]
1707
1708                self.timeserials[ host ].append( dir )
1709
1710            last_serial = self.getLastRrdTimeSerial( host )
1711
1712            if last_serial:
1713
1714                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1715
1716                if os.path.exists( metric_dir ):
1717
1718                    dirlist = os.listdir( metric_dir )
1719
1720                    for file in dirlist:
1721
1722                        metricname = file.split( '.rrd' )[0]
1723
1724                        if not self.lastStored.has_key( host ):
1725
1726                            self.lastStored[ host ] = { }
1727
1728                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1729
1730    def getClusterName( self ):
1731        """Return clustername"""
1732
1733        return self.cluster
1734
1735    def memMetric( self, host, metric ):
1736        """Store metric from host in memory"""
1737
1738        # <ATOMIC>
1739        #
1740        self.slot.acquire()
1741       
1742        if self.myMetrics.has_key( host ):
1743
1744            if self.myMetrics[ host ].has_key( metric['name'] ):
1745
1746                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1747
1748                    if mymetric['time'] == metric['time']:
1749
1750                        # Allready have this metric, abort
1751                        self.slot.release()
1752                        return 1
1753            else:
1754                self.myMetrics[ host ][ metric['name'] ] = [ ]
1755        else:
1756            self.myMetrics[ host ]                   = { }
1757            self.myMetrics[ host ][ metric['name'] ] = [ ]
1758
1759        # Push new metric onto stack
1760        # atomic code; only 1 thread at a time may access the stack
1761
1762        self.myMetrics[ host ][ metric['name'] ].append( metric )
1763
1764        self.slot.release()
1765        #
1766        # </ATOMIC>
1767
1768    def makeUpdateList( self, host, metriclist ):
1769        """
1770        Make a list of update values for rrdupdate
1771        but only those that we didn't store before
1772        """
1773
1774        update_list    = [ ]
1775        metric        = None
1776
1777        while len( metriclist ) > 0:
1778
1779            metric = metriclist.pop( 0 )
1780
1781            if self.checkStoreMetric( host, metric ):
1782
1783                u_val    = str( metric['time'] ) + ':' + str( metric['val'] )
1784                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1785                update_list.append( u_val )
1786
1787        return update_list
1788
1789    def checkStoreMetric( self, host, metric ):
1790        """Check if supplied metric if newer than last one stored"""
1791
1792        if self.lastStored.has_key( host ):
1793
1794            if self.lastStored[ host ].has_key( metric['name'] ):
1795
1796                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1797
1798                    # This is old
1799                    return 0
1800
1801        return 1
1802
1803    def memLastUpdate( self, host, metricname, metriclist ):
1804        """
1805        Memorize the time of the latest metric from metriclist
1806        but only if it wasn't allready memorized
1807        """
1808
1809        if not self.lastStored.has_key( host ):
1810            self.lastStored[ host ] = { }
1811
1812        last_update_time = 0
1813
1814        for metric in metriclist:
1815
1816            if metric['name'] == metricname:
1817
1818                if metric['time'] > last_update_time:
1819
1820                    last_update_time = metric['time']
1821
1822        if self.lastStored[ host ].has_key( metricname ):
1823           
1824            if last_update_time <= self.lastStored[ host ][ metricname ]:
1825                return 1
1826
1827        self.lastStored[ host ][ metricname ] = last_update_time
1828
1829    def storeMetrics( self ):
1830        """
1831        Store all metrics from memory to disk
1832        and do it to the RRD's in appropriate timeperiod directory
1833        """
1834
1835        debug_msg( 5, "Entering storeMetrics()")
1836
1837        count_values  = 0
1838        count_metrics = 0
1839        count_bits    = 0
1840
1841        for hostname, mymetrics in self.myMetrics.items():   
1842
1843            for metricname, mymetric in mymetrics.items():
1844
1845                count_metrics += 1
1846
1847                for dmetric in mymetric:
1848
1849                    count_values += 1
1850
1851                    count_bits   += len( dmetric['time'] )
1852                    count_bits   += len( dmetric['val'] )
1853
1854        count_bytes    = count_bits / 8
1855
1856        debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1857            str( len( self.myMetrics.keys() ) ) + " hosts " + 
1858            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1859            str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1860
1861        for hostname, mymetrics in self.myMetrics.items():   
1862
1863            for metricname, mymetric in mymetrics.items():
1864
1865                metrics_to_store = [ ]
1866
1867                # Pop metrics from stack for storing until none is left
1868                # atomic code: only 1 thread at a time may access myMetrics
1869
1870                # <ATOMIC>
1871                #
1872                self.slot.acquire() 
1873
1874                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1875
1876                    if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1877
1878                        try:
1879                            metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1880                        except IndexError, msg:
1881
1882                            # Somehow sometimes myMetrics[ hostname ][ metricname ]
1883                            # is still len 0 when the statement is executed.
1884                            # Just ignore indexerror's..
1885                            pass
1886
1887                self.slot.release()
1888                #
1889                # </ATOMIC>
1890
1891                # Create a mapping table, each metric to the period where it should be stored
1892                #
1893                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
1894
1895                update_rets = [ ]
1896
1897                for period, pmetric in metric_serial_table.items():
1898
1899                    create_ret = self.createCheck( hostname, metricname, period )   
1900
1901                    update_ret = self.update( hostname, metricname, period, pmetric )
1902
1903                    if update_ret == 0:
1904
1905                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1906                    else:
1907                        debug_msg( 9, 'metric update failed' )
1908
1909                    update_rets.append( create_ret )
1910                    update_rets.append( update_ret )
1911
1912                # Lets ignore errors here for now, we need to make sure last update time
1913                # is correct!
1914                #
1915                #if not (1) in update_rets:
1916
1917                self.memLastUpdate( hostname, metricname, metrics_to_store )
1918
1919        debug_msg( 5, "Leaving storeMetrics()")
1920
1921    def makeTimeSerial( self ):
1922        """Generate a time serial. Seconds since epoch"""
1923
1924        # Seconds since epoch
1925        mytime = int( time.time() )
1926
1927        return mytime
1928
1929    def makeRrdPath( self, host, metricname, timeserial ):
1930        """Make a RRD location/path and filename"""
1931
1932        rrd_dir  = '%s/%s/%s/%s'    %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1933        rrd_file = '%s/%s.rrd'    %( rrd_dir, metricname )
1934
1935        return rrd_dir, rrd_file
1936
1937    def getLastRrdTimeSerial( self, host ):
1938        """Find the last timeserial (directory) for this host"""
1939
1940        newest_timeserial = 0
1941
1942        for dir in self.timeserials[ host ]:
1943
1944            valid_dir = 1
1945
1946            for letter in dir:
1947                if letter not in string.digits:
1948                    valid_dir = 0
1949
1950            if valid_dir:
1951                timeserial = dir
1952                if timeserial > newest_timeserial:
1953                    newest_timeserial = timeserial
1954
1955        if newest_timeserial:
1956            return newest_timeserial
1957        else:
1958            return 0
1959
1960    def determinePeriod( self, host, check_serial ):
1961        """Determine to which period (directory) this time(serial) belongs"""
1962
1963        period_serial = 0
1964
1965        if self.timeserials.has_key( host ):
1966
1967            for serial in self.timeserials[ host ]:
1968
1969                if check_serial >= serial and period_serial < serial:
1970
1971                    period_serial = serial
1972
1973        return period_serial
1974
1975    def determineSerials( self, host, metricname, metriclist ):
1976        """
1977        Determine the correct serial and corresponding rrd to store
1978        for a list of metrics
1979        """
1980
1981        metric_serial_table = { }
1982
1983        for metric in metriclist:
1984
1985            if metric['name'] == metricname:
1986
1987                period       = self.determinePeriod( host, metric['time'] )   
1988
1989                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1990
1991                if (int( metric['time'] ) - int( period ) ) > archive_secs:
1992
1993                    # This one should get it's own new period
1994                    period = metric['time']
1995
1996                    if not self.timeserials.has_key( host ):
1997                        self.timeserials[ host ] = [ ]
1998
1999                    self.timeserials[ host ].append( period )
2000
2001                if not metric_serial_table.has_key( period ):
2002
2003                    metric_serial_table[ period ] = [ ]
2004
2005                metric_serial_table[ period ].append( metric )
2006
2007        return metric_serial_table
2008
2009    def createCheck( self, host, metricname, timeserial ):
2010        """Check if an rrd allready exists for this metric, create if not"""
2011
2012        debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
2013       
2014        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
2015
2016        if not os.path.exists( rrd_dir ):
2017
2018            try:
2019                os.makedirs( rrd_dir )
2020
2021            except os.OSError, msg:
2022
2023                if msg.find( 'File exists' ) != -1:
2024
2025                    # Ignore exists errors
2026                    pass
2027
2028                else:
2029
2030                    print msg
2031                    return
2032
2033            debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
2034
2035        if not os.path.exists( rrd_file ):
2036
2037            interval     = self.config.getInterval( self.cluster )
2038            heartbeat    = 8 * int( interval )
2039
2040            params       = [ ]
2041
2042            params.append( '--step' )
2043            params.append( str( interval ) )
2044
2045            params.append( '--start' )
2046            params.append( str( int( timeserial ) - 1 ) )
2047
2048            params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
2049            params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
2050
2051            self.rrdm.create( str(rrd_file), params )
2052
2053            debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
2054
2055    def update( self, host, metricname, timeserial, metriclist ):
2056        """
2057        Update rrd file for host with metricname
2058        in directory timeserial with metriclist
2059        """
2060
2061        debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
2062
2063        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
2064
2065        update_list       = self.makeUpdateList( host, metriclist )
2066
2067        if len( update_list ) > 0:
2068            ret = self.rrdm.update( str(rrd_file), update_list )
2069
2070            if ret:
2071                return 1
2072       
2073            debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
2074
2075        return 0
2076
2077def daemon():
2078    """daemonized threading"""
2079
2080    # Fork the first child
2081    #
2082    pid = os.fork()
2083
2084    if pid > 0:
2085
2086        sys.exit(0)  # end parent
2087
2088    # creates a session and sets the process group ID
2089    #
2090    os.setsid()
2091
2092    # Fork the second child
2093    #
2094    pid = os.fork()
2095
2096    if pid > 0:
2097
2098        sys.exit(0)  # end parent
2099
2100    write_pidfile()
2101
2102    # Go to the root directory and set the umask
2103    #
2104    os.chdir('/')
2105    os.umask(0)
2106
2107    sys.stdin.close()
2108    sys.stdout.close()
2109    sys.stderr.close()
2110
2111    os.open('/dev/null', os.O_RDWR)
2112    os.dup2(0, 1)
2113    os.dup2(0, 2)
2114
2115    run()
2116
2117def run():
2118    """Threading start"""
2119
2120    global ARCHIVE_DATASOURCES
2121
2122    config             = GangliaConfigParser( GMETAD_CONF )
2123
2124    for ds in ARCHIVE_DATASOURCES:
2125
2126        if not config.clusterExists( ds ):
2127
2128            print "FATAL ERROR: Data source with name '%s' not found in %s" %( ds, GMETAD_CONF )
2129            sys.exit( 1 )
2130
2131    s_timeout          = int( config.getLowestInterval() - 1 )
2132
2133    socket.setdefaulttimeout( s_timeout )
2134
2135    myXMLSource        = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
2136    myDataStore        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
2137
2138    myJobProcessor     = JobXMLProcessor( myXMLSource, myDataStore )
2139    myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore )
2140
2141    try:
2142        job_xml_thread     = threading.Thread( None, myJobProcessor.run, 'job_proc_thread' )
2143        ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
2144
2145        job_xml_thread.start()
2146        ganglia_xml_thread.start()
2147       
2148    except thread.error, msg:
2149        debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
2150        syslog.closelog()
2151        sys.exit(1)
2152       
2153    debug_msg( 0, 'main threading started.' )
2154
2155def main():
2156    """Program startup"""
2157
2158    global DAEMONIZE, USE_SYSLOG
2159
2160    if not processArgs( sys.argv[1:] ):
2161        sys.exit( 1 )
2162
2163    if( DAEMONIZE and USE_SYSLOG ):
2164        syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2165
2166    if DAEMONIZE:
2167        daemon()
2168    else:
2169        run()
2170
2171#
2172# Global functions
2173#
2174
2175def check_dir( directory ):
2176    """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
2177
2178    if directory[-1] == '/':
2179        directory = directory[:-1]
2180
2181    return directory
2182
2183def reqtime2epoch( rtime ):
2184
2185    (hours, minutes, seconds )    = rtime.split( ':' )
2186
2187    etime    = int(seconds)
2188    etime    = etime + ( int(minutes) * 60 )
2189    etime    = etime + ( int(hours) * 60 * 60 )
2190
2191    return etime
2192
2193def debug_msg( level, msg ):
2194    """Only print msg if correct levels"""
2195
2196    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2197        sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2198   
2199    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2200        syslog.syslog( msg )
2201
2202def printTime( ):
2203    """Print current time in human readable format"""
2204
2205    return time.strftime("%a %d %b %Y %H:%M:%S")
2206
2207def write_pidfile():
2208
2209    # Write pidfile if PIDFILE exists
2210    if PIDFILE:
2211
2212        pid     = os.getpid()
2213
2214        pidfile = open(PIDFILE, 'w')
2215
2216        pidfile.write( str( pid ) )
2217        pidfile.close()
2218
2219# Ooohh, someone started me! Let's go..
2220#
2221if __name__ == '__main__':
2222    main()
Note: See TracBrowser for help on using the repository browser.