source: trunk/jobarchived/jobarchived.py @ 478

Last change on this file since 478 was 473, checked in by bastiaans, 16 years ago

jobarchived/jobarchived.py:

  • version: prepare for 0.3 release
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 48.4 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 473 2008-02-22 10:44:40Z bastiaans $
22#
23
24import getopt, syslog, ConfigParser, sys
25
26VERSION='0.3'
27
28def usage( ver ):
29
30        print 'jobarchived %s' %VERSION
31
32        if ver:
33                return 0
34
35        print
36        print 'Purpose:'
37        print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
38        print '  and node statistics in a RRD archive'
39        print
40        print 'Usage:   jobarchived [OPTIONS]'
41        print
42        print '  -c, --config=FILE      The configuration file to use (default: /etc/jobarchived.conf)'
43        print '  -p, --pidfile=FILE     Use pid file to store the process id'
44        print '  -h, --help             Print help and exit'
45        print '  -v, --version          Print version and exit'
46        print
47
48def processArgs( args ):
49
50        SHORT_L = 'p:hvc:'
51        LONG_L  = [ 'help', 'config=', 'pidfile=', 'version' ]
52
53        config_filename = '/etc/jobarchived.conf'
54
55        global PIDFILE
56
57        PIDFILE = None
58
59        try:
60
61                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
62
63        except getopt.error, detail:
64
65                print detail
66                sys.exit(1)
67
68        for opt, value in opts:
69
70                if opt in [ '--config', '-c' ]:
71
72                        config_filename = value
73
74                if opt in [ '--pidfile', '-p' ]:
75
76                        PIDFILE         = value
77
78                if opt in [ '--help', '-h' ]:
79
80                        usage( False )
81                        sys.exit( 0 )
82
83                if opt in [ '--version', '-v' ]:
84
85                        usage( True )
86                        sys.exit( 0 )
87
88        try:
89                return loadConfig( config_filename )
90
91        except ConfigParser.NoOptionError, detail:
92
93                print detail
94                sys.exit( 1 )
95
96def loadConfig( filename ):
97
98        def getlist( cfg_string ):
99
100                my_list = [ ]
101
102                for item_txt in cfg_string.split( ',' ):
103
104                        sep_char = None
105
106                        item_txt = item_txt.strip()
107
108                        for s_char in [ "'", '"' ]:
109
110                                if item_txt.find( s_char ) != -1:
111
112                                        if item_txt.count( s_char ) != 2:
113
114                                                print 'Missing quote: %s' %item_txt
115                                                sys.exit( 1 )
116
117                                        else:
118
119                                                sep_char = s_char
120                                                break
121
122                        if sep_char:
123
124                                item_txt = item_txt.split( sep_char )[1]
125
126                        my_list.append( item_txt )
127
128                return my_list
129
130        cfg = ConfigParser.ConfigParser()
131
132        cfg.read( filename )
133
134        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
135        global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
136        global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
137
138        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
139
140        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
141
142        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
143
144        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
145
146        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
147
148        MODRRDTOOL              = False
149
150        try:
151                global rrdtool
152                import rrdtool
153
154                MODRRDTOOL              = True
155
156        except ImportError:
157
158                MODRRDTOOL              = False
159
160                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!" )
161
162                RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
163
164        try:
165
166                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
167
168        except AttributeError, detail:
169
170                print 'Unknown syslog facility'
171                sys.exit( 1 )
172
173        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
174
175        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
176
177        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
178
179        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
180
181        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
182
183        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
184
185        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
186
187
188        return True
189
190# What XML data types not to store
191#
192UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
193
194# Maximum time (in seconds) a parsethread may run
195#
196PARSE_TIMEOUT = 60
197
198# Maximum time (in seconds) a storethread may run
199#
200STORE_TIMEOUT = 360
201
202"""
203The Job Archiving Daemon
204"""
205
206from types import *
207
208import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
209from pyPgSQL import PgSQL
210
211# Orginal from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed
212# and added some more functions for postgres.
213#       
214#
215# Changed by: Bas van der Vlies <basv@sara.nl>
216#       
217# SARA API for Postgres Database
218#
219# Changed by: Ramon Bastiaans for Job Monarch
220#
221
222class InitVars:
223        Vars = {}
224       
225        def __init__(self, **key_arg):
226                for (key, value) in key_arg.items():
227                        if value:
228                                self.Vars[key] = value
229                        else:   
230                                self.Vars[key] = None
231                               
232        def __call__(self, *key):
233                key = "%s" % key
234                return self.Vars[key]
235               
236        def __getitem__(self, key):
237                return self.Vars[key]
238               
239        def __repr__(self):
240                return repr(self.Vars)
241               
242        def keys(self):
243                barf =  map(None, self.Vars.keys())
244                return barf
245               
246        def values(self):
247                barf =  map(None, self.Vars.values())
248                return barf
249               
250        def has_key(self, key):
251                if self.Vars.has_key(key):
252                        return 1
253                else:   
254                        return 0
255                       
256class DBError(Exception):
257        def __init__(self, msg=''):
258                self.msg = msg
259                Exception.__init__(self, msg)
260        def __repr__(self):
261                return self.msg
262        __str__ = __repr__
263
264#
265# Class to connect to a database
266# and return the queury in a list or dictionairy.
267#
268class DB:
269        def __init__(self, db_vars):
270
271                self.dict = db_vars
272
273                if self.dict.has_key('User'):
274                        self.user = self.dict['User']
275                else:
276                        self.user = 'postgres'
277
278                if self.dict.has_key('Host'):
279                        self.host = self.dict['Host']
280                else:
281                        self.host = 'localhost'
282
283                if self.dict.has_key('Password'):
284                        self.passwd = self.dict['Password']
285                else:
286                        self.passwd = ''
287
288                if self.dict.has_key('DataBaseName'):
289                        self.db = self.dict['DataBaseName']
290                else:
291                        self.db = 'uva_cluster_db'
292
293                # connect_string = 'host:port:database:user:password:
294                dsn = "%s::%s:%s:%s" %(self.host, self.db, self.user, self.passwd)
295
296                try:
297                        self.SQL = PgSQL.connect(dsn)
298                except PgSQL.Error, details:
299                        str = "%s" %details
300                        raise DBError(str)
301
302        def __repr__(self):
303                return repr(self.result)
304
305        def __nonzero__(self):
306                return not(self.result == None)
307
308        def __len__(self):
309                return len(self.result)
310
311        def __getitem__(self,i):
312                return self.result[i]
313
314        def __getslice__(self,i,j):
315                return self.result[i:j]
316
317        def Get(self, q_str):
318                c = self.SQL.cursor()
319                try:
320                        c.execute(q_str)
321                        result = c.fetchall()
322                except PgSQL.Error, details:
323                        c.close()
324                        str = "%s" %details
325                        raise DBError(str)
326
327                c.close()
328                return result
329
330        def Set(self, q_str):
331                c = self.SQL.cursor()
332                try:
333                        c.execute(q_str)
334                        result = c.oidValue
335
336                except PgSQL.Error, details:
337                        c.close()
338                        str = "%s" %details
339                        raise DBError(str)
340
341                c.close()
342                return result
343
344        def Commit(self):
345                self.SQL.commit()
346
347class DataSQLStore:
348
349        db_vars = None
350        dbc = None
351
352        def __init__( self, hostname, database ):
353
354                self.db_vars = InitVars(DataBaseName=database,
355                                User='root',
356                                Host=hostname,
357                                Password='',
358                                Dictionary='true')
359
360                try:
361                        self.dbc     = DB(self.db_vars)
362                except DBError, details:
363                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
364                        sys.exit(1)
365
366        def setDatabase(self, statement):
367                ret = self.doDatabase('set', statement)
368                return ret
369               
370        def getDatabase(self, statement):
371                ret = self.doDatabase('get', statement)
372                return ret
373
374        def doDatabase(self, type, statement):
375
376                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
377                try:
378                        if type == 'set':
379                                result = self.dbc.Set( statement )
380                                self.dbc.Commit()
381                        elif type == 'get':
382                                result = self.dbc.Get( statement )
383                               
384                except DBError, detail:
385                        operation = statement.split(' ')[0]
386                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
387                        sys.exit(1)
388
389                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
390                return result
391
392        def getJobNodeId( self, job_id, node_id ):
393
394                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
395                if len( id ) > 0:
396
397                        if len( id[0] ) > 0 and id[0] != '':
398                       
399                                return 1
400
401                return 0
402
403        def getNodeId( self, hostname ):
404
405                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
406
407                if len( id ) > 0:
408
409                        id = id[0][0]
410
411                        return id
412                else:
413                        return None
414
415        def getNodeIds( self, hostnames ):
416
417                ids = [ ]
418
419                for node in hostnames:
420
421                        id = self.getNodeId( node )
422
423                        if id:
424                                ids.append( id )
425
426                return ids
427
428        def getJobId( self, jobid ):
429
430                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
431
432                if id:
433                        id = id[0][0]
434
435                        return id
436                else:
437                        return None
438
439        def addJob( self, job_id, jobattrs ):
440
441                if not self.getJobId( job_id ):
442
443                        self.mutateJob( 'insert', job_id, jobattrs ) 
444                else:
445                        self.mutateJob( 'update', job_id, jobattrs )
446
447        def mutateJob( self, action, job_id, jobattrs ):
448
449                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
450
451                insert_col_str  = 'job_id'
452                insert_val_str  = "'%s'" %job_id
453                update_str      = None
454
455                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
456
457                ids = [ ]
458
459                for valname, value in jobattrs.items():
460
461                        if valname in job_values and value != '':
462
463                                column_name = 'job_' + valname
464
465                                if action == 'insert':
466
467                                        if not insert_col_str:
468                                                insert_col_str = column_name
469                                        else:
470                                                insert_col_str = insert_col_str + ',' + column_name
471
472                                        if not insert_val_str:
473                                                insert_val_str = value
474                                        else:
475                                                insert_val_str = insert_val_str + ",'%s'" %value
476
477                                elif action == 'update':
478                                       
479                                        if not update_str:
480                                                update_str = "%s='%s'" %(column_name, value)
481                                        else:
482                                                update_str = update_str + ",%s='%s'" %(column_name, value)
483
484                        elif valname == 'nodes' and value:
485
486                                node_valid = 1
487
488                                if len(value) == 1:
489                               
490                                        if jobattrs['status'] == 'Q':
491
492                                                node_valid = 0
493
494                                        else:
495
496                                                node_valid = 0
497
498                                                for node_char in str(value[0]):
499
500                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
501
502                                                                node_valid = 1
503
504                                if node_valid:
505
506                                        ids = self.addNodes( value, jobattrs['domain'] )
507
508                if action == 'insert':
509
510                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
511
512                elif action == 'update':
513
514                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
515
516                if len( ids ) > 0:
517                        self.addJobNodes( job_id, ids )
518
519        def addNodes( self, hostnames, domain ):
520
521                ids = [ ]
522
523                for node in hostnames:
524
525                        node    = '%s.%s' %( node, domain )
526                        id      = self.getNodeId( node )
527       
528                        if not id:
529                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
530                                id = self.getNodeId( node )
531
532                        ids.append( id )
533
534                return ids
535
536        def addJobNodes( self, jobid, nodes ):
537
538                for node in nodes:
539
540                        if not self.getJobNodeId( jobid, node ):
541
542                                self.addJobNode( jobid, node )
543
544        def addJobNode( self, jobid, nodeid ):
545
546                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
547
548        def storeJobInfo( self, jobid, jobattrs ):
549
550                self.addJob( jobid, jobattrs )
551
552        def checkStaleJobs( self ):
553
554                # Locate all jobs in the database that are not set to finished
555                #
556                q = "SELECT * from jobs WHERE job_status != 'F'"
557
558                r = self.getDatabase( q )
559
560                if len( r ) == 0:
561
562                        return None
563
564                cleanjobs       = [ ]
565                timeoutjobs     = [ ]
566
567                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
568                cur_time        = time.time()
569
570                for row in r:
571
572                        job_id                  = row[0]
573                        job_requested_time      = row[4]
574                        job_status              = row[7]
575                        job_start_timestamp     = row[8]
576
577                        # If it was set to queued and we didn't see it started
578                        # there's not point in keeping it around
579                        #
580                        if job_status == 'Q' or not job_start_timestamp:
581
582                                cleanjobs.append( job_id )
583
584                        else:
585
586                                start_timestamp = int( job_start_timestamp )
587
588                                # If it was set to running longer than JOB_TIMEOUT
589                                # close the job: it probably finished while we were not running
590                                #
591                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
592
593                                        if job_requested_time:
594
595                                                rtime_epoch     = reqtime2epoch( job_requested_time )
596                                        else:
597                                                rtime_epoch     = None
598                                       
599                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
600
601                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
602
603                # Purge these from database
604                #
605                for j in cleanjobs:
606
607                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
608                        self.setDatabase( q )
609
610                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
611
612                # Close these jobs in the database
613                # update the stop_timestamp to: start_timestamp + requested wallclock
614                # and set state: finished
615                #
616                for j in timeoutjobs:
617
618                        ( i, s, r )             = j
619
620                        if r:
621                                new_end_timestamp       = int( s ) + r
622
623                        q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
624                        self.setDatabase( q )
625
626class RRDMutator:
627        """A class for performing RRD mutations"""
628
629        binary = None
630
631        def __init__( self, binary=None ):
632                """Set alternate binary if supplied"""
633
634                if binary:
635                        self.binary = binary
636
637        def create( self, filename, args ):
638                """Create a new rrd with args"""
639
640                global MODRRDTOOL
641
642                if MODRRDTOOL:
643                        return self.perform( 'create', filename, args )
644                else:
645                        return self.perform( 'create', '"' + filename + '"', args )
646
647        def update( self, filename, args ):
648                """Update a rrd with args"""
649
650                global MODRRDTOOL
651
652                if MODRRDTOOL:
653                        return self.perform( 'update', filename, args )
654                else:
655                        return self.perform( 'update', '"' + filename + '"', args )
656
657        def grabLastUpdate( self, filename ):
658                """Determine the last update time of filename rrd"""
659
660                global MODRRDTOOL
661
662                last_update = 0
663
664                # Use the py-rrdtool module if it's available on this system
665                #
666                if MODRRDTOOL:
667
668                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
669
670                        rrd_header      = { }
671
672                        try:
673                                rrd_header      = rrdtool.info( filename )
674                        except rrdtool.error, msg:
675                                debug_msg( 8, str( msg ) )
676
677                        if rrd_header.has_key( 'last_update' ):
678                                return last_update
679                        else:
680                                return 0
681
682                # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
683                # DEPRECATED (slow!)
684                #
685                else:
686                        debug_msg( 8, self.binary + ' info ' + filename )
687
688                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
689
690                        for line in my_pipe.readlines():
691
692                                if line.find( 'last_update') != -1:
693
694                                        last_update = line.split( ' = ' )[1]
695
696                        if my_pipe:
697
698                                my_pipe.close()
699
700                        if last_update:
701                                return last_update
702                        else:
703                                return 0
704
705
706        def perform( self, action, filename, args ):
707                """Perform action on rrd filename with args"""
708
709                global MODRRDTOOL
710
711                arg_string = None
712
713                if type( args ) is not ListType:
714                        debug_msg( 8, 'Arguments needs to be of type List' )
715                        return 1
716
717                for arg in args:
718
719                        if not arg_string:
720
721                                arg_string = arg
722                        else:
723                                arg_string = arg_string + ' ' + arg
724
725                if MODRRDTOOL:
726
727                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
728
729                        try:
730                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
731
732                                if action == 'create':
733
734                                        rrdtool.create( str( filename ), *args )
735
736                                elif action == 'update':
737
738                                        rrdtool.update( str( filename ), *args )
739
740                        except rrdtool.error, msg:
741
742                                error_msg = str( msg )
743                                debug_msg( 8, error_msg )
744                                return 1
745
746                else:
747
748                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
749
750                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
751                        lines   = cmd.readlines()
752
753                        cmd.close()
754
755                        for line in lines:
756
757                                if line.find( 'ERROR' ) != -1:
758
759                                        error_msg = string.join( line.split( ' ' )[1:] )
760                                        debug_msg( 8, error_msg )
761                                        return 1
762
763                return 0
764
765class XMLProcessor:
766        """Skeleton class for XML processor's"""
767
768        def run( self ):
769                """Do main processing of XML here"""
770
771                pass
772
773class TorqueXMLProcessor( XMLProcessor ):
774        """Main class for processing XML and acting with it"""
775
776        def __init__( self, XMLSource, DataStore ):
777                """Setup initial XML connection and handlers"""
778
779                self.myXMLSource        = XMLSource
780                self.myXMLHandler       = TorqueXMLHandler( DataStore )
781                self.myXMLError         = XMLErrorHandler()
782
783                self.config             = GangliaConfigParser( GMETAD_CONF )
784
785        def run( self ):
786                """Main XML processing"""
787
788                debug_msg( 1, 'torque_xml_thread(): started.' )
789
790                while( 1 ):
791
792                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
793                        debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' )
794
795                        my_data = self.myXMLSource.getData()
796
797                        debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )
798
799                        if my_data:
800                                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
801
802                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
803
804                                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
805                               
806                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
807                        time.sleep( self.config.getLowestInterval() )
808
809class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
810        """Parse Torque's jobinfo XML from our plugin"""
811
812        jobAttrs = { }
813
814        def __init__( self, datastore ):
815
816                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
817                self.ds                 = datastore
818                self.jobs_processed     = [ ]
819                self.jobs_to_store      = [ ]
820
821        def startDocument( self ):
822
823                self.heartbeat  = 0
824                self.elementct  = 0
825
826        def startElement( self, name, attrs ):
827                """
828                This XML will be all gmetric XML
829                so there will be no specific start/end element
830                just one XML statement with all info
831                """
832               
833                jobinfo = { }
834
835                self.elementct  += 1
836
837                if name == 'CLUSTER':
838
839                        self.clustername = str( attrs.get( 'NAME', "" ) )
840
841                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
842
843                        metricname = str( attrs.get( 'NAME', "" ) )
844
845                        if metricname == 'MONARCH-HEARTBEAT':
846                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
847
848                        elif metricname.find( 'MONARCH-JOB' ) != -1:
849
850                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
851                                val     = str( attrs.get( 'VAL', "" ) )
852
853                                if not job_id in self.jobs_processed:
854
855                                        self.jobs_processed.append( job_id )
856
857                                check_change = 0
858
859                                if self.jobAttrs.has_key( job_id ):
860
861                                        check_change = 1
862
863                                valinfo = val.split( ' ' )
864
865                                for myval in valinfo:
866
867                                        if len( myval.split( '=' ) ) > 1:
868
869                                                valname = myval.split( '=' )[0]
870                                                value   = myval.split( '=' )[1]
871
872                                                if valname == 'nodes':
873                                                        value = value.split( ';' )
874
875                                                jobinfo[ valname ] = value
876
877                                if check_change:
878                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
879                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
880                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
881                                                if not job_id in self.jobs_to_store:
882                                                        self.jobs_to_store.append( job_id )
883
884                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
885                                else:
886                                        self.jobAttrs[ job_id ] = jobinfo
887
888                                        if not job_id in self.jobs_to_store:
889                                                self.jobs_to_store.append( job_id )
890
891                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
892                                       
893        def endDocument( self ):
894                """When all metrics have gone, check if any jobs have finished"""
895
896                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
897
898                if self.heartbeat:
899                        for jobid, jobinfo in self.jobAttrs.items():
900
901                                # This is an old job, not in current jobinfo list anymore
902                                # it must have finished, since we _did_ get a new heartbeat
903                                #
904                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
905
906                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
907
908                                        if not jobid in self.jobs_processed:
909                                                self.jobs_processed.append( jobid )
910
911                                        self.jobAttrs[ jobid ]['status'] = 'F'
912                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
913
914                                        if not jobid in self.jobs_to_store:
915                                                self.jobs_to_store.append( jobid )
916
917                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
918
919                        for jobid in self.jobs_to_store:
920                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
921
922                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
923
924                                        if self.jobAttrs[ jobid ]['status'] == 'F':
925                                                del self.jobAttrs[ jobid ]
926
927                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
928
929                        self.jobs_processed     = [ ]
930                        self.jobs_to_store      = [ ]
931
932        def setJobAttrs( self, old, new ):
933                """
934                Set new job attributes in old, but not lose existing fields
935                if old attributes doesn't have those
936                """
937
938                for valname, value in new.items():
939                        old[ valname ] = value
940
941                return old
942               
943
944        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
945                """
946                Check if jobinfo has changed from jobattrs[jobid]
947                if it's report time is bigger than previous one
948                and it is report time is recent (equal to heartbeat)
949                """
950
951                ignore_changes = [ 'reported' ]
952
953                if jobattrs.has_key( jobid ):
954
955                        for valname, value in jobinfo.items():
956
957                                if valname not in ignore_changes:
958
959                                        if jobattrs[ jobid ].has_key( valname ):
960
961                                                if value != jobattrs[ jobid ][ valname ]:
962
963                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
964                                                                return True
965
966                                        else:
967                                                return True
968
969                return False
970
971class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
972        """Parse Ganglia's XML"""
973
974        def __init__( self, config, datastore ):
975                """Setup initial variables and gather info on existing rrd archive"""
976
977                self.config     = config
978                self.clusters   = { }
979                self.ds         = datastore
980
981                debug_msg( 1, 'Checking database..' )
982
983                global DEBUG_LEVEL
984
985                if DEBUG_LEVEL <= 2:
986                        self.ds.checkStaleJobs()
987
988                debug_msg( 1, 'Check done.' )
989                debug_msg( 1, 'Checking rrd archive..' )
990                self.gatherClusters()
991                debug_msg( 1, 'Check done.' )
992
993        def gatherClusters( self ):
994                """Find all existing clusters in archive dir"""
995
996                archive_dir     = check_dir(ARCHIVE_PATH)
997
998                hosts           = [ ]
999
1000                if os.path.exists( archive_dir ):
1001
1002                        dirlist = os.listdir( archive_dir )
1003
1004                        for cfgcluster in ARCHIVE_DATASOURCES:
1005
1006                                if cfgcluster not in dirlist:
1007
1008                                        # Autocreate a directory for this cluster
1009                                        # assume it is new
1010                                        #
1011                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
1012
1013                                        os.mkdir( cluster_dir )
1014
1015                                        dirlist.append( cfgcluster )
1016
1017                        for item in dirlist:
1018
1019                                clustername = item
1020
1021                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
1022
1023                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
1024
1025                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
1026
1027        def startElement( self, name, attrs ):
1028                """Memorize appropriate data from xml start tags"""
1029
1030                if name == 'GANGLIA_XML':
1031
1032                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
1033                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
1034
1035                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
1036
1037                elif name == 'GRID':
1038
1039                        self.gridName   = str( attrs.get( 'NAME', "" ) )
1040                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
1041
1042                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
1043
1044                elif name == 'CLUSTER':
1045
1046                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
1047                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
1048
1049                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
1050
1051                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
1052
1053                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
1054
1055                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
1056
1057                        self.hostName           = str( attrs.get( 'NAME', "" ) )
1058                        self.hostIp             = str( attrs.get( 'IP', "" ) )
1059                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
1060
1061                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
1062
1063                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
1064
1065                        type = str( attrs.get( 'TYPE', "" ) )
1066                       
1067                        exclude_metric = False
1068                       
1069                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
1070
1071                                orig_name = str( attrs.get( 'NAME', "" ) )
1072
1073                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1074                               
1075                                        exclude_metric = True
1076
1077                                elif re.match( ex_metricstr, orig_name ):
1078
1079                                        exclude_metric = True
1080
1081                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1082
1083                                myMetric                = { }
1084                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
1085                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
1086                                myMetric['time']        = self.hostReported
1087
1088                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
1089
1090                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
1091
1092        def storeMetrics( self ):
1093                """Store metrics of each cluster rrd handler"""
1094
1095                for clustername, rrdh in self.clusters.items():
1096
1097                        ret = rrdh.storeMetrics()
1098
1099                        if ret:
1100                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1101                                return 1
1102
1103                return 0
1104
1105class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1106
1107        def error( self, exception ):
1108                """Recoverable error"""
1109
1110                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
1111
1112        def fatalError( self, exception ):
1113                """Non-recoverable error"""
1114
1115                exception_str = str( exception )
1116
1117                # Ignore 'no element found' errors
1118                if exception_str.find( 'no element found' ) != -1:
1119                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1120                        return 0
1121
1122                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
1123                sys.exit( 1 )
1124
1125        def warning( self, exception ):
1126                """Warning"""
1127
1128                debug_msg( 0, 'Warning ' + str( exception ) )
1129
1130class XMLGatherer:
1131        """Setup a connection and file object to Ganglia's XML"""
1132
1133        s               = None
1134        fd              = None
1135        data            = None
1136        slot            = None
1137
1138        # Time since the last update
1139        #
1140        LAST_UPDATE     = 0
1141
1142        # Minimum interval between updates
1143        #
1144        MIN_UPDATE_INT  = 10
1145
1146        # Is a update occuring now
1147        #
1148        update_now      = False
1149
1150        def __init__( self, host, port ):
1151                """Store host and port for connection"""
1152
1153                self.host       = host
1154                self.port       = port
1155                self.slot       = threading.Lock()
1156
1157                self.retrieveData()
1158
1159        def retrieveData( self ):
1160                """Setup connection to XML source"""
1161
1162                self.update_now = True
1163
1164                self.slot.acquire()
1165
1166                self.data       = None
1167
1168                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
1169
1170                        af, socktype, proto, canonname, sa = res
1171
1172                        try:
1173
1174                                self.s = socket.socket( af, socktype, proto )
1175
1176                        except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1177
1178                                self.s = None
1179                                continue
1180
1181                        try:
1182
1183                                self.s.connect( sa )
1184
1185                        except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
1186
1187                                self.disconnect()
1188                                continue
1189
1190                        break
1191
1192                if self.s is None:
1193
1194                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1195                        self.update_now = False
1196                        #sys.exit( 1 )
1197
1198                else:
1199                        #self.s.send( '\n' )
1200
1201                        my_fp                   = self.s.makefile( 'r' )
1202                        my_data                 = my_fp.readlines()
1203                        my_data                 = string.join( my_data, '' )
1204
1205                        self.data               = my_data
1206
1207                        self.LAST_UPDATE        = time.time()
1208
1209                self.slot.release()
1210
1211                self.update_now = False
1212
1213        def disconnect( self ):
1214                """Close socket"""
1215
1216                if self.s:
1217                        #self.s.shutdown( 2 )
1218                        self.s.close()
1219                        self.s = None
1220
1221        def __del__( self ):
1222                """Kill the socket before we leave"""
1223
1224                self.disconnect()
1225
1226        def reGetData( self ):
1227                """Reconnect"""
1228
1229                while self.update_now:
1230
1231                        # Must be another update in progress:
1232                        # Wait until the update is complete
1233                        #
1234                        time.sleep( 1 )
1235
1236                if self.s:
1237                        self.disconnect()
1238
1239                self.retrieveData()
1240
1241        def getData( self ):
1242
1243                """Return the XML data"""
1244
1245                # If more than MIN_UPDATE_INT seconds passed since last data update
1246                # update the XML first before returning it
1247                #
1248
1249                cur_time        = time.time()
1250
1251                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1252
1253                        self.reGetData()
1254
1255                while self.update_now:
1256
1257                        # Must be another update in progress:
1258                        # Wait until the update is complete
1259                        #
1260                        time.sleep( 1 )
1261                       
1262                return self.data
1263
1264        def makeFileDescriptor( self ):
1265                """Make file descriptor that points to our socket connection"""
1266
1267                self.reconnect()
1268
1269                if self.s:
1270                        self.fd = self.s.makefile( 'r' )
1271
1272        def getFileObject( self ):
1273                """Connect, and return a file object"""
1274
1275                self.makeFileDescriptor()
1276
1277                if self.fd:
1278                        return self.fd
1279
1280class GangliaXMLProcessor( XMLProcessor ):
1281        """Main class for processing XML and acting with it"""
1282
1283        def __init__( self, XMLSource, DataStore ):
1284                """Setup initial XML connection and handlers"""
1285
1286                self.config             = GangliaConfigParser( GMETAD_CONF )
1287
1288                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1289                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1290                self.myXMLSource        = XMLSource
1291                self.ds                 = DataStore
1292                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
1293                self.myXMLError         = XMLErrorHandler()
1294
1295        def run( self ):
1296                """Main XML processing; start a xml and storethread"""
1297
1298                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1299                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
1300
1301                while( 1 ):
1302
1303                        if not xml_thread.isAlive():
1304                                # Gather XML at the same interval as gmetad
1305
1306                                # threaded call to: self.processXML()
1307                                #
1308                                try:
1309                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1310                                        xml_thread.start()
1311                                except thread.error, msg:
1312                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1313                                        #return 1
1314
1315                        if not store_thread.isAlive():
1316                                # Store metrics every .. sec
1317
1318                                # threaded call to: self.storeMetrics()
1319                                #
1320                                try:
1321                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1322                                        store_thread.start()
1323                                except thread.error, msg:
1324                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1325                                        #return 1
1326               
1327                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1328                        time.sleep( 1 ) 
1329
1330        def storeMetrics( self ):
1331                """Store metrics retained in memory to disk"""
1332
1333                global DEBUG_LEVEL
1334
1335                # Store metrics somewhere between every 360 and 640 seconds
1336                #
1337                if DEBUG_LEVEL > 2:
1338                        #STORE_INTERVAL = 60
1339                        STORE_INTERVAL = random.randint( 360, 640 )
1340                else:
1341                        STORE_INTERVAL = random.randint( 360, 640 )
1342
1343                try:
1344                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1345                        store_metric_thread.start()
1346                except thread.error, msg:
1347                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1348                        return 1
1349
1350                debug_msg( 1, 'ganglia_store_thread(): started.' )
1351
1352                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
1353                time.sleep( STORE_INTERVAL )
1354                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
1355
1356                if store_metric_thread.isAlive():
1357
1358                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
1359                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
1360                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
1361
1362                debug_msg( 1, 'ganglia_store_thread(): finished.' )
1363
1364                return 0
1365
1366        def storeThread( self ):
1367                """Actual metric storing thread"""
1368
1369                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1370                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
1371                ret = self.myXMLHandler.storeMetrics()
1372                if ret > 0:
1373                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
1374                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1375                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
1376               
1377                return 0
1378
1379        def processXML( self ):
1380                """Process XML"""
1381
1382                try:
1383                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1384                        parsethread.start()
1385                except thread.error, msg:
1386                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1387                        return 1
1388
1389                debug_msg( 1, 'ganglia_xml_thread(): started.' )
1390
1391                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
1392                time.sleep( float( self.config.getLowestInterval() ) ) 
1393                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
1394
1395                if parsethread.isAlive():
1396
1397                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
1398                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
1399                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
1400
1401                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
1402
1403                return 0
1404
1405        def parseThread( self ):
1406                """Actual parsing thread"""
1407
1408                debug_msg( 1, 'ganglia_parse_thread(): started.' )
1409                debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
1410               
1411                my_data = self.myXMLSource.getData()
1412
1413                debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )
1414
1415                if my_data:
1416                        debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
1417                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1418                        debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1419
1420                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
1421
1422                return 0
1423
1424class GangliaConfigParser:
1425
1426        sources = [ ]
1427
1428        def __init__( self, config ):
1429                """Parse some stuff from our gmetad's config, such as polling interval"""
1430
1431                self.config = config
1432                self.parseValues()
1433
1434        def parseValues( self ):
1435                """Parse certain values from gmetad.conf"""
1436
1437                readcfg = open( self.config, 'r' )
1438
1439                for line in readcfg.readlines():
1440
1441                        if line.count( '"' ) > 1:
1442
1443                                if line.find( 'data_source' ) != -1 and line[0] != '#':
1444
1445                                        source          = { }
1446                                        source['name']  = line.split( '"' )[1]
1447                                        source_words    = line.split( '"' )[2].split( ' ' )
1448
1449                                        for word in source_words:
1450
1451                                                valid_interval = 1
1452
1453                                                for letter in word:
1454
1455                                                        if letter not in string.digits:
1456
1457                                                                valid_interval = 0
1458
1459                                                if valid_interval and len(word) > 0:
1460
1461                                                        source['interval'] = word
1462                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
1463       
1464                                        # No interval found, use Ganglia's default     
1465                                        if not source.has_key( 'interval' ):
1466                                                source['interval'] = 15
1467                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1468
1469                                        self.sources.append( source )
1470
1471        def getInterval( self, source_name ):
1472                """Return interval for source_name"""
1473
1474                for source in self.sources:
1475
1476                        if source['name'] == source_name:
1477
1478                                return source['interval']
1479
1480                return None
1481
1482        def getLowestInterval( self ):
1483                """Return the lowest interval of all clusters"""
1484
1485                lowest_interval = 0
1486
1487                for source in self.sources:
1488
1489                        if not lowest_interval or source['interval'] <= lowest_interval:
1490
1491                                lowest_interval = source['interval']
1492
1493                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1494                if lowest_interval:
1495                        return lowest_interval
1496                else:
1497                        return 15
1498
1499class RRDHandler:
1500        """Class for handling RRD activity"""
1501
1502        myMetrics = { }
1503        lastStored = { }
1504        timeserials = { }
1505        slot = None
1506
1507        def __init__( self, config, cluster ):
1508                """Setup initial variables"""
1509
1510                global MODRRDTOOL
1511
1512                self.block      = 0
1513                self.cluster    = cluster
1514                self.config     = config
1515                self.slot       = threading.Lock()
1516
1517                if MODRRDTOOL:
1518
1519                        self.rrdm       = RRDMutator()
1520                else:
1521                        self.rrdm       = RRDMutator( RRDTOOL )
1522
1523                global DEBUG_LEVEL
1524
1525                if DEBUG_LEVEL <= 2:
1526                        self.gatherLastUpdates()
1527
1528        def gatherLastUpdates( self ):
1529                """Populate the lastStored list, containing timestamps of all last updates"""
1530
1531                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1532
1533                hosts = [ ]
1534
1535                if os.path.exists( cluster_dir ):
1536
1537                        dirlist = os.listdir( cluster_dir )
1538
1539                        for dir in dirlist:
1540
1541                                hosts.append( dir )
1542
1543                for host in hosts:
1544
1545                        host_dir        = cluster_dir + '/' + host
1546                        dirlist         = os.listdir( host_dir )
1547
1548                        for dir in dirlist:
1549
1550                                if not self.timeserials.has_key( host ):
1551
1552                                        self.timeserials[ host ] = [ ]
1553
1554                                self.timeserials[ host ].append( dir )
1555
1556                        last_serial = self.getLastRrdTimeSerial( host )
1557
1558                        if last_serial:
1559
1560                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1561
1562                                if os.path.exists( metric_dir ):
1563
1564                                        dirlist = os.listdir( metric_dir )
1565
1566                                        for file in dirlist:
1567
1568                                                metricname = file.split( '.rrd' )[0]
1569
1570                                                if not self.lastStored.has_key( host ):
1571
1572                                                        self.lastStored[ host ] = { }
1573
1574                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1575
1576        def getClusterName( self ):
1577                """Return clustername"""
1578
1579                return self.cluster
1580
1581        def memMetric( self, host, metric ):
1582                """Store metric from host in memory"""
1583
1584                # <ATOMIC>
1585                #
1586                self.slot.acquire()
1587               
1588                if self.myMetrics.has_key( host ):
1589
1590                        if self.myMetrics[ host ].has_key( metric['name'] ):
1591
1592                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1593
1594                                        if mymetric['time'] == metric['time']:
1595
1596                                                # Allready have this metric, abort
1597                                                self.slot.release()
1598                                                return 1
1599                        else:
1600                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1601                else:
1602                        self.myMetrics[ host ]                          = { }
1603                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
1604
1605                # Push new metric onto stack
1606                # atomic code; only 1 thread at a time may access the stack
1607
1608                self.myMetrics[ host ][ metric['name'] ].append( metric )
1609
1610                self.slot.release()
1611                #
1612                # </ATOMIC>
1613
1614        def makeUpdateList( self, host, metriclist ):
1615                """
1616                Make a list of update values for rrdupdate
1617                but only those that we didn't store before
1618                """
1619
1620                update_list     = [ ]
1621                metric          = None
1622
1623                while len( metriclist ) > 0:
1624
1625                        metric = metriclist.pop( 0 )
1626
1627                        if self.checkStoreMetric( host, metric ):
1628
1629                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1630                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1631                                update_list.append( u_val )
1632
1633                return update_list
1634
1635        def checkStoreMetric( self, host, metric ):
1636                """Check if supplied metric if newer than last one stored"""
1637
1638                if self.lastStored.has_key( host ):
1639
1640                        if self.lastStored[ host ].has_key( metric['name'] ):
1641
1642                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1643
1644                                        # This is old
1645                                        return 0
1646
1647                return 1
1648
1649        def memLastUpdate( self, host, metricname, metriclist ):
1650                """
1651                Memorize the time of the latest metric from metriclist
1652                but only if it wasn't allready memorized
1653                """
1654
1655                if not self.lastStored.has_key( host ):
1656                        self.lastStored[ host ] = { }
1657
1658                last_update_time = 0
1659
1660                for metric in metriclist:
1661
1662                        if metric['name'] == metricname:
1663
1664                                if metric['time'] > last_update_time:
1665
1666                                        last_update_time = metric['time']
1667
1668                if self.lastStored[ host ].has_key( metricname ):
1669                       
1670                        if last_update_time <= self.lastStored[ host ][ metricname ]:
1671                                return 1
1672
1673                self.lastStored[ host ][ metricname ] = last_update_time
1674
1675        def storeMetrics( self ):
1676                """
1677                Store all metrics from memory to disk
1678                and do it to the RRD's in appropriate timeperiod directory
1679                """
1680
1681                debug_msg( 5, "Entering storeMetrics()")
1682
1683                count_values    = 0
1684                count_metrics   = 0
1685                count_bits      = 0
1686
1687                for hostname, mymetrics in self.myMetrics.items():     
1688
1689                        for metricname, mymetric in mymetrics.items():
1690
1691                                count_metrics += 1
1692
1693                                for dmetric in mymetric:
1694
1695                                        count_values += 1
1696
1697                                        count_bits      += len( dmetric['time'] )
1698                                        count_bits      += len( dmetric['val'] )
1699
1700                count_bytes     = count_bits / 8
1701
1702                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1703                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1704                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1705                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1706
1707                for hostname, mymetrics in self.myMetrics.items():     
1708
1709                        for metricname, mymetric in mymetrics.items():
1710
1711                                metrics_to_store = [ ]
1712
1713                                # Pop metrics from stack for storing until none is left
1714                                # atomic code: only 1 thread at a time may access myMetrics
1715
1716                                # <ATOMIC>
1717                                #
1718                                self.slot.acquire() 
1719
1720                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1721
1722                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1723
1724                                                try:
1725                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1726                                                except IndexError, msg:
1727
1728                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1729                                                        # is still len 0 when the statement is executed.
1730                                                        # Just ignore indexerror's..
1731                                                        pass
1732
1733                                self.slot.release()
1734                                #
1735                                # </ATOMIC>
1736
1737                                # Create a mapping table, each metric to the period where it should be stored
1738                                #
1739                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
1740
1741                                update_rets = [ ]
1742
1743                                for period, pmetric in metric_serial_table.items():
1744
1745                                        create_ret = self.createCheck( hostname, metricname, period )   
1746
1747                                        update_ret = self.update( hostname, metricname, period, pmetric )
1748
1749                                        if update_ret == 0:
1750
1751                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1752                                        else:
1753                                                debug_msg( 9, 'metric update failed' )
1754
1755                                        update_rets.append( create_ret )
1756                                        update_rets.append( update_ret )
1757
1758                                # Lets ignore errors here for now, we need to make sure last update time
1759                                # is correct!
1760                                #
1761                                #if not (1) in update_rets:
1762
1763                                self.memLastUpdate( hostname, metricname, metrics_to_store )
1764
1765                debug_msg( 5, "Leaving storeMetrics()")
1766
1767        def makeTimeSerial( self ):
1768                """Generate a time serial. Seconds since epoch"""
1769
1770                # Seconds since epoch
1771                mytime = int( time.time() )
1772
1773                return mytime
1774
1775        def makeRrdPath( self, host, metricname, timeserial ):
1776                """Make a RRD location/path and filename"""
1777
1778                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1779                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
1780
1781                return rrd_dir, rrd_file
1782
1783        def getLastRrdTimeSerial( self, host ):
1784                """Find the last timeserial (directory) for this host"""
1785
1786                newest_timeserial = 0
1787
1788                for dir in self.timeserials[ host ]:
1789
1790                        valid_dir = 1
1791
1792                        for letter in dir:
1793                                if letter not in string.digits:
1794                                        valid_dir = 0
1795
1796                        if valid_dir:
1797                                timeserial = dir
1798                                if timeserial > newest_timeserial:
1799                                        newest_timeserial = timeserial
1800
1801                if newest_timeserial:
1802                        return newest_timeserial
1803                else:
1804                        return 0
1805
1806        def determinePeriod( self, host, check_serial ):
1807                """Determine to which period (directory) this time(serial) belongs"""
1808
1809                period_serial = 0
1810
1811                if self.timeserials.has_key( host ):
1812
1813                        for serial in self.timeserials[ host ]:
1814
1815                                if check_serial >= serial and period_serial < serial:
1816
1817                                        period_serial = serial
1818
1819                return period_serial
1820
1821        def determineSerials( self, host, metricname, metriclist ):
1822                """
1823                Determine the correct serial and corresponding rrd to store
1824                for a list of metrics
1825                """
1826
1827                metric_serial_table = { }
1828
1829                for metric in metriclist:
1830
1831                        if metric['name'] == metricname:
1832
1833                                period          = self.determinePeriod( host, metric['time'] ) 
1834
1835                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1836
1837                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
1838
1839                                        # This one should get it's own new period
1840                                        period = metric['time']
1841
1842                                        if not self.timeserials.has_key( host ):
1843                                                self.timeserials[ host ] = [ ]
1844
1845                                        self.timeserials[ host ].append( period )
1846
1847                                if not metric_serial_table.has_key( period ):
1848
1849                                        metric_serial_table[ period ] = [ ]
1850
1851                                metric_serial_table[ period ].append( metric )
1852
1853                return metric_serial_table
1854
1855        def createCheck( self, host, metricname, timeserial ):
1856                """Check if an rrd allready exists for this metric, create if not"""
1857
1858                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1859               
1860                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
1861
1862                if not os.path.exists( rrd_dir ):
1863
1864                        try:
1865                                os.makedirs( rrd_dir )
1866
1867                        except os.OSError, msg:
1868
1869                                if msg.find( 'File exists' ) != -1:
1870
1871                                        # Ignore exists errors
1872                                        pass
1873
1874                                else:
1875
1876                                        print msg
1877                                        return
1878
1879                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
1880
1881                if not os.path.exists( rrd_file ):
1882
1883                        interval        = self.config.getInterval( self.cluster )
1884                        heartbeat       = 8 * int( interval )
1885
1886                        params          = [ ]
1887
1888                        params.append( '--step' )
1889                        params.append( str( interval ) )
1890
1891                        params.append( '--start' )
1892                        params.append( str( int( timeserial ) - 1 ) )
1893
1894                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1895                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
1896
1897                        self.rrdm.create( str(rrd_file), params )
1898
1899                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1900
1901        def update( self, host, metricname, timeserial, metriclist ):
1902                """
1903                Update rrd file for host with metricname
1904                in directory timeserial with metriclist
1905                """
1906
1907                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1908
1909                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
1910
1911                update_list             = self.makeUpdateList( host, metriclist )
1912
1913                if len( update_list ) > 0:
1914                        ret = self.rrdm.update( str(rrd_file), update_list )
1915
1916                        if ret:
1917                                return 1
1918               
1919                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
1920
1921                return 0
1922
1923def daemon():
1924        """daemonized threading"""
1925
1926        # Fork the first child
1927        #
1928        pid = os.fork()
1929
1930        if pid > 0:
1931
1932                sys.exit(0)  # end parent
1933
1934        # creates a session and sets the process group ID
1935        #
1936        os.setsid()
1937
1938        # Fork the second child
1939        #
1940        pid = os.fork()
1941
1942        if pid > 0:
1943
1944                sys.exit(0)  # end parent
1945
1946        write_pidfile()
1947
1948        # Go to the root directory and set the umask
1949        #
1950        os.chdir('/')
1951        os.umask(0)
1952
1953        sys.stdin.close()
1954        sys.stdout.close()
1955        sys.stderr.close()
1956
1957        os.open('/dev/null', os.O_RDWR)
1958        os.dup2(0, 1)
1959        os.dup2(0, 2)
1960
1961        run()
1962
1963def run():
1964        """Threading start"""
1965
1966        config          = GangliaConfigParser( GMETAD_CONF )
1967        s_timeout       = int( config.getLowestInterval() - 1 )
1968
1969        socket.setdefaulttimeout( s_timeout )
1970
1971        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1972        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
1973
1974        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1975        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
1976
1977        try:
1978                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1979                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
1980
1981                torque_xml_thread.start()
1982                ganglia_xml_thread.start()
1983               
1984        except thread.error, msg:
1985                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1986                syslog.closelog()
1987                sys.exit(1)
1988               
1989        debug_msg( 0, 'main threading started.' )
1990
1991def main():
1992        """Program startup"""
1993
1994        global DAEMONIZE, USE_SYSLOG
1995
1996        if not processArgs( sys.argv[1:] ):
1997                sys.exit( 1 )
1998
1999        if( DAEMONIZE and USE_SYSLOG ):
2000                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2001
2002        if DAEMONIZE:
2003                daemon()
2004        else:
2005                run()
2006
2007#
2008# Global functions
2009#
2010
2011def check_dir( directory ):
2012        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
2013
2014        if directory[-1] == '/':
2015                directory = directory[:-1]
2016
2017        return directory
2018
2019def reqtime2epoch( rtime ):
2020
2021        (hours, minutes, seconds )      = rtime.split( ':' )
2022
2023        etime   = int(seconds)
2024        etime   = etime + ( int(minutes) * 60 )
2025        etime   = etime + ( int(hours) * 60 * 60 )
2026
2027        return etime
2028
2029def debug_msg( level, msg ):
2030        """Only print msg if correct levels"""
2031
2032        if (not DAEMONIZE and DEBUG_LEVEL >= level):
2033                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2034       
2035        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2036                syslog.syslog( msg )
2037
2038def printTime( ):
2039        """Print current time in human readable format"""
2040
2041        return time.strftime("%a %d %b %Y %H:%M:%S")
2042
2043def write_pidfile():
2044
2045        # Write pidfile if PIDFILE exists
2046        if PIDFILE:
2047
2048                pid     = os.getpid()
2049
2050                pidfile = open(PIDFILE, 'w')
2051
2052                pidfile.write( str( pid ) )
2053                pidfile.close()
2054
2055# Ooohh, someone started me! Let's go..
2056#
2057if __name__ == '__main__':
2058        main()
Note: See TracBrowser for help on using the repository browser.