source: branches/0.4/jobarchived/jobarchived.py @ 773

Last change on this file since 773 was 773, checked in by ramonb, 11 years ago
  • cleanup and fixes
  • use psycopg2 now
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 57.8 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2013  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 773 2013-03-29 11:40:39Z ramonb $
22#
23
24import getopt, syslog, ConfigParser, sys
25
26VERSION='0.4+SVN'
27
28def usage( ver ):
29
30    print 'jobarchived %s' %VERSION
31
32    if ver:
33        return 0
34
35    print
36    print 'Purpose:'
37    print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
38    print '  and node statistics in a RRD archive'
39    print
40    print 'Usage:    jobarchived [OPTIONS]'
41    print
42    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobarchived.conf)'
43    print '  -p, --pidfile=FILE    Use pid file to store the process id'
44    print '  -h, --help        Print help and exit'
45    print '  -v, --version        Print version and exit'
46    print
47
48def processArgs( args ):
49
50    SHORT_L    = 'p:hvc:'
51    LONG_L    = [ 'help', 'config=', 'pidfile=', 'version' ]
52
53    config_filename = '/etc/jobarchived.conf'
54
55    global PIDFILE
56
57    PIDFILE    = None
58
59    try:
60
61        opts, args = getopt.getopt( args, SHORT_L, LONG_L )
62
63    except getopt.error, detail:
64
65        print detail
66        sys.exit(1)
67
68    for opt, value in opts:
69
70        if opt in [ '--config', '-c' ]:
71
72            config_filename = value
73
74        if opt in [ '--pidfile', '-p' ]:
75
76            PIDFILE         = value
77
78        if opt in [ '--help', '-h' ]:
79
80            usage( False )
81            sys.exit( 0 )
82
83        if opt in [ '--version', '-v' ]:
84
85            usage( True )
86            sys.exit( 0 )
87
88    try:
89        return loadConfig( config_filename )
90
91    except ConfigParser.NoOptionError, detail:
92
93        print detail
94        sys.exit( 1 )
95
96def loadConfig( filename ):
97
98    def getlist( cfg_string ):
99
100        my_list = [ ]
101
102        for item_txt in cfg_string.split( ',' ):
103
104            sep_char = None
105
106            item_txt = item_txt.strip()
107
108            for s_char in [ "'", '"' ]:
109
110                if item_txt.find( s_char ) != -1:
111
112                    if item_txt.count( s_char ) != 2:
113
114                        print 'Missing quote: %s' %item_txt
115                        sys.exit( 1 )
116
117                    else:
118
119                        sep_char = s_char
120                        break
121
122            if sep_char:
123
124                item_txt = item_txt.split( sep_char )[1]
125
126            my_list.append( item_txt )
127
128        return my_list
129
130    cfg = ConfigParser.ConfigParser()
131
132    cfg.read( filename )
133
134    global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
135    global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
136    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
137
138    ARCHIVE_PATH        = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
139
140    ARCHIVE_HOURS_PER_RRD    = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
141
142    DEBUG_LEVEL        = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
143
144    USE_SYSLOG        = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
145
146    SYSLOG_LEVEL        = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
147
148    MODRRDTOOL        = False
149
150    try:
151        global rrdtool
152        import rrdtool
153
154        MODRRDTOOL        = True
155
156    except ImportError:
157
158        MODRRDTOOL        = False
159
160        print "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"
161
162        RRDTOOL            = cfg.get( 'DEFAULT', 'RRDTOOL' )
163
164    try:
165
166        SYSLOG_FACILITY    = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
167
168    except AttributeError, detail:
169
170        print 'Unknown syslog facility'
171        sys.exit( 1 )
172
173    GMETAD_CONF        = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
174
175    ARCHIVE_XMLSOURCE    = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
176
177    ARCHIVE_DATASOURCES    = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
178
179    ARCHIVE_EXCLUDE_METRICS    = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
180
181    JOB_SQL_DBASE        = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
182
183    JOB_TIMEOUT        = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
184
185    DAEMONIZE        = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
186
187
188    return True
189
190# What XML data types not to store
191#
192UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
193
194# Maximum time (in seconds) a parsethread may run
195#
196PARSE_TIMEOUT = 60
197
198# Maximum time (in seconds) a storethread may run
199#
200STORE_TIMEOUT = 360
201
202"""
203The Job Archiving Daemon
204"""
205
206from types import *
207
208import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
209
210try:
211    import psycopg2
212
213except ImportError, details:
214
215    print "FATAL ERROR: psycopg2 python module not found"
216    sys.exit( 1 )
217
218class InitVars:
219        Vars = {}
220       
221        def __init__(self, **key_arg):
222                for (key, value) in key_arg.items():
223                        if value:
224                                self.Vars[key] = value
225                        else:   
226                                self.Vars[key] = None
227                               
228        def __call__(self, *key):
229                key = "%s" % key
230                return self.Vars[key]
231               
232        def __getitem__(self, key):
233                return self.Vars[key]
234               
235        def __repr__(self):
236                return repr(self.Vars)
237               
238        def keys(self):
239                barf =  map(None, self.Vars.keys())
240                return barf
241               
242        def values(self):
243                barf =  map(None, self.Vars.values())
244                return barf
245               
246        def has_key(self, key):
247                if self.Vars.has_key(key):
248                        return 1
249                else:   
250                        return 0
251                       
252class DBError(Exception):
253        def __init__(self, msg=''):
254                self.msg = msg
255                Exception.__init__(self, msg)
256        def __repr__(self):
257                return self.msg
258        __str__ = __repr__
259
260#
261# Class to connect to a database
262# and return the queury in a list or dictionairy.
263#
264class DB:
265    def __init__(self, db_vars):
266
267        self.dict = db_vars
268
269        if self.dict.has_key('User'):
270            self.user = self.dict['User']
271        else:
272            self.user = 'postgres'
273
274        if self.dict.has_key('Host'):
275            self.host = self.dict['Host']
276        else:
277            self.host = 'localhost'
278
279        if self.dict.has_key('Password'):
280            self.passwd = self.dict['Password']
281        else:
282            self.passwd = ''
283
284        if self.dict.has_key('DataBaseName'):
285            self.db = self.dict['DataBaseName']
286        else:
287            self.db = 'jobarchive'
288
289        # connect_string = 'host:port:database:user:password:
290        dsn = "host='%s' dbname='%s' user='%s' password='%s'" %(self.host, self.db, self.user, self.passwd)
291
292        try:
293            self.SQL = psycopg2.connect(dsn)
294        except psycopg2.Error, details:
295            str = "%s" %details
296            raise DBError(str)
297
298    def __repr__(self):
299        return repr(self.result)
300
301    def __nonzero__(self):
302        return not(self.result == None)
303
304    def __len__(self):
305        return len(self.result)
306
307    def __getitem__(self,i):
308        return self.result[i]
309
310    def __getslice__(self,i,j):
311        return self.result[i:j]
312
313    def Get(self, q_str):
314        c = self.SQL.cursor()
315        try:
316            c.execute(q_str)
317            result = c.fetchall()
318        except psycopg2.Error, details:
319            c.close()
320            str = "%s" %details
321            raise DBError(str)
322
323        c.close()
324        return result
325
326    def Set(self, q_str):
327        c = self.SQL.cursor()
328        try:
329            c.execute(q_str)
330            result = c.oidValue
331
332        except psycopg2.Error, details:
333            c.close()
334            str = "%s" %details
335            raise DBError(str)
336
337        c.close()
338        return result
339
340    def Commit(self):
341        self.SQL.commit()
342
343class DataSQLStore:
344
345    db_vars = None
346    dbc = None
347
348    def __init__( self, hostname, database ):
349
350        self.db_vars = InitVars(DataBaseName=database,
351                User='root',
352                Host=hostname,
353                Password='',
354                Dictionary='true')
355
356        try:
357            self.dbc     = DB(self.db_vars)
358        except DBError, details:
359            debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
360            sys.exit(1)
361
362    def setDatabase(self, statement):
363        ret = self.doDatabase('set', statement)
364        return ret
365       
366    def getDatabase(self, statement):
367        ret = self.doDatabase('get', statement)
368        return ret
369
370    def doDatabase(self, type, statement):
371
372        debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
373        try:
374            if type == 'set':
375                result = self.dbc.Set( statement )
376                self.dbc.Commit()
377            elif type == 'get':
378                result = self.dbc.Get( statement )
379               
380        except DBError, detail:
381            operation = statement.split(' ')[0]
382            debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
383            sys.exit(1)
384
385        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
386        return result
387
388    def getJobNodeId( self, job_id, node_id ):
389
390        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
391        if len( id ) > 0:
392
393            if len( id[0] ) > 0 and id[0] != '':
394           
395                return 1
396
397        return 0
398
399    def getNodeId( self, hostname ):
400
401        id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
402
403        if len( id ) > 0:
404
405            id = id[0][0]
406
407            return id
408        else:
409            return None
410
411    def getNodeIds( self, hostnames ):
412
413        ids = [ ]
414
415        for node in hostnames:
416
417            id = self.getNodeId( node )
418
419            if id:
420                ids.append( id )
421
422        return ids
423
424    def getJobId( self, jobid ):
425
426        id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
427
428        if id:
429            id = id[0][0]
430
431            return id
432        else:
433            return None
434
435    def addJob( self, job_id, jobattrs ):
436
437        if not self.getJobId( job_id ):
438
439            self.mutateJob( 'insert', job_id, jobattrs ) 
440        else:
441            self.mutateJob( 'update', job_id, jobattrs )
442
443    def mutateJob( self, action, job_id, jobattrs ):
444
445        job_values    = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
446
447        insert_col_str    = 'job_id'
448        insert_val_str    = "'%s'" %job_id
449        update_str    = None
450
451        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
452
453        ids = [ ]
454
455        for valname, value in jobattrs.items():
456
457            if valname in job_values and value != '':
458
459                column_name = 'job_' + valname
460
461                if action == 'insert':
462
463                    if not insert_col_str:
464                        insert_col_str = column_name
465                    else:
466                        insert_col_str = insert_col_str + ',' + column_name
467
468                    if not insert_val_str:
469                        insert_val_str = value
470                    else:
471                        insert_val_str = insert_val_str + ",'%s'" %value
472
473                elif action == 'update':
474                   
475                    if not update_str:
476                        update_str = "%s='%s'" %(column_name, value)
477                    else:
478                        update_str = update_str + ",%s='%s'" %(column_name, value)
479
480            elif valname == 'nodes' and value:
481
482                node_valid = 1
483
484                if len(value) == 1:
485               
486                    if jobattrs['status'] == 'Q':
487
488                        node_valid = 0
489
490                    else:
491
492                        node_valid = 0
493
494                        for node_char in str(value[0]):
495
496                            if string.find( string.digits, node_char ) != -1 and not node_valid:
497
498                                node_valid = 1
499
500                if node_valid:
501
502                    ids = self.addNodes( value, jobattrs['domain'] )
503
504        if action == 'insert':
505
506            self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
507
508        elif action == 'update':
509
510            self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
511
512        if len( ids ) > 0:
513            self.addJobNodes( job_id, ids )
514
515    def addNodes( self, hostnames, domain ):
516
517        ids = [ ]
518
519        for node in hostnames:
520
521            node    = '%s.%s' %( node, domain )
522            id    = self.getNodeId( node )
523   
524            if not id:
525                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
526                id = self.getNodeId( node )
527
528            ids.append( id )
529
530        return ids
531
532    def addJobNodes( self, jobid, nodes ):
533
534        for node in nodes:
535
536            if not self.getJobNodeId( jobid, node ):
537
538                self.addJobNode( jobid, node )
539
540    def addJobNode( self, jobid, nodeid ):
541
542        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
543
544    def storeJobInfo( self, jobid, jobattrs ):
545
546        self.addJob( jobid, jobattrs )
547
548    def checkStaleJobs( self ):
549
550        # Locate all jobs in the database that are not set to finished
551        #
552        q = "SELECT * from jobs WHERE job_status != 'F'"
553
554        r = self.getDatabase( q )
555
556        if len( r ) == 0:
557
558            return None
559
560        cleanjobs    = [ ]
561        timeoutjobs    = [ ]
562
563        jobtimeout_sec    = JOB_TIMEOUT * (60 * 60)
564        cur_time    = time.time()
565
566        for row in r:
567
568            job_id            = row[0]
569            job_requested_time    = row[4]
570            job_status        = row[7]
571            job_start_timestamp    = row[8]
572
573            # If it was set to queued and we didn't see it started
574            # there's not point in keeping it around
575            #
576            if job_status == 'Q' or not job_start_timestamp:
577
578                cleanjobs.append( job_id )
579
580            else:
581
582                start_timestamp = int( job_start_timestamp )
583
584                # If it was set to running longer than JOB_TIMEOUT
585                # close the job: it probably finished while we were not running
586                #
587                if ( cur_time - start_timestamp ) > jobtimeout_sec:
588
589                    if job_requested_time:
590
591                        rtime_epoch    = reqtime2epoch( job_requested_time )
592                    else:
593                        rtime_epoch    = None
594                   
595                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
596
597        debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
598
599        # Purge these from database
600        #
601        for j in cleanjobs:
602
603            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
604            self.setDatabase( q )
605
606        debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
607
608        # Close these jobs in the database
609        # update the stop_timestamp to: start_timestamp + requested wallclock
610        # and set state: finished
611        #
612        for j in timeoutjobs:
613
614            ( i, s, r )        = j
615
616            if r:
617                new_end_timestamp    = int( s ) + r
618
619            q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
620            self.setDatabase( q )
621
622class RRDMutator:
623    """A class for performing RRD mutations"""
624
625    binary = None
626
627    def __init__( self, binary=None ):
628        """Set alternate binary if supplied"""
629
630        if binary:
631            self.binary = binary
632
633    def create( self, filename, args ):
634        """Create a new rrd with args"""
635
636        global MODRRDTOOL
637
638        if MODRRDTOOL:
639            return self.perform( 'create', filename, args )
640        else:
641            return self.perform( 'create', '"' + filename + '"', args )
642
643    def update( self, filename, args ):
644        """Update a rrd with args"""
645
646        global MODRRDTOOL
647
648        if MODRRDTOOL:
649            return self.perform( 'update', filename, args )
650        else:
651            return self.perform( 'update', '"' + filename + '"', args )
652
653    def grabLastUpdate( self, filename ):
654        """Determine the last update time of filename rrd"""
655
656        global MODRRDTOOL
657
658        last_update = 0
659
660        # Use the py-rrdtool module if it's available on this system
661        #
662        if MODRRDTOOL:
663
664            debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
665
666            rrd_header     = { }
667
668            try:
669                rrd_header    = rrdtool.info( filename )
670            except rrdtool.error, msg:
671                debug_msg( 8, str( msg ) )
672
673            if rrd_header.has_key( 'last_update' ):
674                return last_update
675            else:
676                return 0
677
678        # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
679        # DEPRECATED (slow!)
680        #
681        else:
682            debug_msg( 8, self.binary + ' info ' + filename )
683
684            my_pipe        = os.popen( self.binary + ' info "' + filename + '"' )
685
686            for line in my_pipe.readlines():
687
688                if line.find( 'last_update') != -1:
689
690                    last_update = line.split( ' = ' )[1]
691
692            if my_pipe:
693
694                my_pipe.close()
695
696            if last_update:
697                return last_update
698            else:
699                return 0
700
701
702    def perform( self, action, filename, args ):
703        """Perform action on rrd filename with args"""
704
705        global MODRRDTOOL
706
707        arg_string = None
708
709        if type( args ) is not ListType:
710            debug_msg( 8, 'Arguments needs to be of type List' )
711            return 1
712
713        for arg in args:
714
715            if not arg_string:
716
717                arg_string = arg
718            else:
719                arg_string = arg_string + ' ' + arg
720
721        if MODRRDTOOL:
722
723            debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
724
725            try:
726                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
727
728                if action == 'create':
729
730                    rrdtool.create( str( filename ), *args )
731
732                elif action == 'update':
733
734                    rrdtool.update( str( filename ), *args )
735
736            except rrdtool.error, msg:
737
738                error_msg = str( msg )
739                debug_msg( 8, error_msg )
740                return 1
741
742        else:
743
744            debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
745
746            cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
747            lines   = cmd.readlines()
748
749            cmd.close()
750
751            for line in lines:
752
753                if line.find( 'ERROR' ) != -1:
754
755                    error_msg = string.join( line.split( ' ' )[1:] )
756                    debug_msg( 8, error_msg )
757                    return 1
758
759        return 0
760
761class XMLProcessor:
762    """Skeleton class for XML processor's"""
763
764    def run( self ):
765        """Do main processing of XML here"""
766
767        pass
768
769class TorqueXMLProcessor( XMLProcessor ):
770    """Main class for processing XML and acting with it"""
771
772    def __init__( self, XMLSource, DataStore ):
773        """Setup initial XML connection and handlers"""
774
775        self.myXMLSource    = XMLSource
776        self.myXMLHandler    = TorqueXMLHandler( DataStore )
777        self.myXMLError        = XMLErrorHandler()
778
779        self.config        = GangliaConfigParser( GMETAD_CONF )
780
781    def run( self ):
782        """Main XML processing"""
783
784        debug_msg( 1, 'torque_xml_thread(): started.' )
785
786        while( 1 ):
787
788            #self.myXMLSource = self.mXMLGatherer.getFileObject()
789            debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' )
790
791            my_data    = self.myXMLSource.getData()
792
793            debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )
794
795            if my_data:
796                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
797
798                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
799
800                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
801               
802            debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
803            time.sleep( self.config.getLowestInterval() )
804
805class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
806    """Parse Torque's jobinfo XML from our plugin"""
807
808    jobAttrs = { }
809
810    def __init__( self, datastore ):
811
812        #self.ds            = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
813        self.ds            = datastore
814        self.jobs_processed    = [ ]
815        self.jobs_to_store    = [ ]
816
817    def startDocument( self ):
818
819        self.heartbeat    = 0
820        self.elementct    = 0
821
822    def startElement( self, name, attrs ):
823        """
824        This XML will be all gmetric XML
825        so there will be no specific start/end element
826        just one XML statement with all info
827        """
828       
829        jobinfo = { }
830
831        self.elementct    += 1
832
833        if name == 'CLUSTER':
834
835            self.clustername = str( attrs.get( 'NAME', "" ) )
836
837        elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
838
839            metricname = str( attrs.get( 'NAME', "" ) )
840
841            if metricname == 'zplugin_monarch_heartbeat':
842                self.heartbeat = str( attrs.get( 'VAL', "" ) )
843
844            elif metricname.find( 'zplugin_monarch_job' ) != -1:
845
846                job_id    = metricname.split( 'zplugin_monarch_job_' )[1].split( '_' )[0]
847                val    = str( attrs.get( 'VAL', "" ) )
848
849                if not job_id in self.jobs_processed:
850
851                    self.jobs_processed.append( job_id )
852
853                check_change = 0
854
855                if self.jobAttrs.has_key( job_id ):
856
857                    check_change = 1
858
859                valinfo = val.split( ' ' )
860
861                for myval in valinfo:
862
863                    if len( myval.split( '=' ) ) > 1:
864
865                        valname    = myval.split( '=' )[0]
866                        value    = myval.split( '=' )[1]
867
868                        if valname == 'nodes':
869                            value = value.split( ';' )
870
871                        jobinfo[ valname ] = value
872
873                if check_change:
874                    if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
875                        self.jobAttrs[ job_id ]['stop_timestamp']    = ''
876                        self.jobAttrs[ job_id ]                = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
877                        if not job_id in self.jobs_to_store:
878                            self.jobs_to_store.append( job_id )
879
880                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
881                else:
882                    self.jobAttrs[ job_id ] = jobinfo
883
884                    if not job_id in self.jobs_to_store:
885                        self.jobs_to_store.append( job_id )
886
887                    debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
888                   
889    def endDocument( self ):
890        """When all metrics have gone, check if any jobs have finished"""
891
892        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
893
894        if self.heartbeat:
895            for jobid, jobinfo in self.jobAttrs.items():
896
897                # This is an old job, not in current jobinfo list anymore
898                # it must have finished, since we _did_ get a new heartbeat
899                #
900                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
901
902                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
903
904                    if not jobid in self.jobs_processed:
905                        self.jobs_processed.append( jobid )
906
907                    self.jobAttrs[ jobid ]['status'] = 'F'
908                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
909
910                    if not jobid in self.jobs_to_store:
911                        self.jobs_to_store.append( jobid )
912
913            debug_msg( 1, 'torque_xml_thread(): Storing..' )
914
915            for jobid in self.jobs_to_store:
916                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
917
918                    self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
919
920                    if self.jobAttrs[ jobid ]['status'] == 'F':
921                        del self.jobAttrs[ jobid ]
922
923            debug_msg( 1, 'torque_xml_thread(): Done storing.' )
924
925            self.jobs_processed    = [ ]
926            self.jobs_to_store    = [ ]
927
928    def setJobAttrs( self, old, new ):
929        """
930        Set new job attributes in old, but not lose existing fields
931        if old attributes doesn't have those
932        """
933
934        for valname, value in new.items():
935            old[ valname ] = value
936
937        return old
938       
939
940    def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
941        """
942        Check if jobinfo has changed from jobattrs[jobid]
943        if it's report time is bigger than previous one
944        and it is report time is recent (equal to heartbeat)
945        """
946
947        ignore_changes = [ 'reported' ]
948
949        if jobattrs.has_key( jobid ):
950
951            for valname, value in jobinfo.items():
952
953                if valname not in ignore_changes:
954
955                    if jobattrs[ jobid ].has_key( valname ):
956
957                        if value != jobattrs[ jobid ][ valname ]:
958
959                            if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
960                                return True
961
962                    else:
963                        return True
964
965        return False
966
967class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
968    """Parse Ganglia's XML"""
969
970    def __init__( self, config, datastore ):
971        """Setup initial variables and gather info on existing rrd archive"""
972
973        self.config    = config
974        self.clusters    = { }
975        self.ds        = datastore
976
977        debug_msg( 1, 'Checking database..' )
978
979        global DEBUG_LEVEL
980
981        if DEBUG_LEVEL <= 2:
982            self.ds.checkStaleJobs()
983
984        debug_msg( 1, 'Check done.' )
985        debug_msg( 1, 'Checking rrd archive..' )
986        self.gatherClusters()
987        debug_msg( 1, 'Check done.' )
988
989    def gatherClusters( self ):
990        """Find all existing clusters in archive dir"""
991
992        archive_dir    = check_dir(ARCHIVE_PATH)
993
994        hosts        = [ ]
995
996        if os.path.exists( archive_dir ):
997
998            dirlist    = os.listdir( archive_dir )
999
1000            for cfgcluster in ARCHIVE_DATASOURCES:
1001
1002                if cfgcluster not in dirlist:
1003
1004                    # Autocreate a directory for this cluster
1005                    # assume it is new
1006                    #
1007                    cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
1008
1009                    os.mkdir( cluster_dir )
1010
1011                    dirlist.append( cfgcluster )
1012
1013            for item in dirlist:
1014
1015                clustername = item
1016
1017                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
1018
1019                    self.clusters[ clustername ] = RRDHandler( self.config, clustername )
1020
1021        debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
1022
1023    def startElement( self, name, attrs ):
1024        """Memorize appropriate data from xml start tags"""
1025
1026        if name == 'GANGLIA_XML':
1027
1028            self.XMLSource        = str( attrs.get( 'SOURCE', "" ) )
1029            self.gangliaVersion    = str( attrs.get( 'VERSION', "" ) )
1030
1031            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
1032
1033        elif name == 'GRID':
1034
1035            self.gridName    = str( attrs.get( 'NAME', "" ) )
1036            self.time    = str( attrs.get( 'LOCALTIME', "" ) )
1037
1038            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
1039
1040        elif name == 'CLUSTER':
1041
1042            self.clusterName    = str( attrs.get( 'NAME', "" ) )
1043            self.time        = str( attrs.get( 'LOCALTIME', "" ) )
1044
1045            if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
1046
1047                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
1048
1049                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
1050
1051        elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
1052
1053            self.hostName        = str( attrs.get( 'NAME', "" ) )
1054            self.hostIp        = str( attrs.get( 'IP', "" ) )
1055            self.hostReported    = str( attrs.get( 'REPORTED', "" ) )
1056
1057            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
1058
1059        elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
1060
1061            type = str( attrs.get( 'TYPE', "" ) )
1062           
1063            exclude_metric = False
1064           
1065            for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
1066
1067                orig_name = str( attrs.get( 'NAME', "" ) )
1068
1069                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1070               
1071                    exclude_metric = True
1072
1073                elif re.match( ex_metricstr, orig_name ):
1074
1075                    exclude_metric = True
1076
1077            if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1078
1079                myMetric        = { }
1080                myMetric['name']    = str( attrs.get( 'NAME', "" ) )
1081                myMetric['val']        = str( attrs.get( 'VAL', "" ) )
1082                myMetric['time']    = self.hostReported
1083
1084                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
1085
1086                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
1087
1088    def storeMetrics( self ):
1089        """Store metrics of each cluster rrd handler"""
1090
1091        for clustername, rrdh in self.clusters.items():
1092
1093            ret = rrdh.storeMetrics()
1094
1095            if ret:
1096                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1097                return 1
1098
1099        return 0
1100
1101class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1102
1103    def error( self, exception ):
1104        """Recoverable error"""
1105
1106        debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
1107
1108    def fatalError( self, exception ):
1109        """Non-recoverable error"""
1110
1111        exception_str = str( exception )
1112
1113        # Ignore 'no element found' errors
1114        if exception_str.find( 'no element found' ) != -1:
1115            debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1116            return 0
1117
1118        debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
1119        sys.exit( 1 )
1120
1121    def warning( self, exception ):
1122        """Warning"""
1123
1124        debug_msg( 0, 'Warning ' + str( exception ) )
1125
1126class XMLGatherer:
1127    """Setup a connection and file object to Ganglia's XML"""
1128
1129    s        = None
1130    fd        = None
1131    data        = None
1132    slot        = None
1133
1134    # Time since the last update
1135    #
1136    LAST_UPDATE    = 0
1137
1138    # Minimum interval between updates
1139    #
1140    MIN_UPDATE_INT    = 10
1141
1142    # Is a update occuring now
1143    #
1144    update_now    = False
1145
1146    def __init__( self, host, port ):
1147        """Store host and port for connection"""
1148
1149        self.host    = host
1150        self.port    = port
1151        self.slot       = threading.Lock()
1152
1153        self.retrieveData()
1154
1155    def retrieveData( self ):
1156        """Setup connection to XML source"""
1157
1158        self.update_now    = True
1159
1160        self.slot.acquire()
1161
1162        self.data    = None
1163
1164        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
1165
1166            af, socktype, proto, canonname, sa = res
1167
1168            try:
1169
1170                self.s = socket.socket( af, socktype, proto )
1171
1172            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1173
1174                self.s = None
1175                continue
1176
1177            try:
1178
1179                self.s.connect( sa )
1180
1181            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1182
1183                self.disconnect()
1184                continue
1185
1186            break
1187
1188        if self.s is None:
1189
1190            debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1191            self.update_now    = False
1192            #sys.exit( 1 )
1193
1194        else:
1195            #self.s.send( '\n' )
1196
1197            my_fp            = self.s.makefile( 'r' )
1198            my_data          = my_fp.readlines()
1199            my_data          = string.join( my_data, '' )
1200
1201            self.data        = my_data
1202
1203            self.LAST_UPDATE = time.time()
1204
1205        self.slot.release()
1206
1207        self.update_now    = False
1208
1209    def disconnect( self ):
1210        """Close socket"""
1211
1212        if self.s:
1213            #self.s.shutdown( 2 )
1214            self.s.close()
1215            self.s = None
1216
1217    def __del__( self ):
1218        """Kill the socket before we leave"""
1219
1220        self.disconnect()
1221
1222    def reGetData( self ):
1223        """Reconnect"""
1224
1225        while self.update_now:
1226
1227            # Must be another update in progress:
1228            # Wait until the update is complete
1229            #
1230            time.sleep( 1 )
1231
1232        if self.s:
1233            self.disconnect()
1234
1235        self.retrieveData()
1236
1237    def getData( self ):
1238
1239        """Return the XML data"""
1240
1241        # If more than MIN_UPDATE_INT seconds passed since last data update
1242        # update the XML first before returning it
1243        #
1244
1245        cur_time    = time.time()
1246
1247        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1248
1249            self.reGetData()
1250
1251        while self.update_now:
1252
1253            # Must be another update in progress:
1254            # Wait until the update is complete
1255            #
1256            time.sleep( 1 )
1257           
1258        return self.data
1259
1260    def makeFileDescriptor( self ):
1261        """Make file descriptor that points to our socket connection"""
1262
1263        self.reconnect()
1264
1265        if self.s:
1266            self.fd = self.s.makefile( 'r' )
1267
1268    def getFileObject( self ):
1269        """Connect, and return a file object"""
1270
1271        self.makeFileDescriptor()
1272
1273        if self.fd:
1274            return self.fd
1275
1276class GangliaXMLProcessor( XMLProcessor ):
1277    """Main class for processing XML and acting with it"""
1278
1279    def __init__( self, XMLSource, DataStore ):
1280        """Setup initial XML connection and handlers"""
1281
1282        self.config        = GangliaConfigParser( GMETAD_CONF )
1283
1284        #self.myXMLGatherer    = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1285        #self.myXMLSource    = self.myXMLGatherer.getFileObject()
1286        self.myXMLSource    = XMLSource
1287        self.ds            = DataStore
1288        self.myXMLHandler    = GangliaXMLHandler( self.config, self.ds )
1289        self.myXMLError        = XMLErrorHandler()
1290
1291    def run( self ):
1292        """Main XML processing; start a xml and storethread"""
1293
1294        xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1295        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
1296
1297        while( 1 ):
1298
1299            if not xml_thread.isAlive():
1300                # Gather XML at the same interval as gmetad
1301
1302                # threaded call to: self.processXML()
1303                #
1304                try:
1305                    xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1306                    xml_thread.start()
1307                except thread.error, msg:
1308                    debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1309                    #return 1
1310
1311            if not store_thread.isAlive():
1312                # Store metrics every .. sec
1313
1314                # threaded call to: self.storeMetrics()
1315                #
1316                try:
1317                    store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1318                    store_thread.start()
1319                except thread.error, msg:
1320                    debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1321                    #return 1
1322       
1323            # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1324            time.sleep( 1 )   
1325
1326    def storeMetrics( self ):
1327        """Store metrics retained in memory to disk"""
1328
1329        global DEBUG_LEVEL
1330
1331        # Store metrics somewhere between every 360 and 640 seconds
1332        #
1333        if DEBUG_LEVEL > 2:
1334            #STORE_INTERVAL = 60
1335            STORE_INTERVAL = random.randint( 360, 640 )
1336        else:
1337            STORE_INTERVAL = random.randint( 360, 640 )
1338
1339        try:
1340            store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1341            store_metric_thread.start()
1342        except thread.error, msg:
1343            debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1344            return 1
1345
1346        debug_msg( 1, 'ganglia_store_thread(): started.' )
1347
1348        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
1349        time.sleep( STORE_INTERVAL )
1350        debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
1351
1352        if store_metric_thread.isAlive():
1353
1354            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
1355            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
1356            debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
1357
1358        debug_msg( 1, 'ganglia_store_thread(): finished.' )
1359
1360        return 0
1361
1362    def storeThread( self ):
1363        """Actual metric storing thread"""
1364
1365        debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1366        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
1367        ret = self.myXMLHandler.storeMetrics()
1368        if ret > 0:
1369            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
1370        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1371        debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
1372       
1373        return 0
1374
1375    def processXML( self ):
1376        """Process XML"""
1377
1378        try:
1379            parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1380            parsethread.start()
1381        except thread.error, msg:
1382            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1383            return 1
1384
1385        debug_msg( 1, 'ganglia_xml_thread(): started.' )
1386
1387        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
1388        time.sleep( float( self.config.getLowestInterval() ) )   
1389        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
1390
1391        if parsethread.isAlive():
1392
1393            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
1394            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
1395            debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
1396
1397        debug_msg( 1, 'ganglia_xml_thread(): finished.' )
1398
1399        return 0
1400
1401    def parseThread( self ):
1402        """Actual parsing thread"""
1403
1404        debug_msg( 1, 'ganglia_parse_thread(): started.' )
1405        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
1406       
1407        my_data    = self.myXMLSource.getData()
1408
1409        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )
1410
1411        if my_data:
1412            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
1413            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1414            debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1415
1416        debug_msg( 1, 'ganglia_parse_thread(): finished.' )
1417
1418        return 0
1419
1420class GangliaConfigParser:
1421
1422    sources = [ ]
1423
1424    def __init__( self, config ):
1425        """Parse some stuff from our gmetad's config, such as polling interval"""
1426
1427        self.config = config
1428        self.parseValues()
1429
1430    def parseValues( self ):
1431        """Parse certain values from gmetad.conf"""
1432
1433        readcfg = open( self.config, 'r' )
1434
1435        for line in readcfg.readlines():
1436
1437            if line.count( '"' ) > 1:
1438
1439                if line.find( 'data_source' ) != -1 and line[0] != '#':
1440
1441                    source        = { }
1442                    source['name']    = line.split( '"' )[1]
1443                    source_words    = line.split( '"' )[2].split( ' ' )
1444
1445                    for word in source_words:
1446
1447                        valid_interval = 1
1448
1449                        for letter in word:
1450
1451                            if letter not in string.digits:
1452
1453                                valid_interval = 0
1454
1455                        if valid_interval and len(word) > 0:
1456
1457                            source['interval'] = word
1458                            debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
1459   
1460                    # No interval found, use Ganglia's default   
1461                    if not source.has_key( 'interval' ):
1462                        source['interval'] = 15
1463                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1464
1465                    self.sources.append( source )
1466
1467    def getInterval( self, source_name ):
1468        """Return interval for source_name"""
1469
1470        for source in self.sources:
1471
1472            if source['name'] == source_name:
1473
1474                return source['interval']
1475
1476        return None
1477
1478    def getLowestInterval( self ):
1479        """Return the lowest interval of all clusters"""
1480
1481        lowest_interval = 0
1482
1483        for source in self.sources:
1484
1485            if not lowest_interval or source['interval'] <= lowest_interval:
1486
1487                lowest_interval = source['interval']
1488
1489        # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1490        if lowest_interval:
1491            return lowest_interval
1492        else:
1493            return 15
1494
1495class RRDHandler:
1496    """Class for handling RRD activity"""
1497
1498    myMetrics = { }
1499    lastStored = { }
1500    timeserials = { }
1501    slot = None
1502
1503    def __init__( self, config, cluster ):
1504        """Setup initial variables"""
1505
1506        global MODRRDTOOL
1507
1508        self.block    = 0
1509        self.cluster    = cluster
1510        self.config    = config
1511        self.slot    = threading.Lock()
1512
1513        if MODRRDTOOL:
1514
1515            self.rrdm    = RRDMutator()
1516        else:
1517            self.rrdm    = RRDMutator( RRDTOOL )
1518
1519        global DEBUG_LEVEL
1520
1521        if DEBUG_LEVEL <= 2:
1522            self.gatherLastUpdates()
1523
1524    def gatherLastUpdates( self ):
1525        """Populate the lastStored list, containing timestamps of all last updates"""
1526
1527        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1528
1529        hosts = [ ]
1530
1531        if os.path.exists( cluster_dir ):
1532
1533            dirlist = os.listdir( cluster_dir )
1534
1535            for dir in dirlist:
1536
1537                hosts.append( dir )
1538
1539        for host in hosts:
1540
1541            host_dir    = cluster_dir + '/' + host
1542            dirlist        = os.listdir( host_dir )
1543
1544            for dir in dirlist:
1545
1546                if not self.timeserials.has_key( host ):
1547
1548                    self.timeserials[ host ] = [ ]
1549
1550                self.timeserials[ host ].append( dir )
1551
1552            last_serial = self.getLastRrdTimeSerial( host )
1553
1554            if last_serial:
1555
1556                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1557
1558                if os.path.exists( metric_dir ):
1559
1560                    dirlist = os.listdir( metric_dir )
1561
1562                    for file in dirlist:
1563
1564                        metricname = file.split( '.rrd' )[0]
1565
1566                        if not self.lastStored.has_key( host ):
1567
1568                            self.lastStored[ host ] = { }
1569
1570                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1571
1572    def getClusterName( self ):
1573        """Return clustername"""
1574
1575        return self.cluster
1576
1577    def memMetric( self, host, metric ):
1578        """Store metric from host in memory"""
1579
1580        # <ATOMIC>
1581        #
1582        self.slot.acquire()
1583       
1584        if self.myMetrics.has_key( host ):
1585
1586            if self.myMetrics[ host ].has_key( metric['name'] ):
1587
1588                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1589
1590                    if mymetric['time'] == metric['time']:
1591
1592                        # Allready have this metric, abort
1593                        self.slot.release()
1594                        return 1
1595            else:
1596                self.myMetrics[ host ][ metric['name'] ] = [ ]
1597        else:
1598            self.myMetrics[ host ]                = { }
1599            self.myMetrics[ host ][ metric['name'] ]    = [ ]
1600
1601        # Push new metric onto stack
1602        # atomic code; only 1 thread at a time may access the stack
1603
1604        self.myMetrics[ host ][ metric['name'] ].append( metric )
1605
1606        self.slot.release()
1607        #
1608        # </ATOMIC>
1609
1610    def makeUpdateList( self, host, metriclist ):
1611        """
1612        Make a list of update values for rrdupdate
1613        but only those that we didn't store before
1614        """
1615
1616        update_list    = [ ]
1617        metric        = None
1618
1619        while len( metriclist ) > 0:
1620
1621            metric = metriclist.pop( 0 )
1622
1623            if self.checkStoreMetric( host, metric ):
1624
1625                u_val    = str( metric['time'] ) + ':' + str( metric['val'] )
1626                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1627                update_list.append( u_val )
1628
1629        return update_list
1630
1631    def checkStoreMetric( self, host, metric ):
1632        """Check if supplied metric if newer than last one stored"""
1633
1634        if self.lastStored.has_key( host ):
1635
1636            if self.lastStored[ host ].has_key( metric['name'] ):
1637
1638                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1639
1640                    # This is old
1641                    return 0
1642
1643        return 1
1644
1645    def memLastUpdate( self, host, metricname, metriclist ):
1646        """
1647        Memorize the time of the latest metric from metriclist
1648        but only if it wasn't allready memorized
1649        """
1650
1651        if not self.lastStored.has_key( host ):
1652            self.lastStored[ host ] = { }
1653
1654        last_update_time = 0
1655
1656        for metric in metriclist:
1657
1658            if metric['name'] == metricname:
1659
1660                if metric['time'] > last_update_time:
1661
1662                    last_update_time = metric['time']
1663
1664        if self.lastStored[ host ].has_key( metricname ):
1665           
1666            if last_update_time <= self.lastStored[ host ][ metricname ]:
1667                return 1
1668
1669        self.lastStored[ host ][ metricname ] = last_update_time
1670
1671    def storeMetrics( self ):
1672        """
1673        Store all metrics from memory to disk
1674        and do it to the RRD's in appropriate timeperiod directory
1675        """
1676
1677        debug_msg( 5, "Entering storeMetrics()")
1678
1679        count_values    = 0
1680        count_metrics    = 0
1681        count_bits    = 0
1682
1683        for hostname, mymetrics in self.myMetrics.items():   
1684
1685            for metricname, mymetric in mymetrics.items():
1686
1687                count_metrics += 1
1688
1689                for dmetric in mymetric:
1690
1691                    count_values += 1
1692
1693                    count_bits    += len( dmetric['time'] )
1694                    count_bits    += len( dmetric['val'] )
1695
1696        count_bytes    = count_bits / 8
1697
1698        debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1699            str( len( self.myMetrics.keys() ) ) + " hosts " + 
1700            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1701            str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1702
1703        for hostname, mymetrics in self.myMetrics.items():   
1704
1705            for metricname, mymetric in mymetrics.items():
1706
1707                metrics_to_store = [ ]
1708
1709                # Pop metrics from stack for storing until none is left
1710                # atomic code: only 1 thread at a time may access myMetrics
1711
1712                # <ATOMIC>
1713                #
1714                self.slot.acquire() 
1715
1716                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1717
1718                    if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1719
1720                        try:
1721                            metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1722                        except IndexError, msg:
1723
1724                            # Somehow sometimes myMetrics[ hostname ][ metricname ]
1725                            # is still len 0 when the statement is executed.
1726                            # Just ignore indexerror's..
1727                            pass
1728
1729                self.slot.release()
1730                #
1731                # </ATOMIC>
1732
1733                # Create a mapping table, each metric to the period where it should be stored
1734                #
1735                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
1736
1737                update_rets = [ ]
1738
1739                for period, pmetric in metric_serial_table.items():
1740
1741                    create_ret = self.createCheck( hostname, metricname, period )   
1742
1743                    update_ret = self.update( hostname, metricname, period, pmetric )
1744
1745                    if update_ret == 0:
1746
1747                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1748                    else:
1749                        debug_msg( 9, 'metric update failed' )
1750
1751                    update_rets.append( create_ret )
1752                    update_rets.append( update_ret )
1753
1754                # Lets ignore errors here for now, we need to make sure last update time
1755                # is correct!
1756                #
1757                #if not (1) in update_rets:
1758
1759                self.memLastUpdate( hostname, metricname, metrics_to_store )
1760
1761        debug_msg( 5, "Leaving storeMetrics()")
1762
1763    def makeTimeSerial( self ):
1764        """Generate a time serial. Seconds since epoch"""
1765
1766        # Seconds since epoch
1767        mytime = int( time.time() )
1768
1769        return mytime
1770
1771    def makeRrdPath( self, host, metricname, timeserial ):
1772        """Make a RRD location/path and filename"""
1773
1774        rrd_dir        = '%s/%s/%s/%s'    %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1775        rrd_file    = '%s/%s.rrd'    %( rrd_dir, metricname )
1776
1777        return rrd_dir, rrd_file
1778
1779    def getLastRrdTimeSerial( self, host ):
1780        """Find the last timeserial (directory) for this host"""
1781
1782        newest_timeserial = 0
1783
1784        for dir in self.timeserials[ host ]:
1785
1786            valid_dir = 1
1787
1788            for letter in dir:
1789                if letter not in string.digits:
1790                    valid_dir = 0
1791
1792            if valid_dir:
1793                timeserial = dir
1794                if timeserial > newest_timeserial:
1795                    newest_timeserial = timeserial
1796
1797        if newest_timeserial:
1798            return newest_timeserial
1799        else:
1800            return 0
1801
1802    def determinePeriod( self, host, check_serial ):
1803        """Determine to which period (directory) this time(serial) belongs"""
1804
1805        period_serial = 0
1806
1807        if self.timeserials.has_key( host ):
1808
1809            for serial in self.timeserials[ host ]:
1810
1811                if check_serial >= serial and period_serial < serial:
1812
1813                    period_serial = serial
1814
1815        return period_serial
1816
1817    def determineSerials( self, host, metricname, metriclist ):
1818        """
1819        Determine the correct serial and corresponding rrd to store
1820        for a list of metrics
1821        """
1822
1823        metric_serial_table = { }
1824
1825        for metric in metriclist:
1826
1827            if metric['name'] == metricname:
1828
1829                period        = self.determinePeriod( host, metric['time'] )   
1830
1831                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1832
1833                if (int( metric['time'] ) - int( period ) ) > archive_secs:
1834
1835                    # This one should get it's own new period
1836                    period = metric['time']
1837
1838                    if not self.timeserials.has_key( host ):
1839                        self.timeserials[ host ] = [ ]
1840
1841                    self.timeserials[ host ].append( period )
1842
1843                if not metric_serial_table.has_key( period ):
1844
1845                    metric_serial_table[ period ] = [ ]
1846
1847                metric_serial_table[ period ].append( metric )
1848
1849        return metric_serial_table
1850
1851    def createCheck( self, host, metricname, timeserial ):
1852        """Check if an rrd allready exists for this metric, create if not"""
1853
1854        debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1855       
1856        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
1857
1858        if not os.path.exists( rrd_dir ):
1859
1860            try:
1861                os.makedirs( rrd_dir )
1862
1863            except os.OSError, msg:
1864
1865                if msg.find( 'File exists' ) != -1:
1866
1867                    # Ignore exists errors
1868                    pass
1869
1870                else:
1871
1872                    print msg
1873                    return
1874
1875            debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
1876
1877        if not os.path.exists( rrd_file ):
1878
1879            interval    = self.config.getInterval( self.cluster )
1880            heartbeat    = 8 * int( interval )
1881
1882            params        = [ ]
1883
1884            params.append( '--step' )
1885            params.append( str( interval ) )
1886
1887            params.append( '--start' )
1888            params.append( str( int( timeserial ) - 1 ) )
1889
1890            params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1891            params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
1892
1893            self.rrdm.create( str(rrd_file), params )
1894
1895            debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1896
1897    def update( self, host, metricname, timeserial, metriclist ):
1898        """
1899        Update rrd file for host with metricname
1900        in directory timeserial with metriclist
1901        """
1902
1903        debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1904
1905        rrd_dir, rrd_file    = self.makeRrdPath( host, metricname, timeserial )
1906
1907        update_list        = self.makeUpdateList( host, metriclist )
1908
1909        if len( update_list ) > 0:
1910            ret = self.rrdm.update( str(rrd_file), update_list )
1911
1912            if ret:
1913                return 1
1914       
1915            debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
1916
1917        return 0
1918
1919def daemon():
1920    """daemonized threading"""
1921
1922    # Fork the first child
1923    #
1924    pid = os.fork()
1925
1926    if pid > 0:
1927
1928        sys.exit(0)  # end parent
1929
1930    # creates a session and sets the process group ID
1931    #
1932    os.setsid()
1933
1934    # Fork the second child
1935    #
1936    pid = os.fork()
1937
1938    if pid > 0:
1939
1940        sys.exit(0)  # end parent
1941
1942    write_pidfile()
1943
1944    # Go to the root directory and set the umask
1945    #
1946    os.chdir('/')
1947    os.umask(0)
1948
1949    sys.stdin.close()
1950    sys.stdout.close()
1951    sys.stderr.close()
1952
1953    os.open('/dev/null', os.O_RDWR)
1954    os.dup2(0, 1)
1955    os.dup2(0, 2)
1956
1957    run()
1958
1959def run():
1960    """Threading start"""
1961
1962    config        = GangliaConfigParser( GMETAD_CONF )
1963    s_timeout    = int( config.getLowestInterval() - 1 )
1964
1965    socket.setdefaulttimeout( s_timeout )
1966
1967    myXMLSource        = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1968    myDataStore        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
1969
1970    myTorqueProcessor    = TorqueXMLProcessor( myXMLSource, myDataStore )
1971    myGangliaProcessor    = GangliaXMLProcessor( myXMLSource, myDataStore )
1972
1973    try:
1974        torque_xml_thread    = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1975        ganglia_xml_thread    = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
1976
1977        torque_xml_thread.start()
1978        ganglia_xml_thread.start()
1979       
1980    except thread.error, msg:
1981        debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1982        syslog.closelog()
1983        sys.exit(1)
1984       
1985    debug_msg( 0, 'main threading started.' )
1986
1987def main():
1988    """Program startup"""
1989
1990    global DAEMONIZE, USE_SYSLOG
1991
1992    if not processArgs( sys.argv[1:] ):
1993        sys.exit( 1 )
1994
1995    if( DAEMONIZE and USE_SYSLOG ):
1996        syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1997
1998    if DAEMONIZE:
1999        daemon()
2000    else:
2001        run()
2002
2003#
2004# Global functions
2005#
2006
2007def check_dir( directory ):
2008    """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
2009
2010    if directory[-1] == '/':
2011        directory = directory[:-1]
2012
2013    return directory
2014
2015def reqtime2epoch( rtime ):
2016
2017    (hours, minutes, seconds )    = rtime.split( ':' )
2018
2019    etime    = int(seconds)
2020    etime    = etime + ( int(minutes) * 60 )
2021    etime    = etime + ( int(hours) * 60 * 60 )
2022
2023    return etime
2024
2025def debug_msg( level, msg ):
2026    """Only print msg if correct levels"""
2027
2028    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2029        sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2030   
2031    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2032        syslog.syslog( msg )
2033
2034def printTime( ):
2035    """Print current time in human readable format"""
2036
2037    return time.strftime("%a %d %b %Y %H:%M:%S")
2038
2039def write_pidfile():
2040
2041    # Write pidfile if PIDFILE exists
2042    if PIDFILE:
2043
2044        pid     = os.getpid()
2045
2046        pidfile = open(PIDFILE, 'w')
2047
2048        pidfile.write( str( pid ) )
2049        pidfile.close()
2050
2051# Ooohh, someone started me! Let's go..
2052#
2053if __name__ == '__main__':
2054    main()
Note: See TracBrowser for help on using the repository browser.