source: trunk/jobarchived/jobarchived.py @ 455

Last change on this file since 455 was 455, checked in by bastiaans, 16 years ago

jobarchived/jobarchived.py:

  • fix to py-rrdtool module detection

jobarchived/examples/jobarchived.conf:

  • more sensible default path
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 47.4 KB
RevLine 
[3]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[230]21# SVN $Id: jobarchived.py 455 2008-02-05 15:26:24Z bastiaans $
22#
[3]23
[284]24DEFAULT_SEARCH_PATH     = '/usr/share/jobarchived'
[3]25
[284]26import sys
27
28if DEFAULT_SEARCH_PATH not in sys.path:
29
30        sys.path.append( DEFAULT_SEARCH_PATH )
31
32import getopt, syslog, ConfigParser
33
[435]34def usage():
35
36        print
37        print 'usage: jobarchived [options]'
38        print 'options:'
39        print '      --config, -c      configuration file'
40        print '      --pidfile, -p     pid file'
41        print '      --help, -h        help'
42        print
43
[214]44def processArgs( args ):
[6]45
[435]46        SHORT_L = 'p:hc:'
47        LONG_L  = [ 'help', 'config=', 'pidfile=' ]
[169]48
[214]49        config_filename = None
[169]50
[435]51        global PIDFILE
52
53        PIDFILE = None
54
[214]55        try:
[169]56
[214]57                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]58
[214]59        except getopt.error, detail:
[60]60
[214]61                print detail
62                sys.exit(1)
[9]63
[214]64        for opt, value in opts:
[60]65
[214]66                if opt in [ '--config', '-c' ]:
[13]67
[214]68                        config_filename = value
[198]69
[435]70                if opt in [ '--pidfile', '-p' ]:
71
72                        PIDFILE         = value
73
74                if opt in [ '--help', '-h' ]:
75
76                        usage()
77                        sys.exit( 0 )
78
[214]79        if not config_filename:
[60]80
[214]81                config_filename = '/etc/jobarchived.conf'
[22]82
[214]83        try:
84                return loadConfig( config_filename )
[13]85
[214]86        except ConfigParser.NoOptionError, detail:
87
88                print detail
89                sys.exit( 1 )
90
91def loadConfig( filename ):
92
93        def getlist( cfg_string ):
94
95                my_list = [ ]
96
97                for item_txt in cfg_string.split( ',' ):
98
99                        sep_char = None
100
101                        item_txt = item_txt.strip()
102
103                        for s_char in [ "'", '"' ]:
104
105                                if item_txt.find( s_char ) != -1:
106
107                                        if item_txt.count( s_char ) != 2:
108
109                                                print 'Missing quote: %s' %item_txt
110                                                sys.exit( 1 )
111
112                                        else:
113
114                                                sep_char = s_char
115                                                break
116
117                        if sep_char:
118
119                                item_txt = item_txt.split( sep_char )[1]
120
121                        my_list.append( item_txt )
122
123                return my_list
124
125        cfg = ConfigParser.ConfigParser()
126
127        cfg.read( filename )
128
[375]129        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
[214]130
[292]131        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
[214]132
[292]133        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
[214]134
[292]135        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[214]136
[292]137        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[214]138
[292]139        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[214]140
[375]141        MODRRDTOOL              = False
142
[214]143        try:
[375]144                import rrdtool
[214]145
[375]146                MODRRDTOOL              = True
147
148        except ImportError:
149
150                MODRRDTOOL              = False
151
152                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobmond significantly!" )
153
[455]154                RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
155
[375]156        try:
157
[292]158                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[214]159
160        except AttributeError, detail:
161
162                print 'Unknown syslog facility'
163                sys.exit( 1 )
164
[292]165        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
[214]166
[292]167        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
[214]168
[292]169        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
[214]170
[292]171        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
[214]172
[292]173        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
[214]174
[295]175        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
176
[292]177        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[214]178
[224]179
[214]180        return True
181
[17]182# What XML data types not to store
[13]183#
[17]184UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]185
[47]186# Maximum time (in seconds) a parsethread may run
187#
188PARSE_TIMEOUT = 60
189
190# Maximum time (in seconds) a storethread may run
191#
192STORE_TIMEOUT = 360
193
[8]194"""
[224]195The Job Archiving Daemon
[8]196"""
197
[214]198from types import *
199
200import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
[365]201import rrdtool
[379]202from pyPgSQL import PgSQL
[214]203
[379]204# Orginal from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed
205# and added some more functions for postgres.
206#       
207#
208# Changed by: Bas van der Vlies <basv@sara.nl>
209#       
210# SARA API for Postgres Database
211#
212# Changed by: Ramon Bastiaans for Job Monarch
213#
214
215class InitVars:
216        Vars = {}
217       
218        def __init__(self, **key_arg):
219                for (key, value) in key_arg.items():
220                        if value:
221                                self.Vars[key] = value
222                        else:   
223                                self.Vars[key] = None
224                               
225        def __call__(self, *key):
226                key = "%s" % key
227                return self.Vars[key]
228               
229        def __getitem__(self, key):
230                return self.Vars[key]
231               
232        def __repr__(self):
233                return repr(self.Vars)
234               
235        def keys(self):
236                barf =  map(None, self.Vars.keys())
237                return barf
238               
239        def values(self):
240                barf =  map(None, self.Vars.values())
241                return barf
242               
243        def has_key(self, key):
244                if self.Vars.has_key(key):
245                        return 1
246                else:   
247                        return 0
248                       
249class DBError(Exception):
250        def __init__(self, msg=''):
251                self.msg = msg
252                Exception.__init__(self, msg)
253        def __repr__(self):
254                return self.msg
255        __str__ = __repr__
256
257#
258# Class to connect to a database
259# and return the queury in a list or dictionairy.
260#
261class DB:
262        def __init__(self, db_vars):
263
264                self.dict = db_vars
265
266                if self.dict.has_key('User'):
267                        self.user = self.dict['User']
268                else:
269                        self.user = 'postgres'
270
271                if self.dict.has_key('Host'):
272                        self.host = self.dict['Host']
273                else:
274                        self.host = 'localhost'
275
276                if self.dict.has_key('Password'):
277                        self.passwd = self.dict['Password']
278                else:
279                        self.passwd = ''
280
281                if self.dict.has_key('DataBaseName'):
282                        self.db = self.dict['DataBaseName']
283                else:
284                        self.db = 'uva_cluster_db'
285
286                # connect_string = 'host:port:database:user:password:
287                dsn = "%s::%s:%s:%s" %(self.host, self.db, self.user, self.passwd)
288
289                try:
290                        self.SQL = PgSQL.connect(dsn)
291                except PgSQL.Error, details:
292                        str = "%s" %details
293                        raise DBError(str)
294
295        def __repr__(self):
296                return repr(self.result)
297
298        def __nonzero__(self):
299                return not(self.result == None)
300
301        def __len__(self):
302                return len(self.result)
303
304        def __getitem__(self,i):
305                return self.result[i]
306
307        def __getslice__(self,i,j):
308                return self.result[i:j]
309
310        def Get(self, q_str):
311                c = self.SQL.cursor()
312                try:
313                        c.execute(q_str)
314                        result = c.fetchall()
315                except PgSQL.Error, details:
316                        c.close()
317                        str = "%s" %details
318                        raise DBError(str)
319
320                c.close()
321                return result
322
323        def Set(self, q_str):
324                c = self.SQL.cursor()
325                try:
326                        c.execute(q_str)
327                        result = c.oidValue
328
329                except PgSQL.Error, details:
330                        c.close()
331                        str = "%s" %details
332                        raise DBError(str)
333
334                c.close()
335                return result
336
337        def Commit(self):
338                self.SQL.commit()
339
[84]340class DataSQLStore:
341
342        db_vars = None
343        dbc = None
344
345        def __init__( self, hostname, database ):
346
[379]347                self.db_vars = InitVars(DataBaseName=database,
[84]348                                User='root',
349                                Host=hostname,
350                                Password='',
351                                Dictionary='true')
352
353                try:
[379]354                        self.dbc     = DB(self.db_vars)
355                except DBError, details:
[169]356                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
[84]357                        sys.exit(1)
358
359        def setDatabase(self, statement):
360                ret = self.doDatabase('set', statement)
361                return ret
362               
363        def getDatabase(self, statement):
364                ret = self.doDatabase('get', statement)
365                return ret
366
367        def doDatabase(self, type, statement):
368
[365]369                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
[84]370                try:
371                        if type == 'set':
372                                result = self.dbc.Set( statement )
373                                self.dbc.Commit()
374                        elif type == 'get':
375                                result = self.dbc.Get( statement )
376                               
[379]377                except DBError, detail:
[84]378                        operation = statement.split(' ')[0]
[169]379                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
[84]380                        sys.exit(1)
381
[365]382                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
[84]383                return result
384
[191]385        def getJobNodeId( self, job_id, node_id ):
386
387                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
388                if len( id ) > 0:
389
390                        if len( id[0] ) > 0 and id[0] != '':
391                       
392                                return 1
393
394                return 0
395
[84]396        def getNodeId( self, hostname ):
397
[89]398                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]399
[89]400                if len( id ) > 0:
[84]401
[89]402                        id = id[0][0]
403
[84]404                        return id
405                else:
406                        return None
407
408        def getNodeIds( self, hostnames ):
409
410                ids = [ ]
411
412                for node in hostnames:
413
414                        id = self.getNodeId( node )
415
416                        if id:
417                                ids.append( id )
418
419                return ids
420
421        def getJobId( self, jobid ):
422
423                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
424
425                if id:
[89]426                        id = id[0][0]
[84]427
428                        return id
429                else:
430                        return None
431
432        def addJob( self, job_id, jobattrs ):
433
434                if not self.getJobId( job_id ):
435
436                        self.mutateJob( 'insert', job_id, jobattrs ) 
437                else:
438                        self.mutateJob( 'update', job_id, jobattrs )
439
440        def mutateJob( self, action, job_id, jobattrs ):
441
[292]442                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
[84]443
[292]444                insert_col_str  = 'job_id'
445                insert_val_str  = "'%s'" %job_id
446                update_str      = None
[84]447
[365]448                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
[84]449
[99]450                ids = [ ]
451
[84]452                for valname, value in jobattrs.items():
453
[96]454                        if valname in job_values and value != '':
[84]455
456                                column_name = 'job_' + valname
457
458                                if action == 'insert':
459
460                                        if not insert_col_str:
461                                                insert_col_str = column_name
462                                        else:
463                                                insert_col_str = insert_col_str + ',' + column_name
464
465                                        if not insert_val_str:
466                                                insert_val_str = value
467                                        else:
468                                                insert_val_str = insert_val_str + ",'%s'" %value
469
470                                elif action == 'update':
471                                       
472                                        if not update_str:
473                                                update_str = "%s='%s'" %(column_name, value)
474                                        else:
475                                                update_str = update_str + ",%s='%s'" %(column_name, value)
476
[90]477                        elif valname == 'nodes' and value:
[84]478
[191]479                                node_valid = 1
[190]480
481                                if len(value) == 1:
482                               
[191]483                                        if jobattrs['status'] == 'Q':
[190]484
[191]485                                                node_valid = 0
[190]486
[191]487                                        else:
[190]488
[191]489                                                node_valid = 0
[190]490
[191]491                                                for node_char in str(value[0]):
[190]492
[191]493                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]494
[191]495                                                                node_valid = 1
[84]496
[191]497                                if node_valid:
498
499                                        ids = self.addNodes( value, jobattrs['domain'] )
500
[84]501                if action == 'insert':
502
503                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]504
[84]505                elif action == 'update':
506
[89]507                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
[84]508
[191]509                if len( ids ) > 0:
510                        self.addJobNodes( job_id, ids )
[190]511
[154]512        def addNodes( self, hostnames, domain ):
[84]513
[98]514                ids = [ ]
515
[84]516                for node in hostnames:
517
[292]518                        node    = '%s.%s' %( node, domain )
519                        id      = self.getNodeId( node )
[84]520       
521                        if not id:
522                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
[98]523                                id = self.getNodeId( node )
[84]524
[98]525                        ids.append( id )
526
527                return ids
528
[86]529        def addJobNodes( self, jobid, nodes ):
530
531                for node in nodes:
532
[191]533                        if not self.getJobNodeId( jobid, node ):
534
535                                self.addJobNode( jobid, node )
536
[84]537        def addJobNode( self, jobid, nodeid ):
538
539                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
540
541        def storeJobInfo( self, jobid, jobattrs ):
542
543                self.addJob( jobid, jobattrs )
544
[295]545        def checkStaleJobs( self ):
546
547                q = "SELECT * from jobs WHERE job_status != 'F'"
548
549                r = self.getDatabase( q )
550
551                if len( r ) == 0:
552
553                        return None
554
555                cleanjobs       = [ ]
556                timeoutjobs     = [ ]
557
558                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
559                cur_time        = time.time()
560
561                for row in r:
562
563                        job_id                  = row[0]
564                        job_requested_time      = row[4]
565                        job_status              = row[7]
566                        job_start_timestamp     = row[8]
567
568                        if job_status == 'Q' or not job_start_timestamp:
569
570                                cleanjobs.append( job_id )
571
572                        else:
573
574                                start_timestamp = int( job_start_timestamp )
575
576                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
577
578                                        if job_requested_time:
579
580                                                rtime_epoch     = reqtime2epoch( job_requested_time )
581                                        else:
582                                                rtime_epoch     = None
583                                       
584                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
585
586                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
587
588                for j in cleanjobs:
589
590                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
591                        self.setDatabase( q )
592
593                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
594
595                for j in timeoutjobs:
596
597                        ( i, s, r )             = j
598
599                        if r:
600                                new_end_timestamp       = int( s ) + r
601
602                        q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
603                        self.setDatabase( q )
604
[37]605class RRDMutator:
[63]606        """A class for performing RRD mutations"""
[37]607
[277]608        binary = None
[37]609
[365]610
[37]611        def __init__( self, binary=None ):
[63]612                """Set alternate binary if supplied"""
[37]613
614                if binary:
615                        self.binary = binary
616
617        def create( self, filename, args ):
[63]618                """Create a new rrd with args"""
619
[375]620                global MODRRDTOOL
[37]621
[375]622                if MODRRDTOOL:
623                        return self.perform( 'create', filename, args )
624                else:
625                        return self.perform( 'create', '"' + filename + '"', args )
626
[37]627        def update( self, filename, args ):
[63]628                """Update a rrd with args"""
629
[375]630                global MODRRDTOOL
[37]631
[375]632                if MODRRDTOOL:
633                        return self.perform( 'update', filename, args )
634                else:
635                        return self.perform( 'update', '"' + filename + '"', args )
636
[42]637        def grabLastUpdate( self, filename ):
[63]638                """Determine the last update time of filename rrd"""
[42]639
[375]640                global MODRRDTOOL
641
[42]642                last_update = 0
643
[375]644                if MODRRDTOOL:
[53]645
[375]646                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
[42]647
[375]648                        rrd_header      = { }
[292]649
[375]650                        try:
651                                rrd_header      = rrdtool.info( filename )
652                        except rrdtool.error, msg:
653                                debug_msg( 8, str( msg ) )
654
655                        if rrd_header.has_key( 'last_update' ):
656                                return last_update
657                        else:
658                                return 0
659
[42]660                else:
[375]661                        debug_msg( 8, self.binary + ' info ' + filename )
[42]662
[375]663                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
664
665                        for line in my_pipe.readlines():
666
667                                if line.find( 'last_update') != -1:
668
669                                        last_update = line.split( ' = ' )[1]
670
671                        if my_pipe:
672
673                                my_pipe.close()
674
675                        if last_update:
676                                return last_update
677                        else:
678                                return 0
679
680
[40]681        def perform( self, action, filename, args ):
[63]682                """Perform action on rrd filename with args"""
[37]683
[375]684                global MODRRDTOOL
685
[37]686                arg_string = None
687
[40]688                if type( args ) is not ListType:
689                        debug_msg( 8, 'Arguments needs to be of type List' )
690                        return 1
691
[37]692                for arg in args:
693
694                        if not arg_string:
695
696                                arg_string = arg
697                        else:
698                                arg_string = arg_string + ' ' + arg
699
[375]700                if MODRRDTOOL:
[37]701
[375]702                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
[292]703
[375]704                        try:
705                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
[37]706
[375]707                                if action == 'create':
[146]708
[375]709                                        rrdtool.create( str( filename ), *args )
[37]710
[375]711                                elif action == 'update':
[37]712
[375]713                                        rrdtool.update( str( filename ), *args )
[365]714
[375]715                        except rrdtool.error, msg:
[365]716
[375]717                                error_msg = str( msg )
718                                debug_msg( 8, error_msg )
719                                return 1
720
721                else:
722
723                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
724
725                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
726                        lines   = cmd.readlines()
727
728                        cmd.close()
729
730                        for line in lines:
731
732                                if line.find( 'ERROR' ) != -1:
733
734                                        error_msg = string.join( line.split( ' ' )[1:] )
735                                        debug_msg( 8, error_msg )
736                                        return 1
737
[37]738                return 0
739
[78]740class XMLProcessor:
741        """Skeleton class for XML processor's"""
742
743        def run( self ):
744                """Do main processing of XML here"""
745
746                pass
747
748class TorqueXMLProcessor( XMLProcessor ):
749        """Main class for processing XML and acting with it"""
750
[295]751        def __init__( self, XMLSource, DataStore ):
[78]752                """Setup initial XML connection and handlers"""
753
[293]754                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[287]755                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
756                self.myXMLSource        = XMLSource
[295]757                self.myXMLHandler       = TorqueXMLHandler( DataStore )
[287]758                self.myXMLError         = XMLErrorHandler()
[78]759
[287]760                self.config             = GangliaConfigParser( GMETAD_CONF )
761
[78]762        def run( self ):
763                """Main XML processing"""
764
[169]765                debug_msg( 1, 'torque_xml_thread(): started.' )
[87]766
[78]767                while( 1 ):
768
[287]769                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
[169]770                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
[176]771
[287]772                        my_data = self.myXMLSource.getData()
773
[176]774                        try:
[287]775                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[176]776                        except socket.error, msg:
777                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
778                               
[169]779                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
780                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[87]781                        time.sleep( self.config.getLowestInterval() )
[78]782
[71]783class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
[63]784        """Parse Torque's jobinfo XML from our plugin"""
785
[72]786        jobAttrs = { }
787
[295]788        def __init__( self, datastore ):
[84]789
[295]790                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
791                self.ds                 = datastore
[292]792                self.jobs_processed     = [ ]
793                self.jobs_to_store      = [ ]
[84]794
[183]795        def startDocument( self ):
796
[292]797                self.heartbeat  = 0
[365]798                self.elementct  = 0
[183]799
[63]800        def startElement( self, name, attrs ):
801                """
802                This XML will be all gmetric XML
803                so there will be no specific start/end element
804                just one XML statement with all info
805                """
806               
[73]807                jobinfo = { }
[63]808
[365]809                self.elementct  += 1
810
[199]811                if name == 'CLUSTER':
[63]812
[372]813                        self.clustername = str( attrs.get( 'NAME', "" ) )
[199]814
815                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
816
[372]817                        metricname = str( attrs.get( 'NAME', "" ) )
[63]818
[285]819                        if metricname == 'MONARCH-HEARTBEAT':
[372]820                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
[63]821
[285]822                        elif metricname.find( 'MONARCH-JOB' ) != -1:
[63]823
[292]824                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
[372]825                                val     = str( attrs.get( 'VAL', "" ) )
[63]826
[96]827                                if not job_id in self.jobs_processed:
[292]828
[96]829                                        self.jobs_processed.append( job_id )
830
[73]831                                check_change = 0
832
833                                if self.jobAttrs.has_key( job_id ):
[292]834
[73]835                                        check_change = 1
836
[63]837                                valinfo = val.split( ' ' )
838
839                                for myval in valinfo:
840
[84]841                                        if len( myval.split( '=' ) ) > 1:
[63]842
[292]843                                                valname = myval.split( '=' )[0]
844                                                value   = myval.split( '=' )[1]
[70]845
[84]846                                                if valname == 'nodes':
847                                                        value = value.split( ';' )
[72]848
[84]849                                                jobinfo[ valname ] = value
850
[73]851                                if check_change:
[182]852                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
[292]853                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
854                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
[84]855                                                if not job_id in self.jobs_to_store:
856                                                        self.jobs_to_store.append( job_id )
857
[365]858                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]859                                else:
860                                        self.jobAttrs[ job_id ] = jobinfo
[84]861
862                                        if not job_id in self.jobs_to_store:
863                                                self.jobs_to_store.append( job_id )
864
[365]865                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]866                                       
[77]867        def endDocument( self ):
[74]868                """When all metrics have gone, check if any jobs have finished"""
[72]869
[371]870                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
[365]871
[182]872                if self.heartbeat:
873                        for jobid, jobinfo in self.jobAttrs.items():
[74]874
[182]875                                # This is an old job, not in current jobinfo list anymore
876                                # it must have finished, since we _did_ get a new heartbeat
877                                #
878                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
[102]879
[184]880                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
[74]881
[182]882                                        if not jobid in self.jobs_processed:
883                                                self.jobs_processed.append( jobid )
[96]884
[182]885                                        self.jobAttrs[ jobid ]['status'] = 'F'
[360]886                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
[96]887
[182]888                                        if not jobid in self.jobs_to_store:
889                                                self.jobs_to_store.append( jobid )
[74]890
[182]891                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
[87]892
[182]893                        for jobid in self.jobs_to_store:
[295]894                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
[84]895
[184]896                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
897
898                                        if self.jobAttrs[ jobid ]['status'] == 'F':
899                                                del self.jobAttrs[ jobid ]
900
[182]901                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
[87]902
[292]903                        self.jobs_processed     = [ ]
904                        self.jobs_to_store      = [ ]
[84]905
[82]906        def setJobAttrs( self, old, new ):
907                """
908                Set new job attributes in old, but not lose existing fields
909                if old attributes doesn't have those
910                """
911
912                for valname, value in new.items():
913                        old[ valname ] = value
914
915                return old
916               
917
[73]918        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
[74]919                """
920                Check if jobinfo has changed from jobattrs[jobid]
921                if it's report time is bigger than previous one
922                and it is report time is recent (equal to heartbeat)
923                """
[72]924
[87]925                ignore_changes = [ 'reported' ]
926
[73]927                if jobattrs.has_key( jobid ):
928
929                        for valname, value in jobinfo.items():
930
[87]931                                if valname not in ignore_changes:
[73]932
[87]933                                        if jobattrs[ jobid ].has_key( valname ):
[73]934
[87]935                                                if value != jobattrs[ jobid ][ valname ]:
[73]936
[87]937                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
[360]938                                                                return True
[73]939
[87]940                                        else:
[360]941                                                return True
[87]942
[360]943                return False
[73]944
[71]945class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[63]946        """Parse Ganglia's XML"""
[3]947
[295]948        def __init__( self, config, datastore ):
[63]949                """Setup initial variables and gather info on existing rrd archive"""
950
[292]951                self.config     = config
952                self.clusters   = { }
[295]953                self.ds         = datastore
[324]954
[295]955                debug_msg( 1, 'Checking database..' )
[365]956
957                global DEBUG_LEVEL
958
959                if DEBUG_LEVEL <= 2:
960                        self.ds.checkStaleJobs()
961
[295]962                debug_msg( 1, 'Check done.' )
[296]963                debug_msg( 1, 'Checking rrd archive..' )
[293]964                self.gatherClusters()
[169]965                debug_msg( 1, 'Check done.' )
[33]966
[44]967        def gatherClusters( self ):
[63]968                """Find all existing clusters in archive dir"""
[44]969
[292]970                archive_dir     = check_dir(ARCHIVE_PATH)
[44]971
[292]972                hosts           = [ ]
[44]973
974                if os.path.exists( archive_dir ):
975
[292]976                        dirlist = os.listdir( archive_dir )
[44]977
[369]978                        for cfgcluster in ARCHIVE_DATASOURCES:
979
980                                if cfgcluster not in dirlist:
981
982                                        # Autocreate a directory for this cluster
983                                        # assume it is new
984                                        #
985                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
986
987                                        os.mkdir( cluster_dir )
988
[370]989                                        dirlist.append( cfgcluster )
990
[44]991                        for item in dirlist:
992
993                                clustername = item
994
[60]995                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]996
997                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
998
[365]999                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
1000
[6]1001        def startElement( self, name, attrs ):
[63]1002                """Memorize appropriate data from xml start tags"""
[3]1003
[7]1004                if name == 'GANGLIA_XML':
[32]1005
[372]1006                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
1007                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
[32]1008
[12]1009                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]1010
[7]1011                elif name == 'GRID':
[32]1012
[372]1013                        self.gridName   = str( attrs.get( 'NAME', "" ) )
1014                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
[32]1015
[12]1016                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]1017
[7]1018                elif name == 'CLUSTER':
[32]1019
[372]1020                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
1021                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
[32]1022
[60]1023                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]1024
[34]1025                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]1026
[35]1027                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]1028
[60]1029                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]1030
[372]1031                        self.hostName           = str( attrs.get( 'NAME', "" ) )
1032                        self.hostIp             = str( attrs.get( 'IP', "" ) )
1033                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
[32]1034
[12]1035                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]1036
[60]1037                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]1038
[372]1039                        type = str( attrs.get( 'TYPE', "" ) )
[198]1040                       
1041                        exclude_metric = False
1042                       
1043                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
[6]1044
[372]1045                                orig_name = str( attrs.get( 'NAME', "" ) )
[3]1046
[198]1047                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1048                               
1049                                        exclude_metric = True
1050
1051                                elif re.match( ex_metricstr, orig_name ):
1052
1053                                        exclude_metric = True
1054
1055                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1056
[292]1057                                myMetric                = { }
[372]1058                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
1059                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
[292]1060                                myMetric['time']        = self.hostReported
[3]1061
[34]1062                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]1063
[34]1064                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]1065
[34]1066        def storeMetrics( self ):
[63]1067                """Store metrics of each cluster rrd handler"""
[9]1068
[34]1069                for clustername, rrdh in self.clusters.items():
[16]1070
[38]1071                        ret = rrdh.storeMetrics()
[9]1072
[38]1073                        if ret:
1074                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1075                                return 1
1076
1077                return 0
1078
[71]1079class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1080
1081        def error( self, exception ):
1082                """Recoverable error"""
1083
[169]1084                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]1085
1086        def fatalError( self, exception ):
1087                """Non-recoverable error"""
1088
1089                exception_str = str( exception )
1090
1091                # Ignore 'no element found' errors
1092                if exception_str.find( 'no element found' ) != -1:
[169]1093                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
[71]1094                        return 0
1095
[170]1096                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
[71]1097                sys.exit( 1 )
1098
1099        def warning( self, exception ):
1100                """Warning"""
1101
1102                debug_msg( 0, 'Warning ' + str( exception ) )
1103
[78]1104class XMLGatherer:
[63]1105        """Setup a connection and file object to Ganglia's XML"""
[3]1106
[287]1107        s               = None
1108        fd              = None
1109        data            = None
[293]1110        slot            = None
[8]1111
[287]1112        # Time since the last update
1113        #
1114        LAST_UPDATE     = 0
1115
1116        # Minimum interval between updates
1117        #
1118        MIN_UPDATE_INT  = 10
1119
1120        # Is a update occuring now
1121        #
1122        update_now      = False
1123
[8]1124        def __init__( self, host, port ):
[63]1125                """Store host and port for connection"""
[8]1126
[293]1127                self.host       = host
1128                self.port       = port
1129                self.slot       = threading.Lock()
[3]1130
[287]1131                self.retrieveData()
1132
1133        def retrieveData( self ):
[63]1134                """Setup connection to XML source"""
[8]1135
[287]1136                self.update_now = True
1137
[293]1138                self.slot.acquire()
1139
[8]1140                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]1141
[5]1142                        af, socktype, proto, canonname, sa = res
[32]1143
[5]1144                        try:
[32]1145
[8]1146                                self.s = socket.socket( af, socktype, proto )
[32]1147
[5]1148                        except socket.error, msg:
[32]1149
[8]1150                                self.s = None
[5]1151                                continue
[32]1152
[5]1153                        try:
[32]1154
[8]1155                                self.s.connect( sa )
[32]1156
[5]1157                        except socket.error, msg:
[32]1158
[70]1159                                self.disconnect()
[5]1160                                continue
[32]1161
[5]1162                        break
[3]1163
[8]1164                if self.s is None:
[32]1165
[170]1166                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
[287]1167                        self.update_now = False
[33]1168                        sys.exit( 1 )
[5]1169
[287]1170                else:
[324]1171                        #self.s.send( '\n' )
[287]1172
1173                        my_fp                   = self.s.makefile( 'r' )
1174                        my_data                 = my_fp.readlines()
1175                        my_data                 = string.join( my_data, '' )
1176
1177                        self.data               = my_data
1178
1179                        self.LAST_UPDATE        = time.time()
1180
[293]1181                self.slot.release()
1182
[287]1183                self.update_now = False
1184
[33]1185        def disconnect( self ):
[63]1186                """Close socket"""
[33]1187
1188                if self.s:
[287]1189                        #self.s.shutdown( 2 )
[33]1190                        self.s.close()
1191                        self.s = None
1192
1193        def __del__( self ):
[63]1194                """Kill the socket before we leave"""
[33]1195
1196                self.disconnect()
1197
[287]1198        def reGetData( self ):
[70]1199                """Reconnect"""
[33]1200
[287]1201                while self.update_now:
1202
1203                        # Must be another update in progress:
1204                        # Wait until the update is complete
1205                        #
1206                        time.sleep( 1 )
1207
[38]1208                if self.s:
1209                        self.disconnect()
[33]1210
[287]1211                self.retrieveData()
[5]1212
[287]1213        def getData( self ):
1214
1215                """Return the XML data"""
1216
1217                # If more than MIN_UPDATE_INT seconds passed since last data update
1218                # update the XML first before returning it
1219                #
1220
1221                cur_time        = time.time()
1222
1223                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1224
1225                        self.reGetData()
1226
1227                while self.update_now:
1228
1229                        # Must be another update in progress:
1230                        # Wait until the update is complete
1231                        #
1232                        time.sleep( 1 )
1233                       
1234                return self.data
1235
[70]1236        def makeFileDescriptor( self ):
1237                """Make file descriptor that points to our socket connection"""
1238
1239                self.reconnect()
1240
1241                if self.s:
1242                        self.fd = self.s.makefile( 'r' )
1243
1244        def getFileObject( self ):
1245                """Connect, and return a file object"""
1246
[78]1247                self.makeFileDescriptor()
1248
[70]1249                if self.fd:
1250                        return self.fd
1251
[78]1252class GangliaXMLProcessor( XMLProcessor ):
[63]1253        """Main class for processing XML and acting with it"""
[5]1254
[295]1255        def __init__( self, XMLSource, DataStore ):
[63]1256                """Setup initial XML connection and handlers"""
[33]1257
[287]1258                self.config             = GangliaConfigParser( GMETAD_CONF )
[33]1259
[293]1260                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[287]1261                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1262                self.myXMLSource        = XMLSource
[295]1263                self.ds                 = DataStore
1264                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
[287]1265                self.myXMLError         = XMLErrorHandler()
[73]1266
[9]1267        def run( self ):
[63]1268                """Main XML processing; start a xml and storethread"""
[8]1269
[102]1270                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1271                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]1272
[36]1273                while( 1 ):
1274
[102]1275                        if not xml_thread.isAlive():
[36]1276                                # Gather XML at the same interval as gmetad
1277
1278                                # threaded call to: self.processXML()
1279                                #
[169]1280                                try:
1281                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1282                                        xml_thread.start()
[176]1283                                except thread.error, msg:
[169]1284                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1285                                        #return 1
[36]1286
[102]1287                        if not store_thread.isAlive():
[55]1288                                # Store metrics every .. sec
[36]1289
[55]1290                                # threaded call to: self.storeMetrics()
1291                                #
[169]1292                                try:
1293                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1294                                        store_thread.start()
[176]1295                                except thread.error, msg:
[169]1296                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1297                                        #return 1
[36]1298               
1299                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1300                        time.sleep( 1 ) 
1301
[33]1302        def storeMetrics( self ):
[63]1303                """Store metrics retained in memory to disk"""
[22]1304
[365]1305                global DEBUG_LEVEL
1306
[63]1307                # Store metrics somewhere between every 360 and 640 seconds
[38]1308                #
[365]1309                if DEBUG_LEVEL > 2:
1310                        #STORE_INTERVAL = 60
1311                        STORE_INTERVAL = random.randint( 360, 640 )
1312                else:
1313                        STORE_INTERVAL = random.randint( 360, 640 )
[22]1314
[169]1315                try:
1316                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1317                        store_metric_thread.start()
[176]1318                except thread.error, msg:
[169]1319                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1320                        return 1
[36]1321
[169]1322                debug_msg( 1, 'ganglia_store_thread(): started.' )
1323
1324                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]1325                time.sleep( STORE_INTERVAL )
[169]1326                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
[36]1327
[102]1328                if store_metric_thread.isAlive():
[36]1329
[169]1330                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
[136]1331                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[169]1332                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
[36]1333
[169]1334                debug_msg( 1, 'ganglia_store_thread(): finished.' )
[36]1335
1336                return 0
1337
[39]1338        def storeThread( self ):
[63]1339                """Actual metric storing thread"""
[39]1340
[169]1341                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1342                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
[78]1343                ret = self.myXMLHandler.storeMetrics()
[176]1344                if ret > 0:
1345                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
[169]1346                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1347                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
[39]1348               
[176]1349                return 0
[39]1350
[8]1351        def processXML( self ):
[63]1352                """Process XML"""
[8]1353
[169]1354                try:
1355                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1356                        parsethread.start()
[176]1357                except thread.error, msg:
[169]1358                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1359                        return 1
[8]1360
[169]1361                debug_msg( 1, 'ganglia_xml_thread(): started.' )
[36]1362
[169]1363                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]1364                time.sleep( float( self.config.getLowestInterval() ) ) 
[169]1365                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
[36]1366
1367                if parsethread.isAlive():
1368
[169]1369                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
[47]1370                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[169]1371                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
[36]1372
[169]1373                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
[36]1374
1375                return 0
1376
[39]1377        def parseThread( self ):
[63]1378                """Actual parsing thread"""
[39]1379
[169]1380                debug_msg( 1, 'ganglia_parse_thread(): started.' )
1381                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
[287]1382                #self.myXMLSource = self.myXMLGatherer.getFileObject()
1383               
1384                my_data = self.myXMLSource.getData()
[176]1385
[293]1386                #print my_data
1387
[176]1388                try:
[287]1389                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[176]1390                except socket.error, msg:
1391                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
1392
[169]1393                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1394                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
[39]1395
[176]1396                return 0
[39]1397
[9]1398class GangliaConfigParser:
1399
[34]1400        sources = [ ]
[9]1401
1402        def __init__( self, config ):
[63]1403                """Parse some stuff from our gmetad's config, such as polling interval"""
[32]1404
[9]1405                self.config = config
1406                self.parseValues()
1407
[32]1408        def parseValues( self ):
[63]1409                """Parse certain values from gmetad.conf"""
[9]1410
1411                readcfg = open( self.config, 'r' )
1412
1413                for line in readcfg.readlines():
1414
1415                        if line.count( '"' ) > 1:
1416
[10]1417                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]1418
[292]1419                                        source          = { }
1420                                        source['name']  = line.split( '"' )[1]
1421                                        source_words    = line.split( '"' )[2].split( ' ' )
[9]1422
1423                                        for word in source_words:
1424
1425                                                valid_interval = 1
1426
1427                                                for letter in word:
[32]1428
[9]1429                                                        if letter not in string.digits:
[32]1430
[9]1431                                                                valid_interval = 0
1432
[10]1433                                                if valid_interval and len(word) > 0:
[32]1434
[9]1435                                                        source['interval'] = word
[12]1436                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]1437       
1438                                        # No interval found, use Ganglia's default     
1439                                        if not source.has_key( 'interval' ):
1440                                                source['interval'] = 15
1441                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1442
[33]1443                                        self.sources.append( source )
[9]1444
1445        def getInterval( self, source_name ):
[63]1446                """Return interval for source_name"""
[32]1447
[9]1448                for source in self.sources:
[32]1449
[12]1450                        if source['name'] == source_name:
[32]1451
[9]1452                                return source['interval']
[32]1453
[9]1454                return None
1455
[34]1456        def getLowestInterval( self ):
[63]1457                """Return the lowest interval of all clusters"""
[34]1458
1459                lowest_interval = 0
1460
1461                for source in self.sources:
1462
1463                        if not lowest_interval or source['interval'] <= lowest_interval:
1464
1465                                lowest_interval = source['interval']
1466
1467                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1468                if lowest_interval:
1469                        return lowest_interval
1470                else:
1471                        return 15
1472
[9]1473class RRDHandler:
[63]1474        """Class for handling RRD activity"""
[9]1475
[32]1476        myMetrics = { }
[40]1477        lastStored = { }
[47]1478        timeserials = { }
[36]1479        slot = None
[32]1480
[33]1481        def __init__( self, config, cluster ):
[63]1482                """Setup initial variables"""
[78]1483
[455]1484                global MODRRDTOOL
1485
[292]1486                self.block      = 0
1487                self.cluster    = cluster
1488                self.config     = config
1489                self.slot       = threading.Lock()
1490
[455]1491                if MODRRDTOOL:
1492
1493                        self.rrdm       = RRDMutator()
1494                else:
1495                        self.rrdm       = RRDMutator( RRDTOOL )
1496
[365]1497                global DEBUG_LEVEL
[9]1498
[365]1499                if DEBUG_LEVEL <= 2:
1500                        self.gatherLastUpdates()
1501
[42]1502        def gatherLastUpdates( self ):
[63]1503                """Populate the lastStored list, containing timestamps of all last updates"""
[42]1504
1505                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1506
1507                hosts = [ ]
1508
1509                if os.path.exists( cluster_dir ):
1510
[44]1511                        dirlist = os.listdir( cluster_dir )
[42]1512
[44]1513                        for dir in dirlist:
[42]1514
[44]1515                                hosts.append( dir )
[42]1516
1517                for host in hosts:
1518
[292]1519                        host_dir        = cluster_dir + '/' + host
1520                        dirlist         = os.listdir( host_dir )
[47]1521
1522                        for dir in dirlist:
1523
1524                                if not self.timeserials.has_key( host ):
1525
1526                                        self.timeserials[ host ] = [ ]
1527
1528                                self.timeserials[ host ].append( dir )
1529
[42]1530                        last_serial = self.getLastRrdTimeSerial( host )
[292]1531
[42]1532                        if last_serial:
1533
1534                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
[292]1535
[42]1536                                if os.path.exists( metric_dir ):
1537
[44]1538                                        dirlist = os.listdir( metric_dir )
[42]1539
[44]1540                                        for file in dirlist:
[42]1541
[44]1542                                                metricname = file.split( '.rrd' )[0]
[42]1543
[44]1544                                                if not self.lastStored.has_key( host ):
[42]1545
[44]1546                                                        self.lastStored[ host ] = { }
[42]1547
[44]1548                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1549
[32]1550        def getClusterName( self ):
[63]1551                """Return clustername"""
1552
[32]1553                return self.cluster
1554
1555        def memMetric( self, host, metric ):
[63]1556                """Store metric from host in memory"""
[32]1557
[179]1558                # <ATOMIC>
1559                #
1560                self.slot.acquire()
1561               
[34]1562                if self.myMetrics.has_key( host ):
[32]1563
[34]1564                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1565
[34]1566                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1567
[34]1568                                        if mymetric['time'] == metric['time']:
[32]1569
[34]1570                                                # Allready have this metric, abort
[179]1571                                                self.slot.release()
[34]1572                                                return 1
1573                        else:
1574                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1575                else:
[292]1576                        self.myMetrics[ host ]                          = { }
1577                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
[32]1578
[63]1579                # Push new metric onto stack
1580                # atomic code; only 1 thread at a time may access the stack
1581
[32]1582                self.myMetrics[ host ][ metric['name'] ].append( metric )
1583
[40]1584                self.slot.release()
[53]1585                #
1586                # </ATOMIC>
[40]1587
[47]1588        def makeUpdateList( self, host, metriclist ):
[63]1589                """
1590                Make a list of update values for rrdupdate
1591                but only those that we didn't store before
1592                """
[37]1593
[292]1594                update_list     = [ ]
1595                metric          = None
[37]1596
[47]1597                while len( metriclist ) > 0:
[37]1598
[53]1599                        metric = metriclist.pop( 0 )
[37]1600
[53]1601                        if self.checkStoreMetric( host, metric ):
[292]1602
[365]1603                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1604                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1605                                update_list.append( u_val )
[40]1606
[37]1607                return update_list
1608
[49]1609        def checkStoreMetric( self, host, metric ):
[63]1610                """Check if supplied metric if newer than last one stored"""
[40]1611
1612                if self.lastStored.has_key( host ):
1613
[47]1614                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]1615
[47]1616                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1617
[50]1618                                        # This is old
[40]1619                                        return 0
1620
[50]1621                return 1
1622
[54]1623        def memLastUpdate( self, host, metricname, metriclist ):
[63]1624                """
1625                Memorize the time of the latest metric from metriclist
1626                but only if it wasn't allready memorized
1627                """
[50]1628
[54]1629                if not self.lastStored.has_key( host ):
1630                        self.lastStored[ host ] = { }
1631
[50]1632                last_update_time = 0
1633
1634                for metric in metriclist:
1635
[54]1636                        if metric['name'] == metricname:
[50]1637
[54]1638                                if metric['time'] > last_update_time:
[50]1639
[54]1640                                        last_update_time = metric['time']
[40]1641
[54]1642                if self.lastStored[ host ].has_key( metricname ):
[52]1643                       
[54]1644                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]1645                                return 1
[40]1646
[54]1647                self.lastStored[ host ][ metricname ] = last_update_time
[52]1648
[33]1649        def storeMetrics( self ):
[63]1650                """
1651                Store all metrics from memory to disk
1652                and do it to the RRD's in appropriate timeperiod directory
1653                """
[33]1654
[365]1655                debug_msg( 5, "Entering storeMetrics()")
1656
1657                count_values    = 0
1658                count_metrics   = 0
1659                count_bits      = 0
1660
[33]1661                for hostname, mymetrics in self.myMetrics.items():     
1662
1663                        for metricname, mymetric in mymetrics.items():
1664
[365]1665                                count_metrics += 1
1666
1667                                for dmetric in mymetric:
1668
1669                                        count_values += 1
1670
1671                                        count_bits      += len( dmetric['time'] )
1672                                        count_bits      += len( dmetric['val'] )
1673
1674                count_bytes     = count_bits / 8
1675
1676                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1677                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1678                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1679                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1680
1681                for hostname, mymetrics in self.myMetrics.items():     
1682
1683                        for metricname, mymetric in mymetrics.items():
1684
[53]1685                                metrics_to_store = [ ]
1686
[63]1687                                # Pop metrics from stack for storing until none is left
1688                                # atomic code: only 1 thread at a time may access myMetrics
1689
[53]1690                                # <ATOMIC>
[50]1691                                #
[47]1692                                self.slot.acquire() 
[33]1693
[54]1694                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1695
[54]1696                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1697
[176]1698                                                try:
1699                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1700                                                except IndexError, msg:
1701
[179]1702                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1703                                                        # is still len 0 when the statement is executed.
1704                                                        # Just ignore indexerror's..
[176]1705                                                        pass
1706
[53]1707                                self.slot.release()
1708                                #
1709                                # </ATOMIC>
1710
[47]1711                                # Create a mapping table, each metric to the period where it should be stored
1712                                #
[53]1713                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]1714
[50]1715                                update_rets = [ ]
1716
[47]1717                                for period, pmetric in metric_serial_table.items():
1718
[146]1719                                        create_ret = self.createCheck( hostname, metricname, period )   
[47]1720
1721                                        update_ret = self.update( hostname, metricname, period, pmetric )
1722
1723                                        if update_ret == 0:
1724
1725                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1726                                        else:
1727                                                debug_msg( 9, 'metric update failed' )
1728
[146]1729                                        update_rets.append( create_ret )
[50]1730                                        update_rets.append( update_ret )
[47]1731
[179]1732                                # Lets ignore errors here for now, we need to make sure last update time
1733                                # is correct!
1734                                #
1735                                #if not (1) in update_rets:
[50]1736
[179]1737                                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]1738
[365]1739                debug_msg( 5, "Leaving storeMetrics()")
1740
[17]1741        def makeTimeSerial( self ):
[63]1742                """Generate a time serial. Seconds since epoch"""
[17]1743
1744                # Seconds since epoch
1745                mytime = int( time.time() )
1746
1747                return mytime
1748
[50]1749        def makeRrdPath( self, host, metricname, timeserial ):
[63]1750                """Make a RRD location/path and filename"""
[17]1751
[292]1752                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1753                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
[17]1754
1755                return rrd_dir, rrd_file
1756
[20]1757        def getLastRrdTimeSerial( self, host ):
[63]1758                """Find the last timeserial (directory) for this host"""
[17]1759
[19]1760                newest_timeserial = 0
1761
[47]1762                for dir in self.timeserials[ host ]:
[32]1763
[47]1764                        valid_dir = 1
[17]1765
[47]1766                        for letter in dir:
1767                                if letter not in string.digits:
1768                                        valid_dir = 0
[17]1769
[47]1770                        if valid_dir:
1771                                timeserial = dir
1772                                if timeserial > newest_timeserial:
1773                                        newest_timeserial = timeserial
[17]1774
1775                if newest_timeserial:
[18]1776                        return newest_timeserial
[17]1777                else:
1778                        return 0
1779
[47]1780        def determinePeriod( self, host, check_serial ):
[63]1781                """Determine to which period (directory) this time(serial) belongs"""
[47]1782
1783                period_serial = 0
1784
[56]1785                if self.timeserials.has_key( host ):
[47]1786
[56]1787                        for serial in self.timeserials[ host ]:
[47]1788
[56]1789                                if check_serial >= serial and period_serial < serial:
[47]1790
[56]1791                                        period_serial = serial
1792
[47]1793                return period_serial
1794
1795        def determineSerials( self, host, metricname, metriclist ):
1796                """
1797                Determine the correct serial and corresponding rrd to store
1798                for a list of metrics
1799                """
1800
1801                metric_serial_table = { }
1802
1803                for metric in metriclist:
1804
1805                        if metric['name'] == metricname:
1806
[292]1807                                period          = self.determinePeriod( host, metric['time'] ) 
[47]1808
[292]1809                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[47]1810
[49]1811                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]1812
1813                                        # This one should get it's own new period
1814                                        period = metric['time']
[57]1815
1816                                        if not self.timeserials.has_key( host ):
1817                                                self.timeserials[ host ] = [ ]
1818
[50]1819                                        self.timeserials[ host ].append( period )
[47]1820
1821                                if not metric_serial_table.has_key( period ):
1822
[49]1823                                        metric_serial_table[ period ] = [ ]
[47]1824
1825                                metric_serial_table[ period ].append( metric )
1826
1827                return metric_serial_table
1828
[33]1829        def createCheck( self, host, metricname, timeserial ):
[63]1830                """Check if an rrd allready exists for this metric, create if not"""
[9]1831
[35]1832                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]1833               
[33]1834                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]1835
[9]1836                if not os.path.exists( rrd_dir ):
[58]1837
1838                        try:
1839                                os.makedirs( rrd_dir )
1840
[169]1841                        except os.OSError, msg:
[58]1842
1843                                if msg.find( 'File exists' ) != -1:
1844
1845                                        # Ignore exists errors
1846                                        pass
1847
1848                                else:
1849
1850                                        print msg
1851                                        return
1852
[14]1853                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]1854
[14]1855                if not os.path.exists( rrd_file ):
[9]1856
[292]1857                        interval        = self.config.getInterval( self.cluster )
1858                        heartbeat       = 8 * int( interval )
[9]1859
[292]1860                        params          = [ ]
[12]1861
[37]1862                        params.append( '--step' )
1863                        params.append( str( interval ) )
[12]1864
[37]1865                        params.append( '--start' )
[47]1866                        params.append( str( int( timeserial ) - 1 ) )
[12]1867
[37]1868                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1869                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]1870
[37]1871                        self.rrdm.create( str(rrd_file), params )
1872
[14]1873                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1874
[47]1875        def update( self, host, metricname, timeserial, metriclist ):
[63]1876                """
1877                Update rrd file for host with metricname
1878                in directory timeserial with metriclist
1879                """
[9]1880
[35]1881                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]1882
[292]1883                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
[18]1884
[292]1885                update_list             = self.makeUpdateList( host, metriclist )
[15]1886
[41]1887                if len( update_list ) > 0:
1888                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]1889
[41]1890                        if ret:
1891                                return 1
[27]1892               
[41]1893                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]1894
[36]1895                return 0
1896
[169]1897def daemon():
1898        """daemonized threading"""
[8]1899
[169]1900        # Fork the first child
1901        #
1902        pid = os.fork()
1903
1904        if pid > 0:
1905
1906                sys.exit(0)  # end parent
1907
1908        # creates a session and sets the process group ID
1909        #
1910        os.setsid()
1911
1912        # Fork the second child
1913        #
1914        pid = os.fork()
1915
1916        if pid > 0:
1917
1918                sys.exit(0)  # end parent
1919
[435]1920        write_pidfile()
1921
[169]1922        # Go to the root directory and set the umask
1923        #
1924        os.chdir('/')
1925        os.umask(0)
1926
1927        sys.stdin.close()
1928        sys.stdout.close()
1929        sys.stderr.close()
1930
[273]1931        os.open('/dev/null', os.O_RDWR)
1932        os.dup2(0, 1)
1933        os.dup2(0, 2)
[169]1934
1935        run()
1936
1937def run():
1938        """Threading start"""
1939
[287]1940        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[295]1941        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[8]1942
[295]1943        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1944        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
[287]1945
[169]1946        try:
[292]1947                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1948                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
[22]1949
[169]1950                torque_xml_thread.start()
1951                ganglia_xml_thread.start()
1952               
[176]1953        except thread.error, msg:
[169]1954                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1955                syslog.closelog()
1956                sys.exit(1)
1957               
1958        debug_msg( 0, 'main threading started.' )
[78]1959
[169]1960def main():
1961        """Program startup"""
1962
[375]1963        global DAEMONIZE, USE_SYSLOG
1964
[214]1965        if not processArgs( sys.argv[1:] ):
1966                sys.exit( 1 )
1967
[169]1968        if( DAEMONIZE and USE_SYSLOG ):
1969                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1970
1971        if DAEMONIZE:
1972                daemon()
1973        else:
1974                run()
1975
1976#
[81]1977# Global functions
[169]1978#
[81]1979
[9]1980def check_dir( directory ):
[63]1981        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]1982
1983        if directory[-1] == '/':
1984                directory = directory[:-1]
1985
1986        return directory
1987
[295]1988def reqtime2epoch( rtime ):
1989
1990        (hours, minutes, seconds )      = rtime.split( ':' )
1991
1992        etime   = int(seconds)
1993        etime   = etime + ( int(minutes) * 60 )
1994        etime   = etime + ( int(hours) * 60 * 60 )
1995
1996        return etime
1997
[12]1998def debug_msg( level, msg ):
[169]1999        """Only print msg if correct levels"""
[12]2000
[169]2001        if (not DAEMONIZE and DEBUG_LEVEL >= level):
2002                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2003       
2004        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2005                syslog.syslog( msg )
[12]2006
[46]2007def printTime( ):
[63]2008        """Print current time in human readable format"""
[46]2009
2010        return time.strftime("%a %d %b %Y %H:%M:%S")
2011
[435]2012def write_pidfile():
2013
2014        # Write pidfile if PIDFILE exists
2015        if PIDFILE:
2016
2017                pid     = os.getpid()
2018
2019                pidfile = open(PIDFILE, 'w')
2020
2021                pidfile.write( str( pid ) )
2022                pidfile.close()
2023
[63]2024# Ooohh, someone started me! Let's go..
[9]2025if __name__ == '__main__':
2026        main()
Note: See TracBrowser for help on using the repository browser.