source: trunk/jobarchived/jobarchived.py @ 447

Last change on this file since 447 was 435, checked in by bastiaans, 17 years ago

jobarchived/jobarchived.py:

  • added pidfile support

pkg/rpm/init.d/jobarchived,
pkg/rpm/jobmonarch-jobarchived.spec,
Makefile:

  • updated
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 47.3 KB
RevLine 
[3]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[230]21# SVN $Id: jobarchived.py 435 2007-07-11 14:57:42Z bastiaans $
22#
[3]23
[284]24DEFAULT_SEARCH_PATH     = '/usr/share/jobarchived'
[3]25
[284]26import sys
27
28if DEFAULT_SEARCH_PATH not in sys.path:
29
30        sys.path.append( DEFAULT_SEARCH_PATH )
31
32import getopt, syslog, ConfigParser
33
[435]34def usage():
35
36        print
37        print 'usage: jobarchived [options]'
38        print 'options:'
39        print '      --config, -c      configuration file'
40        print '      --pidfile, -p     pid file'
41        print '      --help, -h        help'
42        print
43
[214]44def processArgs( args ):
[6]45
[435]46        SHORT_L = 'p:hc:'
47        LONG_L  = [ 'help', 'config=', 'pidfile=' ]
[169]48
[214]49        config_filename = None
[169]50
[435]51        global PIDFILE
52
53        PIDFILE = None
54
[214]55        try:
[169]56
[214]57                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]58
[214]59        except getopt.error, detail:
[60]60
[214]61                print detail
62                sys.exit(1)
[9]63
[214]64        for opt, value in opts:
[60]65
[214]66                if opt in [ '--config', '-c' ]:
[13]67
[214]68                        config_filename = value
[198]69
[435]70                if opt in [ '--pidfile', '-p' ]:
71
72                        PIDFILE         = value
73
74                if opt in [ '--help', '-h' ]:
75
76                        usage()
77                        sys.exit( 0 )
78
[214]79        if not config_filename:
[60]80
[214]81                config_filename = '/etc/jobarchived.conf'
[22]82
[214]83        try:
84                return loadConfig( config_filename )
[13]85
[214]86        except ConfigParser.NoOptionError, detail:
87
88                print detail
89                sys.exit( 1 )
90
91def loadConfig( filename ):
92
93        def getlist( cfg_string ):
94
95                my_list = [ ]
96
97                for item_txt in cfg_string.split( ',' ):
98
99                        sep_char = None
100
101                        item_txt = item_txt.strip()
102
103                        for s_char in [ "'", '"' ]:
104
105                                if item_txt.find( s_char ) != -1:
106
107                                        if item_txt.count( s_char ) != 2:
108
109                                                print 'Missing quote: %s' %item_txt
110                                                sys.exit( 1 )
111
112                                        else:
113
114                                                sep_char = s_char
115                                                break
116
117                        if sep_char:
118
119                                item_txt = item_txt.split( sep_char )[1]
120
121                        my_list.append( item_txt )
122
123                return my_list
124
125        cfg = ConfigParser.ConfigParser()
126
127        cfg.read( filename )
128
[375]129        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
[214]130
[292]131        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
[214]132
[292]133        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
[214]134
[292]135        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
[214]136
[292]137        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
[214]138
[292]139        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
[214]140
[375]141        MODRRDTOOL              = False
142
[214]143        try:
[375]144                import rrdtool
[214]145
[375]146                MODRRDTOOL              = True
147
148        except ImportError:
149
150                MODRRDTOOL              = False
151
152                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobmond significantly!" )
153
154        try:
155
[292]156                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
[214]157
158        except AttributeError, detail:
159
160                print 'Unknown syslog facility'
161                sys.exit( 1 )
162
[292]163        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
[214]164
[292]165        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
[214]166
[292]167        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
[214]168
[292]169        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
[214]170
[292]171        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
[214]172
[295]173        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
174
[292]175        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
[214]176
[292]177        RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
[224]178
[214]179        return True
180
[17]181# What XML data types not to store
[13]182#
[17]183UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]184
[47]185# Maximum time (in seconds) a parsethread may run
186#
187PARSE_TIMEOUT = 60
188
189# Maximum time (in seconds) a storethread may run
190#
191STORE_TIMEOUT = 360
192
[8]193"""
[224]194The Job Archiving Daemon
[8]195"""
196
[214]197from types import *
198
199import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
[365]200import rrdtool
[379]201from pyPgSQL import PgSQL
[214]202
[379]203# Orginal from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed
204# and added some more functions for postgres.
205#       
206#
207# Changed by: Bas van der Vlies <basv@sara.nl>
208#       
209# SARA API for Postgres Database
210#
211# Changed by: Ramon Bastiaans for Job Monarch
212#
213
214class InitVars:
215        Vars = {}
216       
217        def __init__(self, **key_arg):
218                for (key, value) in key_arg.items():
219                        if value:
220                                self.Vars[key] = value
221                        else:   
222                                self.Vars[key] = None
223                               
224        def __call__(self, *key):
225                key = "%s" % key
226                return self.Vars[key]
227               
228        def __getitem__(self, key):
229                return self.Vars[key]
230               
231        def __repr__(self):
232                return repr(self.Vars)
233               
234        def keys(self):
235                barf =  map(None, self.Vars.keys())
236                return barf
237               
238        def values(self):
239                barf =  map(None, self.Vars.values())
240                return barf
241               
242        def has_key(self, key):
243                if self.Vars.has_key(key):
244                        return 1
245                else:   
246                        return 0
247                       
248class DBError(Exception):
249        def __init__(self, msg=''):
250                self.msg = msg
251                Exception.__init__(self, msg)
252        def __repr__(self):
253                return self.msg
254        __str__ = __repr__
255
256#
257# Class to connect to a database
258# and return the queury in a list or dictionairy.
259#
260class DB:
261        def __init__(self, db_vars):
262
263                self.dict = db_vars
264
265                if self.dict.has_key('User'):
266                        self.user = self.dict['User']
267                else:
268                        self.user = 'postgres'
269
270                if self.dict.has_key('Host'):
271                        self.host = self.dict['Host']
272                else:
273                        self.host = 'localhost'
274
275                if self.dict.has_key('Password'):
276                        self.passwd = self.dict['Password']
277                else:
278                        self.passwd = ''
279
280                if self.dict.has_key('DataBaseName'):
281                        self.db = self.dict['DataBaseName']
282                else:
283                        self.db = 'uva_cluster_db'
284
285                # connect_string = 'host:port:database:user:password:
286                dsn = "%s::%s:%s:%s" %(self.host, self.db, self.user, self.passwd)
287
288                try:
289                        self.SQL = PgSQL.connect(dsn)
290                except PgSQL.Error, details:
291                        str = "%s" %details
292                        raise DBError(str)
293
294        def __repr__(self):
295                return repr(self.result)
296
297        def __nonzero__(self):
298                return not(self.result == None)
299
300        def __len__(self):
301                return len(self.result)
302
303        def __getitem__(self,i):
304                return self.result[i]
305
306        def __getslice__(self,i,j):
307                return self.result[i:j]
308
309        def Get(self, q_str):
310                c = self.SQL.cursor()
311                try:
312                        c.execute(q_str)
313                        result = c.fetchall()
314                except PgSQL.Error, details:
315                        c.close()
316                        str = "%s" %details
317                        raise DBError(str)
318
319                c.close()
320                return result
321
322        def Set(self, q_str):
323                c = self.SQL.cursor()
324                try:
325                        c.execute(q_str)
326                        result = c.oidValue
327
328                except PgSQL.Error, details:
329                        c.close()
330                        str = "%s" %details
331                        raise DBError(str)
332
333                c.close()
334                return result
335
336        def Commit(self):
337                self.SQL.commit()
338
[84]339class DataSQLStore:
340
341        db_vars = None
342        dbc = None
343
344        def __init__( self, hostname, database ):
345
[379]346                self.db_vars = InitVars(DataBaseName=database,
[84]347                                User='root',
348                                Host=hostname,
349                                Password='',
350                                Dictionary='true')
351
352                try:
[379]353                        self.dbc     = DB(self.db_vars)
354                except DBError, details:
[169]355                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
[84]356                        sys.exit(1)
357
358        def setDatabase(self, statement):
359                ret = self.doDatabase('set', statement)
360                return ret
361               
362        def getDatabase(self, statement):
363                ret = self.doDatabase('get', statement)
364                return ret
365
366        def doDatabase(self, type, statement):
367
[365]368                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
[84]369                try:
370                        if type == 'set':
371                                result = self.dbc.Set( statement )
372                                self.dbc.Commit()
373                        elif type == 'get':
374                                result = self.dbc.Get( statement )
375                               
[379]376                except DBError, detail:
[84]377                        operation = statement.split(' ')[0]
[169]378                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
[84]379                        sys.exit(1)
380
[365]381                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
[84]382                return result
383
[191]384        def getJobNodeId( self, job_id, node_id ):
385
386                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
387                if len( id ) > 0:
388
389                        if len( id[0] ) > 0 and id[0] != '':
390                       
391                                return 1
392
393                return 0
394
[84]395        def getNodeId( self, hostname ):
396
[89]397                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]398
[89]399                if len( id ) > 0:
[84]400
[89]401                        id = id[0][0]
402
[84]403                        return id
404                else:
405                        return None
406
407        def getNodeIds( self, hostnames ):
408
409                ids = [ ]
410
411                for node in hostnames:
412
413                        id = self.getNodeId( node )
414
415                        if id:
416                                ids.append( id )
417
418                return ids
419
420        def getJobId( self, jobid ):
421
422                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
423
424                if id:
[89]425                        id = id[0][0]
[84]426
427                        return id
428                else:
429                        return None
430
431        def addJob( self, job_id, jobattrs ):
432
433                if not self.getJobId( job_id ):
434
435                        self.mutateJob( 'insert', job_id, jobattrs ) 
436                else:
437                        self.mutateJob( 'update', job_id, jobattrs )
438
439        def mutateJob( self, action, job_id, jobattrs ):
440
[292]441                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
[84]442
[292]443                insert_col_str  = 'job_id'
444                insert_val_str  = "'%s'" %job_id
445                update_str      = None
[84]446
[365]447                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
[84]448
[99]449                ids = [ ]
450
[84]451                for valname, value in jobattrs.items():
452
[96]453                        if valname in job_values and value != '':
[84]454
455                                column_name = 'job_' + valname
456
457                                if action == 'insert':
458
459                                        if not insert_col_str:
460                                                insert_col_str = column_name
461                                        else:
462                                                insert_col_str = insert_col_str + ',' + column_name
463
464                                        if not insert_val_str:
465                                                insert_val_str = value
466                                        else:
467                                                insert_val_str = insert_val_str + ",'%s'" %value
468
469                                elif action == 'update':
470                                       
471                                        if not update_str:
472                                                update_str = "%s='%s'" %(column_name, value)
473                                        else:
474                                                update_str = update_str + ",%s='%s'" %(column_name, value)
475
[90]476                        elif valname == 'nodes' and value:
[84]477
[191]478                                node_valid = 1
[190]479
480                                if len(value) == 1:
481                               
[191]482                                        if jobattrs['status'] == 'Q':
[190]483
[191]484                                                node_valid = 0
[190]485
[191]486                                        else:
[190]487
[191]488                                                node_valid = 0
[190]489
[191]490                                                for node_char in str(value[0]):
[190]491
[191]492                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]493
[191]494                                                                node_valid = 1
[84]495
[191]496                                if node_valid:
497
498                                        ids = self.addNodes( value, jobattrs['domain'] )
499
[84]500                if action == 'insert':
501
502                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]503
[84]504                elif action == 'update':
505
[89]506                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
[84]507
[191]508                if len( ids ) > 0:
509                        self.addJobNodes( job_id, ids )
[190]510
[154]511        def addNodes( self, hostnames, domain ):
[84]512
[98]513                ids = [ ]
514
[84]515                for node in hostnames:
516
[292]517                        node    = '%s.%s' %( node, domain )
518                        id      = self.getNodeId( node )
[84]519       
520                        if not id:
521                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
[98]522                                id = self.getNodeId( node )
[84]523
[98]524                        ids.append( id )
525
526                return ids
527
[86]528        def addJobNodes( self, jobid, nodes ):
529
530                for node in nodes:
531
[191]532                        if not self.getJobNodeId( jobid, node ):
533
534                                self.addJobNode( jobid, node )
535
[84]536        def addJobNode( self, jobid, nodeid ):
537
538                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
539
540        def storeJobInfo( self, jobid, jobattrs ):
541
542                self.addJob( jobid, jobattrs )
543
[295]544        def checkStaleJobs( self ):
545
546                q = "SELECT * from jobs WHERE job_status != 'F'"
547
548                r = self.getDatabase( q )
549
550                if len( r ) == 0:
551
552                        return None
553
554                cleanjobs       = [ ]
555                timeoutjobs     = [ ]
556
557                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
558                cur_time        = time.time()
559
560                for row in r:
561
562                        job_id                  = row[0]
563                        job_requested_time      = row[4]
564                        job_status              = row[7]
565                        job_start_timestamp     = row[8]
566
567                        if job_status == 'Q' or not job_start_timestamp:
568
569                                cleanjobs.append( job_id )
570
571                        else:
572
573                                start_timestamp = int( job_start_timestamp )
574
575                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
576
577                                        if job_requested_time:
578
579                                                rtime_epoch     = reqtime2epoch( job_requested_time )
580                                        else:
581                                                rtime_epoch     = None
582                                       
583                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
584
585                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
586
587                for j in cleanjobs:
588
589                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
590                        self.setDatabase( q )
591
592                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
593
594                for j in timeoutjobs:
595
596                        ( i, s, r )             = j
597
598                        if r:
599                                new_end_timestamp       = int( s ) + r
600
601                        q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
602                        self.setDatabase( q )
603
[37]604class RRDMutator:
[63]605        """A class for performing RRD mutations"""
[37]606
[277]607        binary = None
[37]608
[365]609
[37]610        def __init__( self, binary=None ):
[63]611                """Set alternate binary if supplied"""
[37]612
613                if binary:
614                        self.binary = binary
615
616        def create( self, filename, args ):
[63]617                """Create a new rrd with args"""
618
[375]619                global MODRRDTOOL
[37]620
[375]621                if MODRRDTOOL:
622                        return self.perform( 'create', filename, args )
623                else:
624                        return self.perform( 'create', '"' + filename + '"', args )
625
[37]626        def update( self, filename, args ):
[63]627                """Update a rrd with args"""
628
[375]629                global MODRRDTOOL
[37]630
[375]631                if MODRRDTOOL:
632                        return self.perform( 'update', filename, args )
633                else:
634                        return self.perform( 'update', '"' + filename + '"', args )
635
[42]636        def grabLastUpdate( self, filename ):
[63]637                """Determine the last update time of filename rrd"""
[42]638
[375]639                global MODRRDTOOL
640
[42]641                last_update = 0
642
[375]643                if MODRRDTOOL:
[53]644
[375]645                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
[42]646
[375]647                        rrd_header      = { }
[292]648
[375]649                        try:
650                                rrd_header      = rrdtool.info( filename )
651                        except rrdtool.error, msg:
652                                debug_msg( 8, str( msg ) )
653
654                        if rrd_header.has_key( 'last_update' ):
655                                return last_update
656                        else:
657                                return 0
658
[42]659                else:
[375]660                        debug_msg( 8, self.binary + ' info ' + filename )
[42]661
[375]662                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
663
664                        for line in my_pipe.readlines():
665
666                                if line.find( 'last_update') != -1:
667
668                                        last_update = line.split( ' = ' )[1]
669
670                        if my_pipe:
671
672                                my_pipe.close()
673
674                        if last_update:
675                                return last_update
676                        else:
677                                return 0
678
679
[40]680        def perform( self, action, filename, args ):
[63]681                """Perform action on rrd filename with args"""
[37]682
[375]683                global MODRRDTOOL
684
[37]685                arg_string = None
686
[40]687                if type( args ) is not ListType:
688                        debug_msg( 8, 'Arguments needs to be of type List' )
689                        return 1
690
[37]691                for arg in args:
692
693                        if not arg_string:
694
695                                arg_string = arg
696                        else:
697                                arg_string = arg_string + ' ' + arg
698
[375]699                if MODRRDTOOL:
[37]700
[375]701                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
[292]702
[375]703                        try:
704                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
[37]705
[375]706                                if action == 'create':
[146]707
[375]708                                        rrdtool.create( str( filename ), *args )
[37]709
[375]710                                elif action == 'update':
[37]711
[375]712                                        rrdtool.update( str( filename ), *args )
[365]713
[375]714                        except rrdtool.error, msg:
[365]715
[375]716                                error_msg = str( msg )
717                                debug_msg( 8, error_msg )
718                                return 1
719
720                else:
721
722                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
723
724                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
725                        lines   = cmd.readlines()
726
727                        cmd.close()
728
729                        for line in lines:
730
731                                if line.find( 'ERROR' ) != -1:
732
733                                        error_msg = string.join( line.split( ' ' )[1:] )
734                                        debug_msg( 8, error_msg )
735                                        return 1
736
[37]737                return 0
738
[78]739class XMLProcessor:
740        """Skeleton class for XML processor's"""
741
742        def run( self ):
743                """Do main processing of XML here"""
744
745                pass
746
747class TorqueXMLProcessor( XMLProcessor ):
748        """Main class for processing XML and acting with it"""
749
[295]750        def __init__( self, XMLSource, DataStore ):
[78]751                """Setup initial XML connection and handlers"""
752
[293]753                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[287]754                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
755                self.myXMLSource        = XMLSource
[295]756                self.myXMLHandler       = TorqueXMLHandler( DataStore )
[287]757                self.myXMLError         = XMLErrorHandler()
[78]758
[287]759                self.config             = GangliaConfigParser( GMETAD_CONF )
760
[78]761        def run( self ):
762                """Main XML processing"""
763
[169]764                debug_msg( 1, 'torque_xml_thread(): started.' )
[87]765
[78]766                while( 1 ):
767
[287]768                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
[169]769                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
[176]770
[287]771                        my_data = self.myXMLSource.getData()
772
[176]773                        try:
[287]774                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[176]775                        except socket.error, msg:
776                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
777                               
[169]778                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
779                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[87]780                        time.sleep( self.config.getLowestInterval() )
[78]781
[71]782class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
[63]783        """Parse Torque's jobinfo XML from our plugin"""
784
[72]785        jobAttrs = { }
786
[295]787        def __init__( self, datastore ):
[84]788
[295]789                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
790                self.ds                 = datastore
[292]791                self.jobs_processed     = [ ]
792                self.jobs_to_store      = [ ]
[84]793
[183]794        def startDocument( self ):
795
[292]796                self.heartbeat  = 0
[365]797                self.elementct  = 0
[183]798
[63]799        def startElement( self, name, attrs ):
800                """
801                This XML will be all gmetric XML
802                so there will be no specific start/end element
803                just one XML statement with all info
804                """
805               
[73]806                jobinfo = { }
[63]807
[365]808                self.elementct  += 1
809
[199]810                if name == 'CLUSTER':
[63]811
[372]812                        self.clustername = str( attrs.get( 'NAME', "" ) )
[199]813
814                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
815
[372]816                        metricname = str( attrs.get( 'NAME', "" ) )
[63]817
[285]818                        if metricname == 'MONARCH-HEARTBEAT':
[372]819                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
[63]820
[285]821                        elif metricname.find( 'MONARCH-JOB' ) != -1:
[63]822
[292]823                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
[372]824                                val     = str( attrs.get( 'VAL', "" ) )
[63]825
[96]826                                if not job_id in self.jobs_processed:
[292]827
[96]828                                        self.jobs_processed.append( job_id )
829
[73]830                                check_change = 0
831
832                                if self.jobAttrs.has_key( job_id ):
[292]833
[73]834                                        check_change = 1
835
[63]836                                valinfo = val.split( ' ' )
837
838                                for myval in valinfo:
839
[84]840                                        if len( myval.split( '=' ) ) > 1:
[63]841
[292]842                                                valname = myval.split( '=' )[0]
843                                                value   = myval.split( '=' )[1]
[70]844
[84]845                                                if valname == 'nodes':
846                                                        value = value.split( ';' )
[72]847
[84]848                                                jobinfo[ valname ] = value
849
[73]850                                if check_change:
[182]851                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
[292]852                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
853                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
[84]854                                                if not job_id in self.jobs_to_store:
855                                                        self.jobs_to_store.append( job_id )
856
[365]857                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]858                                else:
859                                        self.jobAttrs[ job_id ] = jobinfo
[84]860
861                                        if not job_id in self.jobs_to_store:
862                                                self.jobs_to_store.append( job_id )
863
[365]864                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
[73]865                                       
[77]866        def endDocument( self ):
[74]867                """When all metrics have gone, check if any jobs have finished"""
[72]868
[371]869                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
[365]870
[182]871                if self.heartbeat:
872                        for jobid, jobinfo in self.jobAttrs.items():
[74]873
[182]874                                # This is an old job, not in current jobinfo list anymore
875                                # it must have finished, since we _did_ get a new heartbeat
876                                #
877                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
[102]878
[184]879                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
[74]880
[182]881                                        if not jobid in self.jobs_processed:
882                                                self.jobs_processed.append( jobid )
[96]883
[182]884                                        self.jobAttrs[ jobid ]['status'] = 'F'
[360]885                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
[96]886
[182]887                                        if not jobid in self.jobs_to_store:
888                                                self.jobs_to_store.append( jobid )
[74]889
[182]890                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
[87]891
[182]892                        for jobid in self.jobs_to_store:
[295]893                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
[84]894
[184]895                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
896
897                                        if self.jobAttrs[ jobid ]['status'] == 'F':
898                                                del self.jobAttrs[ jobid ]
899
[182]900                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
[87]901
[292]902                        self.jobs_processed     = [ ]
903                        self.jobs_to_store      = [ ]
[84]904
[82]905        def setJobAttrs( self, old, new ):
906                """
907                Set new job attributes in old, but not lose existing fields
908                if old attributes doesn't have those
909                """
910
911                for valname, value in new.items():
912                        old[ valname ] = value
913
914                return old
915               
916
[73]917        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
[74]918                """
919                Check if jobinfo has changed from jobattrs[jobid]
920                if it's report time is bigger than previous one
921                and it is report time is recent (equal to heartbeat)
922                """
[72]923
[87]924                ignore_changes = [ 'reported' ]
925
[73]926                if jobattrs.has_key( jobid ):
927
928                        for valname, value in jobinfo.items():
929
[87]930                                if valname not in ignore_changes:
[73]931
[87]932                                        if jobattrs[ jobid ].has_key( valname ):
[73]933
[87]934                                                if value != jobattrs[ jobid ][ valname ]:
[73]935
[87]936                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
[360]937                                                                return True
[73]938
[87]939                                        else:
[360]940                                                return True
[87]941
[360]942                return False
[73]943
[71]944class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[63]945        """Parse Ganglia's XML"""
[3]946
[295]947        def __init__( self, config, datastore ):
[63]948                """Setup initial variables and gather info on existing rrd archive"""
949
[292]950                self.config     = config
951                self.clusters   = { }
[295]952                self.ds         = datastore
[324]953
[295]954                debug_msg( 1, 'Checking database..' )
[365]955
956                global DEBUG_LEVEL
957
958                if DEBUG_LEVEL <= 2:
959                        self.ds.checkStaleJobs()
960
[295]961                debug_msg( 1, 'Check done.' )
[296]962                debug_msg( 1, 'Checking rrd archive..' )
[293]963                self.gatherClusters()
[169]964                debug_msg( 1, 'Check done.' )
[33]965
[44]966        def gatherClusters( self ):
[63]967                """Find all existing clusters in archive dir"""
[44]968
[292]969                archive_dir     = check_dir(ARCHIVE_PATH)
[44]970
[292]971                hosts           = [ ]
[44]972
973                if os.path.exists( archive_dir ):
974
[292]975                        dirlist = os.listdir( archive_dir )
[44]976
[369]977                        for cfgcluster in ARCHIVE_DATASOURCES:
978
979                                if cfgcluster not in dirlist:
980
981                                        # Autocreate a directory for this cluster
982                                        # assume it is new
983                                        #
984                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
985
986                                        os.mkdir( cluster_dir )
987
[370]988                                        dirlist.append( cfgcluster )
989
[44]990                        for item in dirlist:
991
992                                clustername = item
993
[60]994                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]995
996                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
997
[365]998                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
999
[6]1000        def startElement( self, name, attrs ):
[63]1001                """Memorize appropriate data from xml start tags"""
[3]1002
[7]1003                if name == 'GANGLIA_XML':
[32]1004
[372]1005                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
1006                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
[32]1007
[12]1008                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]1009
[7]1010                elif name == 'GRID':
[32]1011
[372]1012                        self.gridName   = str( attrs.get( 'NAME', "" ) )
1013                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
[32]1014
[12]1015                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]1016
[7]1017                elif name == 'CLUSTER':
[32]1018
[372]1019                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
1020                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
[32]1021
[60]1022                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]1023
[34]1024                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]1025
[35]1026                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]1027
[60]1028                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]1029
[372]1030                        self.hostName           = str( attrs.get( 'NAME', "" ) )
1031                        self.hostIp             = str( attrs.get( 'IP', "" ) )
1032                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
[32]1033
[12]1034                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]1035
[60]1036                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]1037
[372]1038                        type = str( attrs.get( 'TYPE', "" ) )
[198]1039                       
1040                        exclude_metric = False
1041                       
1042                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
[6]1043
[372]1044                                orig_name = str( attrs.get( 'NAME', "" ) )
[3]1045
[198]1046                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1047                               
1048                                        exclude_metric = True
1049
1050                                elif re.match( ex_metricstr, orig_name ):
1051
1052                                        exclude_metric = True
1053
1054                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1055
[292]1056                                myMetric                = { }
[372]1057                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
1058                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
[292]1059                                myMetric['time']        = self.hostReported
[3]1060
[34]1061                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]1062
[34]1063                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]1064
[34]1065        def storeMetrics( self ):
[63]1066                """Store metrics of each cluster rrd handler"""
[9]1067
[34]1068                for clustername, rrdh in self.clusters.items():
[16]1069
[38]1070                        ret = rrdh.storeMetrics()
[9]1071
[38]1072                        if ret:
1073                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1074                                return 1
1075
1076                return 0
1077
[71]1078class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1079
1080        def error( self, exception ):
1081                """Recoverable error"""
1082
[169]1083                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]1084
1085        def fatalError( self, exception ):
1086                """Non-recoverable error"""
1087
1088                exception_str = str( exception )
1089
1090                # Ignore 'no element found' errors
1091                if exception_str.find( 'no element found' ) != -1:
[169]1092                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
[71]1093                        return 0
1094
[170]1095                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
[71]1096                sys.exit( 1 )
1097
1098        def warning( self, exception ):
1099                """Warning"""
1100
1101                debug_msg( 0, 'Warning ' + str( exception ) )
1102
[78]1103class XMLGatherer:
[63]1104        """Setup a connection and file object to Ganglia's XML"""
[3]1105
[287]1106        s               = None
1107        fd              = None
1108        data            = None
[293]1109        slot            = None
[8]1110
[287]1111        # Time since the last update
1112        #
1113        LAST_UPDATE     = 0
1114
1115        # Minimum interval between updates
1116        #
1117        MIN_UPDATE_INT  = 10
1118
1119        # Is a update occuring now
1120        #
1121        update_now      = False
1122
[8]1123        def __init__( self, host, port ):
[63]1124                """Store host and port for connection"""
[8]1125
[293]1126                self.host       = host
1127                self.port       = port
1128                self.slot       = threading.Lock()
[3]1129
[287]1130                self.retrieveData()
1131
1132        def retrieveData( self ):
[63]1133                """Setup connection to XML source"""
[8]1134
[287]1135                self.update_now = True
1136
[293]1137                self.slot.acquire()
1138
[8]1139                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]1140
[5]1141                        af, socktype, proto, canonname, sa = res
[32]1142
[5]1143                        try:
[32]1144
[8]1145                                self.s = socket.socket( af, socktype, proto )
[32]1146
[5]1147                        except socket.error, msg:
[32]1148
[8]1149                                self.s = None
[5]1150                                continue
[32]1151
[5]1152                        try:
[32]1153
[8]1154                                self.s.connect( sa )
[32]1155
[5]1156                        except socket.error, msg:
[32]1157
[70]1158                                self.disconnect()
[5]1159                                continue
[32]1160
[5]1161                        break
[3]1162
[8]1163                if self.s is None:
[32]1164
[170]1165                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
[287]1166                        self.update_now = False
[33]1167                        sys.exit( 1 )
[5]1168
[287]1169                else:
[324]1170                        #self.s.send( '\n' )
[287]1171
1172                        my_fp                   = self.s.makefile( 'r' )
1173                        my_data                 = my_fp.readlines()
1174                        my_data                 = string.join( my_data, '' )
1175
1176                        self.data               = my_data
1177
1178                        self.LAST_UPDATE        = time.time()
1179
[293]1180                self.slot.release()
1181
[287]1182                self.update_now = False
1183
[33]1184        def disconnect( self ):
[63]1185                """Close socket"""
[33]1186
1187                if self.s:
[287]1188                        #self.s.shutdown( 2 )
[33]1189                        self.s.close()
1190                        self.s = None
1191
1192        def __del__( self ):
[63]1193                """Kill the socket before we leave"""
[33]1194
1195                self.disconnect()
1196
[287]1197        def reGetData( self ):
[70]1198                """Reconnect"""
[33]1199
[287]1200                while self.update_now:
1201
1202                        # Must be another update in progress:
1203                        # Wait until the update is complete
1204                        #
1205                        time.sleep( 1 )
1206
[38]1207                if self.s:
1208                        self.disconnect()
[33]1209
[287]1210                self.retrieveData()
[5]1211
[287]1212        def getData( self ):
1213
1214                """Return the XML data"""
1215
1216                # If more than MIN_UPDATE_INT seconds passed since last data update
1217                # update the XML first before returning it
1218                #
1219
1220                cur_time        = time.time()
1221
1222                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1223
1224                        self.reGetData()
1225
1226                while self.update_now:
1227
1228                        # Must be another update in progress:
1229                        # Wait until the update is complete
1230                        #
1231                        time.sleep( 1 )
1232                       
1233                return self.data
1234
[70]1235        def makeFileDescriptor( self ):
1236                """Make file descriptor that points to our socket connection"""
1237
1238                self.reconnect()
1239
1240                if self.s:
1241                        self.fd = self.s.makefile( 'r' )
1242
1243        def getFileObject( self ):
1244                """Connect, and return a file object"""
1245
[78]1246                self.makeFileDescriptor()
1247
[70]1248                if self.fd:
1249                        return self.fd
1250
[78]1251class GangliaXMLProcessor( XMLProcessor ):
[63]1252        """Main class for processing XML and acting with it"""
[5]1253
[295]1254        def __init__( self, XMLSource, DataStore ):
[63]1255                """Setup initial XML connection and handlers"""
[33]1256
[287]1257                self.config             = GangliaConfigParser( GMETAD_CONF )
[33]1258
[293]1259                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[287]1260                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1261                self.myXMLSource        = XMLSource
[295]1262                self.ds                 = DataStore
1263                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
[287]1264                self.myXMLError         = XMLErrorHandler()
[73]1265
[9]1266        def run( self ):
[63]1267                """Main XML processing; start a xml and storethread"""
[8]1268
[102]1269                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1270                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]1271
[36]1272                while( 1 ):
1273
[102]1274                        if not xml_thread.isAlive():
[36]1275                                # Gather XML at the same interval as gmetad
1276
1277                                # threaded call to: self.processXML()
1278                                #
[169]1279                                try:
1280                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1281                                        xml_thread.start()
[176]1282                                except thread.error, msg:
[169]1283                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1284                                        #return 1
[36]1285
[102]1286                        if not store_thread.isAlive():
[55]1287                                # Store metrics every .. sec
[36]1288
[55]1289                                # threaded call to: self.storeMetrics()
1290                                #
[169]1291                                try:
1292                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1293                                        store_thread.start()
[176]1294                                except thread.error, msg:
[169]1295                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1296                                        #return 1
[36]1297               
1298                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1299                        time.sleep( 1 ) 
1300
[33]1301        def storeMetrics( self ):
[63]1302                """Store metrics retained in memory to disk"""
[22]1303
[365]1304                global DEBUG_LEVEL
1305
[63]1306                # Store metrics somewhere between every 360 and 640 seconds
[38]1307                #
[365]1308                if DEBUG_LEVEL > 2:
1309                        #STORE_INTERVAL = 60
1310                        STORE_INTERVAL = random.randint( 360, 640 )
1311                else:
1312                        STORE_INTERVAL = random.randint( 360, 640 )
[22]1313
[169]1314                try:
1315                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1316                        store_metric_thread.start()
[176]1317                except thread.error, msg:
[169]1318                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1319                        return 1
[36]1320
[169]1321                debug_msg( 1, 'ganglia_store_thread(): started.' )
1322
1323                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]1324                time.sleep( STORE_INTERVAL )
[169]1325                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
[36]1326
[102]1327                if store_metric_thread.isAlive():
[36]1328
[169]1329                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
[136]1330                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[169]1331                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
[36]1332
[169]1333                debug_msg( 1, 'ganglia_store_thread(): finished.' )
[36]1334
1335                return 0
1336
[39]1337        def storeThread( self ):
[63]1338                """Actual metric storing thread"""
[39]1339
[169]1340                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1341                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
[78]1342                ret = self.myXMLHandler.storeMetrics()
[176]1343                if ret > 0:
1344                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
[169]1345                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1346                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
[39]1347               
[176]1348                return 0
[39]1349
[8]1350        def processXML( self ):
[63]1351                """Process XML"""
[8]1352
[169]1353                try:
1354                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1355                        parsethread.start()
[176]1356                except thread.error, msg:
[169]1357                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1358                        return 1
[8]1359
[169]1360                debug_msg( 1, 'ganglia_xml_thread(): started.' )
[36]1361
[169]1362                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]1363                time.sleep( float( self.config.getLowestInterval() ) ) 
[169]1364                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
[36]1365
1366                if parsethread.isAlive():
1367
[169]1368                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
[47]1369                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[169]1370                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
[36]1371
[169]1372                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
[36]1373
1374                return 0
1375
[39]1376        def parseThread( self ):
[63]1377                """Actual parsing thread"""
[39]1378
[169]1379                debug_msg( 1, 'ganglia_parse_thread(): started.' )
1380                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
[287]1381                #self.myXMLSource = self.myXMLGatherer.getFileObject()
1382               
1383                my_data = self.myXMLSource.getData()
[176]1384
[293]1385                #print my_data
1386
[176]1387                try:
[287]1388                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
[176]1389                except socket.error, msg:
1390                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
1391
[169]1392                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1393                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
[39]1394
[176]1395                return 0
[39]1396
[9]1397class GangliaConfigParser:
1398
[34]1399        sources = [ ]
[9]1400
1401        def __init__( self, config ):
[63]1402                """Parse some stuff from our gmetad's config, such as polling interval"""
[32]1403
[9]1404                self.config = config
1405                self.parseValues()
1406
[32]1407        def parseValues( self ):
[63]1408                """Parse certain values from gmetad.conf"""
[9]1409
1410                readcfg = open( self.config, 'r' )
1411
1412                for line in readcfg.readlines():
1413
1414                        if line.count( '"' ) > 1:
1415
[10]1416                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]1417
[292]1418                                        source          = { }
1419                                        source['name']  = line.split( '"' )[1]
1420                                        source_words    = line.split( '"' )[2].split( ' ' )
[9]1421
1422                                        for word in source_words:
1423
1424                                                valid_interval = 1
1425
1426                                                for letter in word:
[32]1427
[9]1428                                                        if letter not in string.digits:
[32]1429
[9]1430                                                                valid_interval = 0
1431
[10]1432                                                if valid_interval and len(word) > 0:
[32]1433
[9]1434                                                        source['interval'] = word
[12]1435                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]1436       
1437                                        # No interval found, use Ganglia's default     
1438                                        if not source.has_key( 'interval' ):
1439                                                source['interval'] = 15
1440                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1441
[33]1442                                        self.sources.append( source )
[9]1443
1444        def getInterval( self, source_name ):
[63]1445                """Return interval for source_name"""
[32]1446
[9]1447                for source in self.sources:
[32]1448
[12]1449                        if source['name'] == source_name:
[32]1450
[9]1451                                return source['interval']
[32]1452
[9]1453                return None
1454
[34]1455        def getLowestInterval( self ):
[63]1456                """Return the lowest interval of all clusters"""
[34]1457
1458                lowest_interval = 0
1459
1460                for source in self.sources:
1461
1462                        if not lowest_interval or source['interval'] <= lowest_interval:
1463
1464                                lowest_interval = source['interval']
1465
1466                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1467                if lowest_interval:
1468                        return lowest_interval
1469                else:
1470                        return 15
1471
[9]1472class RRDHandler:
[63]1473        """Class for handling RRD activity"""
[9]1474
[32]1475        myMetrics = { }
[40]1476        lastStored = { }
[47]1477        timeserials = { }
[36]1478        slot = None
[32]1479
[33]1480        def __init__( self, config, cluster ):
[63]1481                """Setup initial variables"""
[78]1482
[292]1483                self.block      = 0
1484                self.cluster    = cluster
1485                self.config     = config
1486                self.slot       = threading.Lock()
1487                self.rrdm       = RRDMutator( RRDTOOL )
1488
[365]1489                global DEBUG_LEVEL
[9]1490
[365]1491                if DEBUG_LEVEL <= 2:
1492                        self.gatherLastUpdates()
1493
[42]1494        def gatherLastUpdates( self ):
[63]1495                """Populate the lastStored list, containing timestamps of all last updates"""
[42]1496
1497                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1498
1499                hosts = [ ]
1500
1501                if os.path.exists( cluster_dir ):
1502
[44]1503                        dirlist = os.listdir( cluster_dir )
[42]1504
[44]1505                        for dir in dirlist:
[42]1506
[44]1507                                hosts.append( dir )
[42]1508
1509                for host in hosts:
1510
[292]1511                        host_dir        = cluster_dir + '/' + host
1512                        dirlist         = os.listdir( host_dir )
[47]1513
1514                        for dir in dirlist:
1515
1516                                if not self.timeserials.has_key( host ):
1517
1518                                        self.timeserials[ host ] = [ ]
1519
1520                                self.timeserials[ host ].append( dir )
1521
[42]1522                        last_serial = self.getLastRrdTimeSerial( host )
[292]1523
[42]1524                        if last_serial:
1525
1526                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
[292]1527
[42]1528                                if os.path.exists( metric_dir ):
1529
[44]1530                                        dirlist = os.listdir( metric_dir )
[42]1531
[44]1532                                        for file in dirlist:
[42]1533
[44]1534                                                metricname = file.split( '.rrd' )[0]
[42]1535
[44]1536                                                if not self.lastStored.has_key( host ):
[42]1537
[44]1538                                                        self.lastStored[ host ] = { }
[42]1539
[44]1540                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1541
[32]1542        def getClusterName( self ):
[63]1543                """Return clustername"""
1544
[32]1545                return self.cluster
1546
1547        def memMetric( self, host, metric ):
[63]1548                """Store metric from host in memory"""
[32]1549
[179]1550                # <ATOMIC>
1551                #
1552                self.slot.acquire()
1553               
[34]1554                if self.myMetrics.has_key( host ):
[32]1555
[34]1556                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1557
[34]1558                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1559
[34]1560                                        if mymetric['time'] == metric['time']:
[32]1561
[34]1562                                                # Allready have this metric, abort
[179]1563                                                self.slot.release()
[34]1564                                                return 1
1565                        else:
1566                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1567                else:
[292]1568                        self.myMetrics[ host ]                          = { }
1569                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
[32]1570
[63]1571                # Push new metric onto stack
1572                # atomic code; only 1 thread at a time may access the stack
1573
[32]1574                self.myMetrics[ host ][ metric['name'] ].append( metric )
1575
[40]1576                self.slot.release()
[53]1577                #
1578                # </ATOMIC>
[40]1579
[47]1580        def makeUpdateList( self, host, metriclist ):
[63]1581                """
1582                Make a list of update values for rrdupdate
1583                but only those that we didn't store before
1584                """
[37]1585
[292]1586                update_list     = [ ]
1587                metric          = None
[37]1588
[47]1589                while len( metriclist ) > 0:
[37]1590
[53]1591                        metric = metriclist.pop( 0 )
[37]1592
[53]1593                        if self.checkStoreMetric( host, metric ):
[292]1594
[365]1595                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1596                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1597                                update_list.append( u_val )
[40]1598
[37]1599                return update_list
1600
[49]1601        def checkStoreMetric( self, host, metric ):
[63]1602                """Check if supplied metric if newer than last one stored"""
[40]1603
1604                if self.lastStored.has_key( host ):
1605
[47]1606                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]1607
[47]1608                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1609
[50]1610                                        # This is old
[40]1611                                        return 0
1612
[50]1613                return 1
1614
[54]1615        def memLastUpdate( self, host, metricname, metriclist ):
[63]1616                """
1617                Memorize the time of the latest metric from metriclist
1618                but only if it wasn't allready memorized
1619                """
[50]1620
[54]1621                if not self.lastStored.has_key( host ):
1622                        self.lastStored[ host ] = { }
1623
[50]1624                last_update_time = 0
1625
1626                for metric in metriclist:
1627
[54]1628                        if metric['name'] == metricname:
[50]1629
[54]1630                                if metric['time'] > last_update_time:
[50]1631
[54]1632                                        last_update_time = metric['time']
[40]1633
[54]1634                if self.lastStored[ host ].has_key( metricname ):
[52]1635                       
[54]1636                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]1637                                return 1
[40]1638
[54]1639                self.lastStored[ host ][ metricname ] = last_update_time
[52]1640
[33]1641        def storeMetrics( self ):
[63]1642                """
1643                Store all metrics from memory to disk
1644                and do it to the RRD's in appropriate timeperiod directory
1645                """
[33]1646
[365]1647                debug_msg( 5, "Entering storeMetrics()")
1648
1649                count_values    = 0
1650                count_metrics   = 0
1651                count_bits      = 0
1652
[33]1653                for hostname, mymetrics in self.myMetrics.items():     
1654
1655                        for metricname, mymetric in mymetrics.items():
1656
[365]1657                                count_metrics += 1
1658
1659                                for dmetric in mymetric:
1660
1661                                        count_values += 1
1662
1663                                        count_bits      += len( dmetric['time'] )
1664                                        count_bits      += len( dmetric['val'] )
1665
1666                count_bytes     = count_bits / 8
1667
1668                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1669                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1670                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1671                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1672
1673                for hostname, mymetrics in self.myMetrics.items():     
1674
1675                        for metricname, mymetric in mymetrics.items():
1676
[53]1677                                metrics_to_store = [ ]
1678
[63]1679                                # Pop metrics from stack for storing until none is left
1680                                # atomic code: only 1 thread at a time may access myMetrics
1681
[53]1682                                # <ATOMIC>
[50]1683                                #
[47]1684                                self.slot.acquire() 
[33]1685
[54]1686                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1687
[54]1688                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1689
[176]1690                                                try:
1691                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1692                                                except IndexError, msg:
1693
[179]1694                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1695                                                        # is still len 0 when the statement is executed.
1696                                                        # Just ignore indexerror's..
[176]1697                                                        pass
1698
[53]1699                                self.slot.release()
1700                                #
1701                                # </ATOMIC>
1702
[47]1703                                # Create a mapping table, each metric to the period where it should be stored
1704                                #
[53]1705                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]1706
[50]1707                                update_rets = [ ]
1708
[47]1709                                for period, pmetric in metric_serial_table.items():
1710
[146]1711                                        create_ret = self.createCheck( hostname, metricname, period )   
[47]1712
1713                                        update_ret = self.update( hostname, metricname, period, pmetric )
1714
1715                                        if update_ret == 0:
1716
1717                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1718                                        else:
1719                                                debug_msg( 9, 'metric update failed' )
1720
[146]1721                                        update_rets.append( create_ret )
[50]1722                                        update_rets.append( update_ret )
[47]1723
[179]1724                                # Lets ignore errors here for now, we need to make sure last update time
1725                                # is correct!
1726                                #
1727                                #if not (1) in update_rets:
[50]1728
[179]1729                                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]1730
[365]1731                debug_msg( 5, "Leaving storeMetrics()")
1732
[17]1733        def makeTimeSerial( self ):
[63]1734                """Generate a time serial. Seconds since epoch"""
[17]1735
1736                # Seconds since epoch
1737                mytime = int( time.time() )
1738
1739                return mytime
1740
[50]1741        def makeRrdPath( self, host, metricname, timeserial ):
[63]1742                """Make a RRD location/path and filename"""
[17]1743
[292]1744                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1745                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
[17]1746
1747                return rrd_dir, rrd_file
1748
[20]1749        def getLastRrdTimeSerial( self, host ):
[63]1750                """Find the last timeserial (directory) for this host"""
[17]1751
[19]1752                newest_timeserial = 0
1753
[47]1754                for dir in self.timeserials[ host ]:
[32]1755
[47]1756                        valid_dir = 1
[17]1757
[47]1758                        for letter in dir:
1759                                if letter not in string.digits:
1760                                        valid_dir = 0
[17]1761
[47]1762                        if valid_dir:
1763                                timeserial = dir
1764                                if timeserial > newest_timeserial:
1765                                        newest_timeserial = timeserial
[17]1766
1767                if newest_timeserial:
[18]1768                        return newest_timeserial
[17]1769                else:
1770                        return 0
1771
[47]1772        def determinePeriod( self, host, check_serial ):
[63]1773                """Determine to which period (directory) this time(serial) belongs"""
[47]1774
1775                period_serial = 0
1776
[56]1777                if self.timeserials.has_key( host ):
[47]1778
[56]1779                        for serial in self.timeserials[ host ]:
[47]1780
[56]1781                                if check_serial >= serial and period_serial < serial:
[47]1782
[56]1783                                        period_serial = serial
1784
[47]1785                return period_serial
1786
1787        def determineSerials( self, host, metricname, metriclist ):
1788                """
1789                Determine the correct serial and corresponding rrd to store
1790                for a list of metrics
1791                """
1792
1793                metric_serial_table = { }
1794
1795                for metric in metriclist:
1796
1797                        if metric['name'] == metricname:
1798
[292]1799                                period          = self.determinePeriod( host, metric['time'] ) 
[47]1800
[292]1801                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
[47]1802
[49]1803                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]1804
1805                                        # This one should get it's own new period
1806                                        period = metric['time']
[57]1807
1808                                        if not self.timeserials.has_key( host ):
1809                                                self.timeserials[ host ] = [ ]
1810
[50]1811                                        self.timeserials[ host ].append( period )
[47]1812
1813                                if not metric_serial_table.has_key( period ):
1814
[49]1815                                        metric_serial_table[ period ] = [ ]
[47]1816
1817                                metric_serial_table[ period ].append( metric )
1818
1819                return metric_serial_table
1820
[33]1821        def createCheck( self, host, metricname, timeserial ):
[63]1822                """Check if an rrd allready exists for this metric, create if not"""
[9]1823
[35]1824                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]1825               
[33]1826                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]1827
[9]1828                if not os.path.exists( rrd_dir ):
[58]1829
1830                        try:
1831                                os.makedirs( rrd_dir )
1832
[169]1833                        except os.OSError, msg:
[58]1834
1835                                if msg.find( 'File exists' ) != -1:
1836
1837                                        # Ignore exists errors
1838                                        pass
1839
1840                                else:
1841
1842                                        print msg
1843                                        return
1844
[14]1845                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]1846
[14]1847                if not os.path.exists( rrd_file ):
[9]1848
[292]1849                        interval        = self.config.getInterval( self.cluster )
1850                        heartbeat       = 8 * int( interval )
[9]1851
[292]1852                        params          = [ ]
[12]1853
[37]1854                        params.append( '--step' )
1855                        params.append( str( interval ) )
[12]1856
[37]1857                        params.append( '--start' )
[47]1858                        params.append( str( int( timeserial ) - 1 ) )
[12]1859
[37]1860                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1861                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]1862
[37]1863                        self.rrdm.create( str(rrd_file), params )
1864
[14]1865                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1866
[47]1867        def update( self, host, metricname, timeserial, metriclist ):
[63]1868                """
1869                Update rrd file for host with metricname
1870                in directory timeserial with metriclist
1871                """
[9]1872
[35]1873                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]1874
[292]1875                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
[18]1876
[292]1877                update_list             = self.makeUpdateList( host, metriclist )
[15]1878
[41]1879                if len( update_list ) > 0:
1880                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]1881
[41]1882                        if ret:
1883                                return 1
[27]1884               
[41]1885                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]1886
[36]1887                return 0
1888
[169]1889def daemon():
1890        """daemonized threading"""
[8]1891
[169]1892        # Fork the first child
1893        #
1894        pid = os.fork()
1895
1896        if pid > 0:
1897
1898                sys.exit(0)  # end parent
1899
1900        # creates a session and sets the process group ID
1901        #
1902        os.setsid()
1903
1904        # Fork the second child
1905        #
1906        pid = os.fork()
1907
1908        if pid > 0:
1909
1910                sys.exit(0)  # end parent
1911
[435]1912        write_pidfile()
1913
[169]1914        # Go to the root directory and set the umask
1915        #
1916        os.chdir('/')
1917        os.umask(0)
1918
1919        sys.stdin.close()
1920        sys.stdout.close()
1921        sys.stderr.close()
1922
[273]1923        os.open('/dev/null', os.O_RDWR)
1924        os.dup2(0, 1)
1925        os.dup2(0, 2)
[169]1926
1927        run()
1928
1929def run():
1930        """Threading start"""
1931
[287]1932        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
[295]1933        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[8]1934
[295]1935        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1936        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
[287]1937
[169]1938        try:
[292]1939                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1940                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
[22]1941
[169]1942                torque_xml_thread.start()
1943                ganglia_xml_thread.start()
1944               
[176]1945        except thread.error, msg:
[169]1946                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1947                syslog.closelog()
1948                sys.exit(1)
1949               
1950        debug_msg( 0, 'main threading started.' )
[78]1951
[169]1952def main():
1953        """Program startup"""
1954
[375]1955        global DAEMONIZE, USE_SYSLOG
1956
[214]1957        if not processArgs( sys.argv[1:] ):
1958                sys.exit( 1 )
1959
[169]1960        if( DAEMONIZE and USE_SYSLOG ):
1961                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1962
1963        if DAEMONIZE:
1964                daemon()
1965        else:
1966                run()
1967
1968#
[81]1969# Global functions
[169]1970#
[81]1971
[9]1972def check_dir( directory ):
[63]1973        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]1974
1975        if directory[-1] == '/':
1976                directory = directory[:-1]
1977
1978        return directory
1979
[295]1980def reqtime2epoch( rtime ):
1981
1982        (hours, minutes, seconds )      = rtime.split( ':' )
1983
1984        etime   = int(seconds)
1985        etime   = etime + ( int(minutes) * 60 )
1986        etime   = etime + ( int(hours) * 60 * 60 )
1987
1988        return etime
1989
[12]1990def debug_msg( level, msg ):
[169]1991        """Only print msg if correct levels"""
[12]1992
[169]1993        if (not DAEMONIZE and DEBUG_LEVEL >= level):
1994                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
1995       
1996        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1997                syslog.syslog( msg )
[12]1998
[46]1999def printTime( ):
[63]2000        """Print current time in human readable format"""
[46]2001
2002        return time.strftime("%a %d %b %Y %H:%M:%S")
2003
[435]2004def write_pidfile():
2005
2006        # Write pidfile if PIDFILE exists
2007        if PIDFILE:
2008
2009                pid     = os.getpid()
2010
2011                pidfile = open(PIDFILE, 'w')
2012
2013                pidfile.write( str( pid ) )
2014                pidfile.close()
2015
[63]2016# Ooohh, someone started me! Let's go..
[9]2017if __name__ == '__main__':
2018        main()
Note: See TracBrowser for help on using the repository browser.