source: trunk/jobarchived/jobarchived.py @ 475

Last change on this file since 475 was 473, checked in by bastiaans, 16 years ago

jobarchived/jobarchived.py:

  • version: prepare for 0.3 release
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 48.4 KB
RevLine 
[3]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[230]21# SVN $Id: jobarchived.py 473 2008-02-22 10:44:40Z bastiaans $
22#
[3]23
[466]24import getopt, syslog, ConfigParser, sys
[3]25
[473]26VERSION='0.3'
[284]27
[466]28def usage( ver ):
[284]29
[466]30        print 'jobarchived %s' %VERSION
[284]31
[466]32        if ver:
33                return 0
[284]34
[435]35        print
[466]36        print 'Purpose:'
37        print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
38        print '  and node statistics in a RRD archive'
[435]39        print
[466]40        print 'Usage:   jobarchived [OPTIONS]'
41        print
42        print '  -c, --config=FILE      The configuration file to use (default: /etc/jobarchived.conf)'
43        print '  -p, --pidfile=FILE     Use pid file to store the process id'
44        print '  -h, --help             Print help and exit'
45        print '  -v, --version          Print version and exit'
46        print
[435]47
[214]48def processArgs( args ):
[6]49
[467]50        SHORT_L = 'p:hvc:'
51        LONG_L  = [ 'help', 'config=', 'pidfile=', 'version' ]
[169]52
[466]53        config_filename = '/etc/jobarchived.conf'
[169]54
[435]55        global PIDFILE
56
57        PIDFILE = None
58
[214]59        try:
[169]60
[214]61                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]62
[214]63        except getopt.error, detail:
[60]64
[214]65                print detail
66                sys.exit(1)
[9]67
[214]68        for opt, value in opts:
[60]69
[214]70                if opt in [ '--config', '-c' ]:
[13]71
[214]72                        config_filename = value
[198]73
[435]74                if opt in [ '--pidfile', '-p' ]:
75
76                        PIDFILE         = value
77
78                if opt in [ '--help', '-h' ]:
79
[469]80                        usage( False )
[435]81                        sys.exit( 0 )
82
[466]83                if opt in [ '--version', '-v' ]:
[60]84
[469]85                        usage( True )
[466]86                        sys.exit( 0 )
[22]87
[214]88        try:
89                return loadConfig( config_filename )
[13]90
[214]91        except ConfigParser.NoOptionError, detail:
92
93                print detail
94                sys.exit( 1 )
95
96def loadConfig( filename ):
97
98        def getlist( cfg_string ):
99
100                my_list = [ ]
101
102                for item_txt in cfg_string.split( ',' ):
103
104                        sep_char = None
105
106                        item_txt = item_txt.strip()
107
108                        for s_char in [ "'", '"' ]:
109
110                                if item_txt.find( s_char ) != -1:
111
112                                        if item_txt.count( s_char ) != 2:
113
114                                                print 'Missing quote: %s' %item_txt
115                                                sys.exit( 1 )
116
117                                        else:
118
119                                                sep_char = s_char
120                                                break
121
122                        if sep_char:
123
124                                item_txt = item_txt.split( sep_char )[1]
125
126                        my_list.append( item_txt )
127
128                return my_list
129
130        cfg = ConfigParser.ConfigParser()
131
132        cfg.read( filename )
133
[466]134        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
135        global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
136        global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
[214]137
[292]138        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
[214]139
[292]140        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
[214]141
[292]142        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[214]143
[292]144        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[214]145
[292]146        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[214]147
[375]148        MODRRDTOOL              = False
149
[214]150        try:
[469]151                global rrdtool
[375]152                import rrdtool
[214]153
[375]154                MODRRDTOOL              = True
155
156        except ImportError:
157
158                MODRRDTOOL              = False
159
[466]160                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!" )
[375]161
[455]162                RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
163
[375]164        try:
165
[292]166                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[214]167
168        except AttributeError, detail:
169
170                print 'Unknown syslog facility'
171                sys.exit( 1 )
172
[292]173        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
[214]174
[292]175        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
[214]176
[292]177        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
[214]178
[292]179        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
[214]180
[292]181        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
[214]182
[295]183        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
184
[292]185        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[214]186
[224]187
[214]188        return True
189
[17]190# What XML data types not to store
[13]191#
[17]192UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]193
[47]194# Maximum time (in seconds) a parsethread may run
195#
196PARSE_TIMEOUT = 60
197
198# Maximum time (in seconds) a storethread may run
199#
200STORE_TIMEOUT = 360
201
[8]202"""
[224]203The Job Archiving Daemon
[8]204"""
205
[214]206from types import *
207
208import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
[379]209from pyPgSQL import PgSQL
[214]210
[379]211# Orginal from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed
212# and added some more functions for postgres.
213#       
214#
215# Changed by: Bas van der Vlies <basv@sara.nl>
216#       
217# SARA API for Postgres Database
218#
219# Changed by: Ramon Bastiaans for Job Monarch
220#
221
222class InitVars:
223        Vars = {}
224       
225        def __init__(self, **key_arg):
226                for (key, value) in key_arg.items():
227                        if value:
228                                self.Vars[key] = value
229                        else:   
230                                self.Vars[key] = None
231                               
232        def __call__(self, *key):
233                key = "%s" % key
234                return self.Vars[key]
235               
236        def __getitem__(self, key):
237                return self.Vars[key]
238               
239        def __repr__(self):
240                return repr(self.Vars)
241               
242        def keys(self):
243                barf =  map(None, self.Vars.keys())
244                return barf
245               
246        def values(self):
247                barf =  map(None, self.Vars.values())
248                return barf
249               
250        def has_key(self, key):
251                if self.Vars.has_key(key):
252                        return 1
253                else:   
254                        return 0
255                       
256class DBError(Exception):
257        def __init__(self, msg=''):
258                self.msg = msg
259                Exception.__init__(self, msg)
260        def __repr__(self):
261                return self.msg
262        __str__ = __repr__
263
264#
265# Class to connect to a database
266# and return the queury in a list or dictionairy.
267#
268class DB:
269        def __init__(self, db_vars):
270
271                self.dict = db_vars
272
273                if self.dict.has_key('User'):
274                        self.user = self.dict['User']
275                else:
276                        self.user = 'postgres'
277
278                if self.dict.has_key('Host'):
279                        self.host = self.dict['Host']
280                else:
281                        self.host = 'localhost'
282
283                if self.dict.has_key('Password'):
284                        self.passwd = self.dict['Password']
285                else:
286                        self.passwd = ''
287
288                if self.dict.has_key('DataBaseName'):
289                        self.db = self.dict['DataBaseName']
290                else:
291                        self.db = 'uva_cluster_db'
292
293                # connect_string = 'host:port:database:user:password:
294                dsn = "%s::%s:%s:%s" %(self.host, self.db, self.user, self.passwd)
295
296                try:
297                        self.SQL = PgSQL.connect(dsn)
298                except PgSQL.Error, details:
299                        str = "%s" %details
300                        raise DBError(str)
301
302        def __repr__(self):
303                return repr(self.result)
304
305        def __nonzero__(self):
306                return not(self.result == None)
307
308        def __len__(self):
309                return len(self.result)
310
311        def __getitem__(self,i):
312                return self.result[i]
313
314        def __getslice__(self,i,j):
315                return self.result[i:j]
316
317        def Get(self, q_str):
318                c = self.SQL.cursor()
319                try:
320                        c.execute(q_str)
321                        result = c.fetchall()
322                except PgSQL.Error, details:
323                        c.close()
324                        str = "%s" %details
325                        raise DBError(str)
326
327                c.close()
328                return result
329
330        def Set(self, q_str):
331                c = self.SQL.cursor()
332                try:
333                        c.execute(q_str)
334                        result = c.oidValue
335
336                except PgSQL.Error, details:
337                        c.close()
338                        str = "%s" %details
339                        raise DBError(str)
340
341                c.close()
342                return result
343
344        def Commit(self):
345                self.SQL.commit()
346
[84]347class DataSQLStore:
348
349        db_vars = None
350        dbc = None
351
352        def __init__( self, hostname, database ):
353
[379]354                self.db_vars = InitVars(DataBaseName=database,
[84]355                                User='root',
356                                Host=hostname,
357                                Password='',
358                                Dictionary='true')
359
360                try:
[379]361                        self.dbc     = DB(self.db_vars)
362                except DBError, details:
[169]363                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
[84]364                        sys.exit(1)
365
366        def setDatabase(self, statement):
367                ret = self.doDatabase('set', statement)
368                return ret
369               
370        def getDatabase(self, statement):
371                ret = self.doDatabase('get', statement)
372                return ret
373
374        def doDatabase(self, type, statement):
375
[365]376                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
[84]377                try:
378                        if type == 'set':
379                                result = self.dbc.Set( statement )
380                                self.dbc.Commit()
381                        elif type == 'get':
382                                result = self.dbc.Get( statement )
383                               
[379]384                except DBError, detail:
[84]385                        operation = statement.split(' ')[0]
[169]386                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
[84]387                        sys.exit(1)
388
[365]389                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
[84]390                return result
391
[191]392        def getJobNodeId( self, job_id, node_id ):
393
394                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
395                if len( id ) > 0:
396
397                        if len( id[0] ) > 0 and id[0] != '':
398                       
399                                return 1
400
401                return 0
402
[84]403        def getNodeId( self, hostname ):
404
[89]405                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]406
[89]407                if len( id ) > 0:
[84]408
[89]409                        id = id[0][0]
410
[84]411                        return id
412                else:
413                        return None
414
415        def getNodeIds( self, hostnames ):
416
417                ids = [ ]
418
419                for node in hostnames:
420
421                        id = self.getNodeId( node )
422
423                        if id:
424                                ids.append( id )
425
426                return ids
427
428        def getJobId( self, jobid ):
429
430                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
431
432                if id:
[89]433                        id = id[0][0]
[84]434
435                        return id
436                else:
437                        return None
438
439        def addJob( self, job_id, jobattrs ):
440
441                if not self.getJobId( job_id ):
442
443                        self.mutateJob( 'insert', job_id, jobattrs ) 
444                else:
445                        self.mutateJob( 'update', job_id, jobattrs )
446
447        def mutateJob( self, action, job_id, jobattrs ):
448
[292]449                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
[84]450
[292]451                insert_col_str  = 'job_id'
452                insert_val_str  = "'%s'" %job_id
453                update_str      = None
[84]454
[365]455                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
[84]456
[99]457                ids = [ ]
458
[84]459                for valname, value in jobattrs.items():
460
[96]461                        if valname in job_values and value != '':
[84]462
463                                column_name = 'job_' + valname
464
465                                if action == 'insert':
466
467                                        if not insert_col_str:
468                                                insert_col_str = column_name
469                                        else:
470                                                insert_col_str = insert_col_str + ',' + column_name
471
472                                        if not insert_val_str:
473                                                insert_val_str = value
474                                        else:
475                                                insert_val_str = insert_val_str + ",'%s'" %value
476
477                                elif action == 'update':
478                                       
479                                        if not update_str:
480                                                update_str = "%s='%s'" %(column_name, value)
481                                        else:
482                                                update_str = update_str + ",%s='%s'" %(column_name, value)
483
[90]484                        elif valname == 'nodes' and value:
[84]485
[191]486                                node_valid = 1
[190]487
488                                if len(value) == 1:
489                               
[191]490                                        if jobattrs['status'] == 'Q':
[190]491
[191]492                                                node_valid = 0
[190]493
[191]494                                        else:
[190]495
[191]496                                                node_valid = 0
[190]497
[191]498                                                for node_char in str(value[0]):
[190]499
[191]500                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]501
[191]502                                                                node_valid = 1
[84]503
[191]504                                if node_valid:
505
506                                        ids = self.addNodes( value, jobattrs['domain'] )
507
[84]508                if action == 'insert':
509
510                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]511
[84]512                elif action == 'update':
513
[89]514                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
[84]515
[191]516                if len( ids ) > 0:
517                        self.addJobNodes( job_id, ids )
[190]518
[154]519        def addNodes( self, hostnames, domain ):
[84]520
[98]521                ids = [ ]
522
[84]523                for node in hostnames:
524
[292]525                        node    = '%s.%s' %( node, domain )
526                        id      = self.getNodeId( node )
[84]527       
528                        if not id:
529                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
[98]530                                id = self.getNodeId( node )
[84]531
[98]532                        ids.append( id )
533
534                return ids
535
[86]536        def addJobNodes( self, jobid, nodes ):
537
538                for node in nodes:
539
[191]540                        if not self.getJobNodeId( jobid, node ):
541
542                                self.addJobNode( jobid, node )
543
[84]544        def addJobNode( self, jobid, nodeid ):
545
546                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
547
548        def storeJobInfo( self, jobid, jobattrs ):
549
550                self.addJob( jobid, jobattrs )
551
[295]552        def checkStaleJobs( self ):
553
[466]554                # Locate all jobs in the database that are not set to finished
555                #
[295]556                q = "SELECT * from jobs WHERE job_status != 'F'"
557
558                r = self.getDatabase( q )
559
560                if len( r ) == 0:
561
562                        return None
563
564                cleanjobs       = [ ]
565                timeoutjobs     = [ ]
566
567                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
568                cur_time        = time.time()
569
570                for row in r:
571
572                        job_id                  = row[0]
573                        job_requested_time      = row[4]
574                        job_status              = row[7]
575                        job_start_timestamp     = row[8]
576
[466]577                        # If it was set to queued and we didn't see it started
578                        # there's not point in keeping it around
579                        #
[295]580                        if job_status == 'Q' or not job_start_timestamp:
581
582                                cleanjobs.append( job_id )
583
584                        else:
585
586                                start_timestamp = int( job_start_timestamp )
587
[466]588                                # If it was set to running longer than JOB_TIMEOUT
589                                # close the job: it probably finished while we were not running
590                                #
[295]591                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
592
593                                        if job_requested_time:
594
595                                                rtime_epoch     = reqtime2epoch( job_requested_time )
596                                        else:
597                                                rtime_epoch     = None
598                                       
599                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
600
601                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
602
[466]603                # Purge these from database
604                #
[295]605                for j in cleanjobs:
606
607                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
608                        self.setDatabase( q )
609
610                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
611
[466]612                # Close these jobs in the database
613                # update the stop_timestamp to: start_timestamp + requested wallclock
614                # and set state: finished
615                #
[295]616                for j in timeoutjobs:
617
618                        ( i, s, r )             = j
619
620                        if r:
621                                new_end_timestamp       = int( s ) + r
622
[468]623                        q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
[295]624                        self.setDatabase( q )
625
[37]626class RRDMutator:
[63]627        """A class for performing RRD mutations"""
[37]628
[277]629        binary = None
[37]630
631        def __init__( self, binary=None ):
[63]632                """Set alternate binary if supplied"""
[37]633
634                if binary:
635                        self.binary = binary
636
637        def create( self, filename, args ):
[63]638                """Create a new rrd with args"""
639
[375]640                global MODRRDTOOL
[37]641
[375]642                if MODRRDTOOL:
643                        return self.perform( 'create', filename, args )
644                else:
645                        return self.perform( 'create', '"' + filename + '"', args )
646
[37]647        def update( self, filename, args ):
[63]648                """Update a rrd with args"""
649
[375]650                global MODRRDTOOL
[37]651
[375]652                if MODRRDTOOL:
653                        return self.perform( 'update', filename, args )
654                else:
655                        return self.perform( 'update', '"' + filename + '"', args )
656
[42]657        def grabLastUpdate( self, filename ):
[63]658                """Determine the last update time of filename rrd"""
[42]659
[375]660                global MODRRDTOOL
661
[42]662                last_update = 0
663
[466]664                # Use the py-rrdtool module if it's available on this system
665                #
[375]666                if MODRRDTOOL:
[53]667
[375]668                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
[42]669
[375]670                        rrd_header      = { }
[292]671
[375]672                        try:
673                                rrd_header      = rrdtool.info( filename )
674                        except rrdtool.error, msg:
675                                debug_msg( 8, str( msg ) )
676
677                        if rrd_header.has_key( 'last_update' ):
678                                return last_update
679                        else:
680                                return 0
681
[466]682                # For backwards compatiblity: use the rrdtool binary if py-rrdtool is unavailable
683                # DEPRECATED (slow!)
684                #
[42]685                else:
[375]686                        debug_msg( 8, self.binary + ' info ' + filename )
[42]687
[375]688                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
689
690                        for line in my_pipe.readlines():
691
692                                if line.find( 'last_update') != -1:
693
694                                        last_update = line.split( ' = ' )[1]
695
696                        if my_pipe:
697
698                                my_pipe.close()
699
700                        if last_update:
701                                return last_update
702                        else:
703                                return 0
704
705
[40]706        def perform( self, action, filename, args ):
[63]707                """Perform action on rrd filename with args"""
[37]708
[375]709                global MODRRDTOOL
710
[37]711                arg_string = None
712
[40]713                if type( args ) is not ListType:
714                        debug_msg( 8, 'Arguments needs to be of type List' )
715                        return 1
716
[37]717                for arg in args:
718
719                        if not arg_string:
720
721                                arg_string = arg
722                        else:
723                                arg_string = arg_string + ' ' + arg
724
[375]725                if MODRRDTOOL:
[37]726
[375]727                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
[292]728
[375]729                        try:
730                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
[37]731
[375]732                                if action == 'create':
[146]733
[375]734                                        rrdtool.create( str( filename ), *args )
[37]735
[375]736                                elif action == 'update':
[37]737
[375]738                                        rrdtool.update( str( filename ), *args )
[365]739
[375]740                        except rrdtool.error, msg:
[365]741
[375]742                                error_msg = str( msg )
743                                debug_msg( 8, error_msg )
744                                return 1
745
746                else:
747
748                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
749
750                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
751                        lines   = cmd.readlines()
752
753                        cmd.close()
754
755                        for line in lines:
756
757                                if line.find( 'ERROR' ) != -1:
758
759                                        error_msg = string.join( line.split( ' ' )[1:] )
760                                        debug_msg( 8, error_msg )
761                                        return 1
762
[37]763                return 0
764
[78]765class XMLProcessor:
766        """Skeleton class for XML processor's"""
767
768        def run( self ):
769                """Do main processing of XML here"""
770
771                pass
772
773class TorqueXMLProcessor( XMLProcessor ):
774        """Main class for processing XML and acting with it"""
775
[295]776        def __init__( self, XMLSource, DataStore ):
[78]777                """Setup initial XML connection and handlers"""
778
[287]779                self.myXMLSource        = XMLSource
[295]780                self.myXMLHandler       = TorqueXMLHandler( DataStore )
[287]781                self.myXMLError         = XMLErrorHandler()
[78]782
[287]783                self.config             = GangliaConfigParser( GMETAD_CONF )
784
[78]785        def run( self ):
786                """Main XML processing"""
787
[169]788                debug_msg( 1, 'torque_xml_thread(): started.' )
[87]789
[78]790                while( 1 ):
791
[287]792                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
[469]793                        debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' )
[176]794
[287]795                        my_data = self.myXMLSource.getData()
796
[469]797                        debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )
798
799                        if my_data:
800                                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
801
[287]802                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[469]803
804                                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
[176]805                               
[169]806                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[87]807                        time.sleep( self.config.getLowestInterval() )
[78]808
[71]809class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
[63]810        """Parse Torque's jobinfo XML from our plugin"""
811
[72]812        jobAttrs = { }
813
[295]814        def __init__( self, datastore ):
[84]815
[295]816                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
817                self.ds                 = datastore
[292]818                self.jobs_processed     = [ ]
819                self.jobs_to_store      = [ ]
[84]820
[183]821        def startDocument( self ):
822
[292]823                self.heartbeat  = 0
[365]824                self.elementct  = 0
[183]825
[63]826        def startElement( self, name, attrs ):
827                """
828                This XML will be all gmetric XML
829                so there will be no specific start/end element
830                just one XML statement with all info
831                """
832               
[73]833                jobinfo = { }
[63]834
[365]835                self.elementct  += 1
836
[199]837                if name == 'CLUSTER':
[63]838
[372]839                        self.clustername = str( attrs.get( 'NAME', "" ) )
[199]840
841                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
842
[372]843                        metricname = str( attrs.get( 'NAME', "" ) )
[63]844
[285]845                        if metricname == 'MONARCH-HEARTBEAT':
[372]846                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
[63]847
[285]848                        elif metricname.find( 'MONARCH-JOB' ) != -1:
[63]849
[292]850                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
[372]851                                val     = str( attrs.get( 'VAL', "" ) )
[63]852
[96]853                                if not job_id in self.jobs_processed:
[292]854
[96]855                                        self.jobs_processed.append( job_id )
856
[73]857                                check_change = 0
858
859                                if self.jobAttrs.has_key( job_id ):
[292]860
[73]861                                        check_change = 1
862
[63]863                                valinfo = val.split( ' ' )
864
865                                for myval in valinfo:
866
[84]867                                        if len( myval.split( '=' ) ) > 1:
[63]868
[292]869                                                valname = myval.split( '=' )[0]
870                                                value   = myval.split( '=' )[1]
[70]871
[84]872                                                if valname == 'nodes':
873                                                        value = value.split( ';' )
[72]874
[84]875                                                jobinfo[ valname ] = value
876
[73]877                                if check_change:
[182]878                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
[292]879                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
880                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
[84]881                                                if not job_id in self.jobs_to_store:
882                                                        self.jobs_to_store.append( job_id )
883
[365]884                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]885                                else:
886                                        self.jobAttrs[ job_id ] = jobinfo
[84]887
888                                        if not job_id in self.jobs_to_store:
889                                                self.jobs_to_store.append( job_id )
890
[365]891                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]892                                       
[77]893        def endDocument( self ):
[74]894                """When all metrics have gone, check if any jobs have finished"""
[72]895
[371]896                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
[365]897
[182]898                if self.heartbeat:
899                        for jobid, jobinfo in self.jobAttrs.items():
[74]900
[182]901                                # This is an old job, not in current jobinfo list anymore
902                                # it must have finished, since we _did_ get a new heartbeat
903                                #
904                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
[102]905
[184]906                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
[74]907
[182]908                                        if not jobid in self.jobs_processed:
909                                                self.jobs_processed.append( jobid )
[96]910
[182]911                                        self.jobAttrs[ jobid ]['status'] = 'F'
[360]912                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
[96]913
[182]914                                        if not jobid in self.jobs_to_store:
915                                                self.jobs_to_store.append( jobid )
[74]916
[182]917                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
[87]918
[182]919                        for jobid in self.jobs_to_store:
[295]920                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
[84]921
[184]922                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
923
924                                        if self.jobAttrs[ jobid ]['status'] == 'F':
925                                                del self.jobAttrs[ jobid ]
926
[182]927                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
[87]928
[292]929                        self.jobs_processed     = [ ]
930                        self.jobs_to_store      = [ ]
[84]931
[82]932        def setJobAttrs( self, old, new ):
933                """
934                Set new job attributes in old, but not lose existing fields
935                if old attributes doesn't have those
936                """
937
938                for valname, value in new.items():
939                        old[ valname ] = value
940
941                return old
942               
943
[73]944        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
[74]945                """
946                Check if jobinfo has changed from jobattrs[jobid]
947                if it's report time is bigger than previous one
948                and it is report time is recent (equal to heartbeat)
949                """
[72]950
[87]951                ignore_changes = [ 'reported' ]
952
[73]953                if jobattrs.has_key( jobid ):
954
955                        for valname, value in jobinfo.items():
956
[87]957                                if valname not in ignore_changes:
[73]958
[87]959                                        if jobattrs[ jobid ].has_key( valname ):
[73]960
[87]961                                                if value != jobattrs[ jobid ][ valname ]:
[73]962
[87]963                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
[360]964                                                                return True
[73]965
[87]966                                        else:
[360]967                                                return True
[87]968
[360]969                return False
[73]970
[71]971class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[63]972        """Parse Ganglia's XML"""
[3]973
[295]974        def __init__( self, config, datastore ):
[63]975                """Setup initial variables and gather info on existing rrd archive"""
976
[292]977                self.config     = config
978                self.clusters   = { }
[295]979                self.ds         = datastore
[324]980
[295]981                debug_msg( 1, 'Checking database..' )
[365]982
983                global DEBUG_LEVEL
984
985                if DEBUG_LEVEL <= 2:
986                        self.ds.checkStaleJobs()
987
[295]988                debug_msg( 1, 'Check done.' )
[296]989                debug_msg( 1, 'Checking rrd archive..' )
[293]990                self.gatherClusters()
[169]991                debug_msg( 1, 'Check done.' )
[33]992
[44]993        def gatherClusters( self ):
[63]994                """Find all existing clusters in archive dir"""
[44]995
[292]996                archive_dir     = check_dir(ARCHIVE_PATH)
[44]997
[292]998                hosts           = [ ]
[44]999
1000                if os.path.exists( archive_dir ):
1001
[292]1002                        dirlist = os.listdir( archive_dir )
[44]1003
[369]1004                        for cfgcluster in ARCHIVE_DATASOURCES:
1005
1006                                if cfgcluster not in dirlist:
1007
1008                                        # Autocreate a directory for this cluster
1009                                        # assume it is new
1010                                        #
1011                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
1012
1013                                        os.mkdir( cluster_dir )
1014
[370]1015                                        dirlist.append( cfgcluster )
1016
[44]1017                        for item in dirlist:
1018
1019                                clustername = item
1020
[60]1021                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]1022
1023                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
1024
[365]1025                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
1026
[6]1027        def startElement( self, name, attrs ):
[63]1028                """Memorize appropriate data from xml start tags"""
[3]1029
[7]1030                if name == 'GANGLIA_XML':
[32]1031
[372]1032                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
1033                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
[32]1034
[12]1035                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]1036
[7]1037                elif name == 'GRID':
[32]1038
[372]1039                        self.gridName   = str( attrs.get( 'NAME', "" ) )
1040                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
[32]1041
[12]1042                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]1043
[7]1044                elif name == 'CLUSTER':
[32]1045
[372]1046                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
1047                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
[32]1048
[60]1049                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]1050
[34]1051                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]1052
[35]1053                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]1054
[60]1055                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]1056
[372]1057                        self.hostName           = str( attrs.get( 'NAME', "" ) )
1058                        self.hostIp             = str( attrs.get( 'IP', "" ) )
1059                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
[32]1060
[12]1061                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]1062
[60]1063                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]1064
[372]1065                        type = str( attrs.get( 'TYPE', "" ) )
[198]1066                       
1067                        exclude_metric = False
1068                       
1069                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
[6]1070
[372]1071                                orig_name = str( attrs.get( 'NAME', "" ) )
[3]1072
[198]1073                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1074                               
1075                                        exclude_metric = True
1076
1077                                elif re.match( ex_metricstr, orig_name ):
1078
1079                                        exclude_metric = True
1080
1081                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1082
[292]1083                                myMetric                = { }
[372]1084                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
1085                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
[292]1086                                myMetric['time']        = self.hostReported
[3]1087
[34]1088                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]1089
[34]1090                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]1091
[34]1092        def storeMetrics( self ):
[63]1093                """Store metrics of each cluster rrd handler"""
[9]1094
[34]1095                for clustername, rrdh in self.clusters.items():
[16]1096
[38]1097                        ret = rrdh.storeMetrics()
[9]1098
[38]1099                        if ret:
1100                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1101                                return 1
1102
1103                return 0
1104
[71]1105class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1106
1107        def error( self, exception ):
1108                """Recoverable error"""
1109
[169]1110                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]1111
1112        def fatalError( self, exception ):
1113                """Non-recoverable error"""
1114
1115                exception_str = str( exception )
1116
1117                # Ignore 'no element found' errors
1118                if exception_str.find( 'no element found' ) != -1:
[169]1119                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
[71]1120                        return 0
1121
[170]1122                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
[71]1123                sys.exit( 1 )
1124
1125        def warning( self, exception ):
1126                """Warning"""
1127
1128                debug_msg( 0, 'Warning ' + str( exception ) )
1129
[78]1130class XMLGatherer:
[63]1131        """Setup a connection and file object to Ganglia's XML"""
[3]1132
[287]1133        s               = None
1134        fd              = None
1135        data            = None
[293]1136        slot            = None
[8]1137
[287]1138        # Time since the last update
1139        #
1140        LAST_UPDATE     = 0
1141
1142        # Minimum interval between updates
1143        #
1144        MIN_UPDATE_INT  = 10
1145
1146        # Is a update occuring now
1147        #
1148        update_now      = False
1149
[8]1150        def __init__( self, host, port ):
[63]1151                """Store host and port for connection"""
[8]1152
[293]1153                self.host       = host
1154                self.port       = port
1155                self.slot       = threading.Lock()
[3]1156
[287]1157                self.retrieveData()
1158
1159        def retrieveData( self ):
[63]1160                """Setup connection to XML source"""
[8]1161
[287]1162                self.update_now = True
1163
[293]1164                self.slot.acquire()
1165
[469]1166                self.data       = None
1167
[8]1168                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]1169
[5]1170                        af, socktype, proto, canonname, sa = res
[32]1171
[5]1172                        try:
[32]1173
[8]1174                                self.s = socket.socket( af, socktype, proto )
[32]1175
[469]1176                        except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
[32]1177
[8]1178                                self.s = None
[5]1179                                continue
[32]1180
[5]1181                        try:
[32]1182
[8]1183                                self.s.connect( sa )
[32]1184
[469]1185                        except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:
[32]1186
[70]1187                                self.disconnect()
[5]1188                                continue
[32]1189
[5]1190                        break
[3]1191
[8]1192                if self.s is None:
[32]1193
[170]1194                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
[287]1195                        self.update_now = False
[469]1196                        #sys.exit( 1 )
[5]1197
[287]1198                else:
[324]1199                        #self.s.send( '\n' )
[287]1200
1201                        my_fp                   = self.s.makefile( 'r' )
1202                        my_data                 = my_fp.readlines()
1203                        my_data                 = string.join( my_data, '' )
1204
1205                        self.data               = my_data
1206
1207                        self.LAST_UPDATE        = time.time()
1208
[293]1209                self.slot.release()
1210
[287]1211                self.update_now = False
1212
[33]1213        def disconnect( self ):
[63]1214                """Close socket"""
[33]1215
1216                if self.s:
[287]1217                        #self.s.shutdown( 2 )
[33]1218                        self.s.close()
1219                        self.s = None
1220
1221        def __del__( self ):
[63]1222                """Kill the socket before we leave"""
[33]1223
1224                self.disconnect()
1225
[287]1226        def reGetData( self ):
[70]1227                """Reconnect"""
[33]1228
[287]1229                while self.update_now:
1230
1231                        # Must be another update in progress:
1232                        # Wait until the update is complete
1233                        #
1234                        time.sleep( 1 )
1235
[38]1236                if self.s:
1237                        self.disconnect()
[33]1238
[287]1239                self.retrieveData()
[5]1240
[287]1241        def getData( self ):
1242
1243                """Return the XML data"""
1244
1245                # If more than MIN_UPDATE_INT seconds passed since last data update
1246                # update the XML first before returning it
1247                #
1248
1249                cur_time        = time.time()
1250
1251                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1252
1253                        self.reGetData()
1254
1255                while self.update_now:
1256
1257                        # Must be another update in progress:
1258                        # Wait until the update is complete
1259                        #
1260                        time.sleep( 1 )
1261                       
1262                return self.data
1263
[70]1264        def makeFileDescriptor( self ):
1265                """Make file descriptor that points to our socket connection"""
1266
1267                self.reconnect()
1268
1269                if self.s:
1270                        self.fd = self.s.makefile( 'r' )
1271
1272        def getFileObject( self ):
1273                """Connect, and return a file object"""
1274
[78]1275                self.makeFileDescriptor()
1276
[70]1277                if self.fd:
1278                        return self.fd
1279
[78]1280class GangliaXMLProcessor( XMLProcessor ):
[63]1281        """Main class for processing XML and acting with it"""
[5]1282
[295]1283        def __init__( self, XMLSource, DataStore ):
[63]1284                """Setup initial XML connection and handlers"""
[33]1285
[287]1286                self.config             = GangliaConfigParser( GMETAD_CONF )
[33]1287
[293]1288                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[287]1289                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1290                self.myXMLSource        = XMLSource
[295]1291                self.ds                 = DataStore
1292                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
[287]1293                self.myXMLError         = XMLErrorHandler()
[73]1294
[9]1295        def run( self ):
[63]1296                """Main XML processing; start a xml and storethread"""
[8]1297
[102]1298                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1299                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]1300
[36]1301                while( 1 ):
1302
[102]1303                        if not xml_thread.isAlive():
[36]1304                                # Gather XML at the same interval as gmetad
1305
1306                                # threaded call to: self.processXML()
1307                                #
[169]1308                                try:
1309                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1310                                        xml_thread.start()
[176]1311                                except thread.error, msg:
[169]1312                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1313                                        #return 1
[36]1314
[102]1315                        if not store_thread.isAlive():
[55]1316                                # Store metrics every .. sec
[36]1317
[55]1318                                # threaded call to: self.storeMetrics()
1319                                #
[169]1320                                try:
1321                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1322                                        store_thread.start()
[176]1323                                except thread.error, msg:
[169]1324                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1325                                        #return 1
[36]1326               
1327                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1328                        time.sleep( 1 ) 
1329
[33]1330        def storeMetrics( self ):
[63]1331                """Store metrics retained in memory to disk"""
[22]1332
[365]1333                global DEBUG_LEVEL
1334
[63]1335                # Store metrics somewhere between every 360 and 640 seconds
[38]1336                #
[365]1337                if DEBUG_LEVEL > 2:
1338                        #STORE_INTERVAL = 60
1339                        STORE_INTERVAL = random.randint( 360, 640 )
1340                else:
1341                        STORE_INTERVAL = random.randint( 360, 640 )
[22]1342
[169]1343                try:
1344                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1345                        store_metric_thread.start()
[176]1346                except thread.error, msg:
[169]1347                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1348                        return 1
[36]1349
[169]1350                debug_msg( 1, 'ganglia_store_thread(): started.' )
1351
1352                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]1353                time.sleep( STORE_INTERVAL )
[169]1354                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
[36]1355
[102]1356                if store_metric_thread.isAlive():
[36]1357
[169]1358                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
[136]1359                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[169]1360                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
[36]1361
[169]1362                debug_msg( 1, 'ganglia_store_thread(): finished.' )
[36]1363
1364                return 0
1365
[39]1366        def storeThread( self ):
[63]1367                """Actual metric storing thread"""
[39]1368
[169]1369                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1370                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
[78]1371                ret = self.myXMLHandler.storeMetrics()
[176]1372                if ret > 0:
1373                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
[169]1374                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1375                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
[39]1376               
[176]1377                return 0
[39]1378
[8]1379        def processXML( self ):
[63]1380                """Process XML"""
[8]1381
[169]1382                try:
1383                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1384                        parsethread.start()
[176]1385                except thread.error, msg:
[169]1386                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1387                        return 1
[8]1388
[169]1389                debug_msg( 1, 'ganglia_xml_thread(): started.' )
[36]1390
[169]1391                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]1392                time.sleep( float( self.config.getLowestInterval() ) ) 
[169]1393                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
[36]1394
1395                if parsethread.isAlive():
1396
[169]1397                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
[47]1398                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[169]1399                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
[36]1400
[169]1401                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
[36]1402
1403                return 0
1404
[39]1405        def parseThread( self ):
[63]1406                """Actual parsing thread"""
[39]1407
[169]1408                debug_msg( 1, 'ganglia_parse_thread(): started.' )
[469]1409                debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )
[287]1410               
1411                my_data = self.myXMLSource.getData()
[176]1412
[469]1413                debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )
[293]1414
[469]1415                if my_data:
1416                        debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
[287]1417                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[469]1418                        debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
[176]1419
[169]1420                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
[39]1421
[176]1422                return 0
[39]1423
[9]1424class GangliaConfigParser:
1425
[34]1426        sources = [ ]
[9]1427
1428        def __init__( self, config ):
[63]1429                """Parse some stuff from our gmetad's config, such as polling interval"""
[32]1430
[9]1431                self.config = config
1432                self.parseValues()
1433
[32]1434        def parseValues( self ):
[63]1435                """Parse certain values from gmetad.conf"""
[9]1436
1437                readcfg = open( self.config, 'r' )
1438
1439                for line in readcfg.readlines():
1440
1441                        if line.count( '"' ) > 1:
1442
[10]1443                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]1444
[292]1445                                        source          = { }
1446                                        source['name']  = line.split( '"' )[1]
1447                                        source_words    = line.split( '"' )[2].split( ' ' )
[9]1448
1449                                        for word in source_words:
1450
1451                                                valid_interval = 1
1452
1453                                                for letter in word:
[32]1454
[9]1455                                                        if letter not in string.digits:
[32]1456
[9]1457                                                                valid_interval = 0
1458
[10]1459                                                if valid_interval and len(word) > 0:
[32]1460
[9]1461                                                        source['interval'] = word
[12]1462                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]1463       
1464                                        # No interval found, use Ganglia's default     
1465                                        if not source.has_key( 'interval' ):
1466                                                source['interval'] = 15
1467                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1468
[33]1469                                        self.sources.append( source )
[9]1470
1471        def getInterval( self, source_name ):
[63]1472                """Return interval for source_name"""
[32]1473
[9]1474                for source in self.sources:
[32]1475
[12]1476                        if source['name'] == source_name:
[32]1477
[9]1478                                return source['interval']
[32]1479
[9]1480                return None
1481
[34]1482        def getLowestInterval( self ):
[63]1483                """Return the lowest interval of all clusters"""
[34]1484
1485                lowest_interval = 0
1486
1487                for source in self.sources:
1488
1489                        if not lowest_interval or source['interval'] <= lowest_interval:
1490
1491                                lowest_interval = source['interval']
1492
1493                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1494                if lowest_interval:
1495                        return lowest_interval
1496                else:
1497                        return 15
1498
[9]1499class RRDHandler:
[63]1500        """Class for handling RRD activity"""
[9]1501
[32]1502        myMetrics = { }
[40]1503        lastStored = { }
[47]1504        timeserials = { }
[36]1505        slot = None
[32]1506
[33]1507        def __init__( self, config, cluster ):
[63]1508                """Setup initial variables"""
[78]1509
[455]1510                global MODRRDTOOL
1511
[292]1512                self.block      = 0
1513                self.cluster    = cluster
1514                self.config     = config
1515                self.slot       = threading.Lock()
1516
[455]1517                if MODRRDTOOL:
1518
1519                        self.rrdm       = RRDMutator()
1520                else:
1521                        self.rrdm       = RRDMutator( RRDTOOL )
1522
[365]1523                global DEBUG_LEVEL
[9]1524
[365]1525                if DEBUG_LEVEL <= 2:
1526                        self.gatherLastUpdates()
1527
[42]1528        def gatherLastUpdates( self ):
[63]1529                """Populate the lastStored list, containing timestamps of all last updates"""
[42]1530
1531                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1532
1533                hosts = [ ]
1534
1535                if os.path.exists( cluster_dir ):
1536
[44]1537                        dirlist = os.listdir( cluster_dir )
[42]1538
[44]1539                        for dir in dirlist:
[42]1540
[44]1541                                hosts.append( dir )
[42]1542
1543                for host in hosts:
1544
[292]1545                        host_dir        = cluster_dir + '/' + host
1546                        dirlist         = os.listdir( host_dir )
[47]1547
1548                        for dir in dirlist:
1549
1550                                if not self.timeserials.has_key( host ):
1551
1552                                        self.timeserials[ host ] = [ ]
1553
1554                                self.timeserials[ host ].append( dir )
1555
[42]1556                        last_serial = self.getLastRrdTimeSerial( host )
[292]1557
[42]1558                        if last_serial:
1559
1560                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
[292]1561
[42]1562                                if os.path.exists( metric_dir ):
1563
[44]1564                                        dirlist = os.listdir( metric_dir )
[42]1565
[44]1566                                        for file in dirlist:
[42]1567
[44]1568                                                metricname = file.split( '.rrd' )[0]
[42]1569
[44]1570                                                if not self.lastStored.has_key( host ):
[42]1571
[44]1572                                                        self.lastStored[ host ] = { }
[42]1573
[44]1574                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1575
[32]1576        def getClusterName( self ):
[63]1577                """Return clustername"""
1578
[32]1579                return self.cluster
1580
1581        def memMetric( self, host, metric ):
[63]1582                """Store metric from host in memory"""
[32]1583
[179]1584                # <ATOMIC>
1585                #
1586                self.slot.acquire()
1587               
[34]1588                if self.myMetrics.has_key( host ):
[32]1589
[34]1590                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1591
[34]1592                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1593
[34]1594                                        if mymetric['time'] == metric['time']:
[32]1595
[34]1596                                                # Allready have this metric, abort
[179]1597                                                self.slot.release()
[34]1598                                                return 1
1599                        else:
1600                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1601                else:
[292]1602                        self.myMetrics[ host ]                          = { }
1603                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
[32]1604
[63]1605                # Push new metric onto stack
1606                # atomic code; only 1 thread at a time may access the stack
1607
[32]1608                self.myMetrics[ host ][ metric['name'] ].append( metric )
1609
[40]1610                self.slot.release()
[53]1611                #
1612                # </ATOMIC>
[40]1613
[47]1614        def makeUpdateList( self, host, metriclist ):
[63]1615                """
1616                Make a list of update values for rrdupdate
1617                but only those that we didn't store before
1618                """
[37]1619
[292]1620                update_list     = [ ]
1621                metric          = None
[37]1622
[47]1623                while len( metriclist ) > 0:
[37]1624
[53]1625                        metric = metriclist.pop( 0 )
[37]1626
[53]1627                        if self.checkStoreMetric( host, metric ):
[292]1628
[365]1629                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1630                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1631                                update_list.append( u_val )
[40]1632
[37]1633                return update_list
1634
[49]1635        def checkStoreMetric( self, host, metric ):
[63]1636                """Check if supplied metric if newer than last one stored"""
[40]1637
1638                if self.lastStored.has_key( host ):
1639
[47]1640                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]1641
[47]1642                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1643
[50]1644                                        # This is old
[40]1645                                        return 0
1646
[50]1647                return 1
1648
[54]1649        def memLastUpdate( self, host, metricname, metriclist ):
[63]1650                """
1651                Memorize the time of the latest metric from metriclist
1652                but only if it wasn't allready memorized
1653                """
[50]1654
[54]1655                if not self.lastStored.has_key( host ):
1656                        self.lastStored[ host ] = { }
1657
[50]1658                last_update_time = 0
1659
1660                for metric in metriclist:
1661
[54]1662                        if metric['name'] == metricname:
[50]1663
[54]1664                                if metric['time'] > last_update_time:
[50]1665
[54]1666                                        last_update_time = metric['time']
[40]1667
[54]1668                if self.lastStored[ host ].has_key( metricname ):
[52]1669                       
[54]1670                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]1671                                return 1
[40]1672
[54]1673                self.lastStored[ host ][ metricname ] = last_update_time
[52]1674
[33]1675        def storeMetrics( self ):
[63]1676                """
1677                Store all metrics from memory to disk
1678                and do it to the RRD's in appropriate timeperiod directory
1679                """
[33]1680
[365]1681                debug_msg( 5, "Entering storeMetrics()")
1682
1683                count_values    = 0
1684                count_metrics   = 0
1685                count_bits      = 0
1686
[33]1687                for hostname, mymetrics in self.myMetrics.items():     
1688
1689                        for metricname, mymetric in mymetrics.items():
1690
[365]1691                                count_metrics += 1
1692
1693                                for dmetric in mymetric:
1694
1695                                        count_values += 1
1696
1697                                        count_bits      += len( dmetric['time'] )
1698                                        count_bits      += len( dmetric['val'] )
1699
1700                count_bytes     = count_bits / 8
1701
1702                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1703                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1704                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1705                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1706
1707                for hostname, mymetrics in self.myMetrics.items():     
1708
1709                        for metricname, mymetric in mymetrics.items():
1710
[53]1711                                metrics_to_store = [ ]
1712
[63]1713                                # Pop metrics from stack for storing until none is left
1714                                # atomic code: only 1 thread at a time may access myMetrics
1715
[53]1716                                # <ATOMIC>
[50]1717                                #
[47]1718                                self.slot.acquire() 
[33]1719
[54]1720                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1721
[54]1722                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1723
[176]1724                                                try:
1725                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1726                                                except IndexError, msg:
1727
[179]1728                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1729                                                        # is still len 0 when the statement is executed.
1730                                                        # Just ignore indexerror's..
[176]1731                                                        pass
1732
[53]1733                                self.slot.release()
1734                                #
1735                                # </ATOMIC>
1736
[47]1737                                # Create a mapping table, each metric to the period where it should be stored
1738                                #
[53]1739                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]1740
[50]1741                                update_rets = [ ]
1742
[47]1743                                for period, pmetric in metric_serial_table.items():
1744
[146]1745                                        create_ret = self.createCheck( hostname, metricname, period )   
[47]1746
1747                                        update_ret = self.update( hostname, metricname, period, pmetric )
1748
1749                                        if update_ret == 0:
1750
1751                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1752                                        else:
1753                                                debug_msg( 9, 'metric update failed' )
1754
[146]1755                                        update_rets.append( create_ret )
[50]1756                                        update_rets.append( update_ret )
[47]1757
[179]1758                                # Lets ignore errors here for now, we need to make sure last update time
1759                                # is correct!
1760                                #
1761                                #if not (1) in update_rets:
[50]1762
[179]1763                                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]1764
[365]1765                debug_msg( 5, "Leaving storeMetrics()")
1766
[17]1767        def makeTimeSerial( self ):
[63]1768                """Generate a time serial. Seconds since epoch"""
[17]1769
1770                # Seconds since epoch
1771                mytime = int( time.time() )
1772
1773                return mytime
1774
[50]1775        def makeRrdPath( self, host, metricname, timeserial ):
[63]1776                """Make a RRD location/path and filename"""
[17]1777
[292]1778                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1779                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
[17]1780
1781                return rrd_dir, rrd_file
1782
[20]1783        def getLastRrdTimeSerial( self, host ):
[63]1784                """Find the last timeserial (directory) for this host"""
[17]1785
[19]1786                newest_timeserial = 0
1787
[47]1788                for dir in self.timeserials[ host ]:
[32]1789
[47]1790                        valid_dir = 1
[17]1791
[47]1792                        for letter in dir:
1793                                if letter not in string.digits:
1794                                        valid_dir = 0
[17]1795
[47]1796                        if valid_dir:
1797                                timeserial = dir
1798                                if timeserial > newest_timeserial:
1799                                        newest_timeserial = timeserial
[17]1800
1801                if newest_timeserial:
[18]1802                        return newest_timeserial
[17]1803                else:
1804                        return 0
1805
[47]1806        def determinePeriod( self, host, check_serial ):
[63]1807                """Determine to which period (directory) this time(serial) belongs"""
[47]1808
1809                period_serial = 0
1810
[56]1811                if self.timeserials.has_key( host ):
[47]1812
[56]1813                        for serial in self.timeserials[ host ]:
[47]1814
[56]1815                                if check_serial >= serial and period_serial < serial:
[47]1816
[56]1817                                        period_serial = serial
1818
[47]1819                return period_serial
1820
1821        def determineSerials( self, host, metricname, metriclist ):
1822                """
1823                Determine the correct serial and corresponding rrd to store
1824                for a list of metrics
1825                """
1826
1827                metric_serial_table = { }
1828
1829                for metric in metriclist:
1830
1831                        if metric['name'] == metricname:
1832
[292]1833                                period          = self.determinePeriod( host, metric['time'] ) 
[47]1834
[292]1835                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[47]1836
[49]1837                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]1838
1839                                        # This one should get it's own new period
1840                                        period = metric['time']
[57]1841
1842                                        if not self.timeserials.has_key( host ):
1843                                                self.timeserials[ host ] = [ ]
1844
[50]1845                                        self.timeserials[ host ].append( period )
[47]1846
1847                                if not metric_serial_table.has_key( period ):
1848
[49]1849                                        metric_serial_table[ period ] = [ ]
[47]1850
1851                                metric_serial_table[ period ].append( metric )
1852
1853                return metric_serial_table
1854
[33]1855        def createCheck( self, host, metricname, timeserial ):
[63]1856                """Check if an rrd allready exists for this metric, create if not"""
[9]1857
[35]1858                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]1859               
[33]1860                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]1861
[9]1862                if not os.path.exists( rrd_dir ):
[58]1863
1864                        try:
1865                                os.makedirs( rrd_dir )
1866
[169]1867                        except os.OSError, msg:
[58]1868
1869                                if msg.find( 'File exists' ) != -1:
1870
1871                                        # Ignore exists errors
1872                                        pass
1873
1874                                else:
1875
1876                                        print msg
1877                                        return
1878
[14]1879                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]1880
[14]1881                if not os.path.exists( rrd_file ):
[9]1882
[292]1883                        interval        = self.config.getInterval( self.cluster )
1884                        heartbeat       = 8 * int( interval )
[9]1885
[292]1886                        params          = [ ]
[12]1887
[37]1888                        params.append( '--step' )
1889                        params.append( str( interval ) )
[12]1890
[37]1891                        params.append( '--start' )
[47]1892                        params.append( str( int( timeserial ) - 1 ) )
[12]1893
[37]1894                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1895                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]1896
[37]1897                        self.rrdm.create( str(rrd_file), params )
1898
[14]1899                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1900
[47]1901        def update( self, host, metricname, timeserial, metriclist ):
[63]1902                """
1903                Update rrd file for host with metricname
1904                in directory timeserial with metriclist
1905                """
[9]1906
[35]1907                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]1908
[292]1909                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
[18]1910
[292]1911                update_list             = self.makeUpdateList( host, metriclist )
[15]1912
[41]1913                if len( update_list ) > 0:
1914                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]1915
[41]1916                        if ret:
1917                                return 1
[27]1918               
[41]1919                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]1920
[36]1921                return 0
1922
[169]1923def daemon():
1924        """daemonized threading"""
[8]1925
[169]1926        # Fork the first child
1927        #
1928        pid = os.fork()
1929
1930        if pid > 0:
1931
1932                sys.exit(0)  # end parent
1933
1934        # creates a session and sets the process group ID
1935        #
1936        os.setsid()
1937
1938        # Fork the second child
1939        #
1940        pid = os.fork()
1941
1942        if pid > 0:
1943
1944                sys.exit(0)  # end parent
1945
[435]1946        write_pidfile()
1947
[169]1948        # Go to the root directory and set the umask
1949        #
1950        os.chdir('/')
1951        os.umask(0)
1952
1953        sys.stdin.close()
1954        sys.stdout.close()
1955        sys.stderr.close()
1956
[273]1957        os.open('/dev/null', os.O_RDWR)
1958        os.dup2(0, 1)
1959        os.dup2(0, 2)
[169]1960
1961        run()
1962
1963def run():
1964        """Threading start"""
1965
[469]1966        config          = GangliaConfigParser( GMETAD_CONF )
1967        s_timeout       = int( config.getLowestInterval() - 1 )
1968
1969        socket.setdefaulttimeout( s_timeout )
1970
[287]1971        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[295]1972        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[8]1973
[295]1974        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1975        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
[287]1976
[169]1977        try:
[292]1978                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1979                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
[22]1980
[169]1981                torque_xml_thread.start()
1982                ganglia_xml_thread.start()
1983               
[176]1984        except thread.error, msg:
[169]1985                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1986                syslog.closelog()
1987                sys.exit(1)
1988               
1989        debug_msg( 0, 'main threading started.' )
[78]1990
[169]1991def main():
1992        """Program startup"""
1993
[375]1994        global DAEMONIZE, USE_SYSLOG
1995
[214]1996        if not processArgs( sys.argv[1:] ):
1997                sys.exit( 1 )
1998
[169]1999        if( DAEMONIZE and USE_SYSLOG ):
2000                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2001
2002        if DAEMONIZE:
2003                daemon()
2004        else:
2005                run()
2006
2007#
[81]2008# Global functions
[169]2009#
[81]2010
[9]2011def check_dir( directory ):
[63]2012        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]2013
2014        if directory[-1] == '/':
2015                directory = directory[:-1]
2016
2017        return directory
2018
[295]2019def reqtime2epoch( rtime ):
2020
2021        (hours, minutes, seconds )      = rtime.split( ':' )
2022
2023        etime   = int(seconds)
2024        etime   = etime + ( int(minutes) * 60 )
2025        etime   = etime + ( int(hours) * 60 * 60 )
2026
2027        return etime
2028
[12]2029def debug_msg( level, msg ):
[169]2030        """Only print msg if correct levels"""
[12]2031
[169]2032        if (not DAEMONIZE and DEBUG_LEVEL >= level):
2033                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2034       
2035        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2036                syslog.syslog( msg )
[12]2037
[46]2038def printTime( ):
[63]2039        """Print current time in human readable format"""
[46]2040
2041        return time.strftime("%a %d %b %Y %H:%M:%S")
2042
[435]2043def write_pidfile():
2044
2045        # Write pidfile if PIDFILE exists
2046        if PIDFILE:
2047
2048                pid     = os.getpid()
2049
2050                pidfile = open(PIDFILE, 'w')
2051
2052                pidfile.write( str( pid ) )
2053                pidfile.close()
2054
[63]2055# Ooohh, someone started me! Let's go..
[469]2056#
[9]2057if __name__ == '__main__':
2058        main()
Note: See TracBrowser for help on using the repository browser.