source: trunk/jobarchived/jobarchived.py @ 455

Last change on this file since 455 was 455, checked in by bastiaans, 16 years ago

jobarchived/jobarchived.py:

  • fix to py-rrdtool module detection

jobarchived/examples/jobarchived.conf:

  • more sensible default path
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 47.4 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 455 2008-02-05 15:26:24Z bastiaans $
22#
23
24DEFAULT_SEARCH_PATH     = '/usr/share/jobarchived'
25
26import sys
27
28if DEFAULT_SEARCH_PATH not in sys.path:
29
30        sys.path.append( DEFAULT_SEARCH_PATH )
31
32import getopt, syslog, ConfigParser
33
34def usage():
35
36        print
37        print 'usage: jobarchived [options]'
38        print 'options:'
39        print '      --config, -c      configuration file'
40        print '      --pidfile, -p     pid file'
41        print '      --help, -h        help'
42        print
43
44def processArgs( args ):
45
46        SHORT_L = 'p:hc:'
47        LONG_L  = [ 'help', 'config=', 'pidfile=' ]
48
49        config_filename = None
50
51        global PIDFILE
52
53        PIDFILE = None
54
55        try:
56
57                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
58
59        except getopt.error, detail:
60
61                print detail
62                sys.exit(1)
63
64        for opt, value in opts:
65
66                if opt in [ '--config', '-c' ]:
67
68                        config_filename = value
69
70                if opt in [ '--pidfile', '-p' ]:
71
72                        PIDFILE         = value
73
74                if opt in [ '--help', '-h' ]:
75
76                        usage()
77                        sys.exit( 0 )
78
79        if not config_filename:
80
81                config_filename = '/etc/jobarchived.conf'
82
83        try:
84                return loadConfig( config_filename )
85
86        except ConfigParser.NoOptionError, detail:
87
88                print detail
89                sys.exit( 1 )
90
91def loadConfig( filename ):
92
93        def getlist( cfg_string ):
94
95                my_list = [ ]
96
97                for item_txt in cfg_string.split( ',' ):
98
99                        sep_char = None
100
101                        item_txt = item_txt.strip()
102
103                        for s_char in [ "'", '"' ]:
104
105                                if item_txt.find( s_char ) != -1:
106
107                                        if item_txt.count( s_char ) != 2:
108
109                                                print 'Missing quote: %s' %item_txt
110                                                sys.exit( 1 )
111
112                                        else:
113
114                                                sep_char = s_char
115                                                break
116
117                        if sep_char:
118
119                                item_txt = item_txt.split( sep_char )[1]
120
121                        my_list.append( item_txt )
122
123                return my_list
124
125        cfg = ConfigParser.ConfigParser()
126
127        cfg.read( filename )
128
129        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
130
131        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
132
133        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
134
135        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
136
137        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
138
139        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
140
141        MODRRDTOOL              = False
142
143        try:
144                import rrdtool
145
146                MODRRDTOOL              = True
147
148        except ImportError:
149
150                MODRRDTOOL              = False
151
152                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobmond significantly!" )
153
154                RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
155
156        try:
157
158                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
159
160        except AttributeError, detail:
161
162                print 'Unknown syslog facility'
163                sys.exit( 1 )
164
165        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
166
167        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
168
169        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
170
171        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
172
173        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
174
175        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
176
177        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
178
179
180        return True
181
182# What XML data types not to store
183#
184UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
185
186# Maximum time (in seconds) a parsethread may run
187#
188PARSE_TIMEOUT = 60
189
190# Maximum time (in seconds) a storethread may run
191#
192STORE_TIMEOUT = 360
193
194"""
195The Job Archiving Daemon
196"""
197
198from types import *
199
200import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
201import rrdtool
202from pyPgSQL import PgSQL
203
204# Orginal from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed
205# and added some more functions for postgres.
206#       
207#
208# Changed by: Bas van der Vlies <basv@sara.nl>
209#       
210# SARA API for Postgres Database
211#
212# Changed by: Ramon Bastiaans for Job Monarch
213#
214
215class InitVars:
216        Vars = {}
217       
218        def __init__(self, **key_arg):
219                for (key, value) in key_arg.items():
220                        if value:
221                                self.Vars[key] = value
222                        else:   
223                                self.Vars[key] = None
224                               
225        def __call__(self, *key):
226                key = "%s" % key
227                return self.Vars[key]
228               
229        def __getitem__(self, key):
230                return self.Vars[key]
231               
232        def __repr__(self):
233                return repr(self.Vars)
234               
235        def keys(self):
236                barf =  map(None, self.Vars.keys())
237                return barf
238               
239        def values(self):
240                barf =  map(None, self.Vars.values())
241                return barf
242               
243        def has_key(self, key):
244                if self.Vars.has_key(key):
245                        return 1
246                else:   
247                        return 0
248                       
249class DBError(Exception):
250        def __init__(self, msg=''):
251                self.msg = msg
252                Exception.__init__(self, msg)
253        def __repr__(self):
254                return self.msg
255        __str__ = __repr__
256
257#
258# Class to connect to a database
259# and return the queury in a list or dictionairy.
260#
261class DB:
262        def __init__(self, db_vars):
263
264                self.dict = db_vars
265
266                if self.dict.has_key('User'):
267                        self.user = self.dict['User']
268                else:
269                        self.user = 'postgres'
270
271                if self.dict.has_key('Host'):
272                        self.host = self.dict['Host']
273                else:
274                        self.host = 'localhost'
275
276                if self.dict.has_key('Password'):
277                        self.passwd = self.dict['Password']
278                else:
279                        self.passwd = ''
280
281                if self.dict.has_key('DataBaseName'):
282                        self.db = self.dict['DataBaseName']
283                else:
284                        self.db = 'uva_cluster_db'
285
286                # connect_string = 'host:port:database:user:password:
287                dsn = "%s::%s:%s:%s" %(self.host, self.db, self.user, self.passwd)
288
289                try:
290                        self.SQL = PgSQL.connect(dsn)
291                except PgSQL.Error, details:
292                        str = "%s" %details
293                        raise DBError(str)
294
295        def __repr__(self):
296                return repr(self.result)
297
298        def __nonzero__(self):
299                return not(self.result == None)
300
301        def __len__(self):
302                return len(self.result)
303
304        def __getitem__(self,i):
305                return self.result[i]
306
307        def __getslice__(self,i,j):
308                return self.result[i:j]
309
310        def Get(self, q_str):
311                c = self.SQL.cursor()
312                try:
313                        c.execute(q_str)
314                        result = c.fetchall()
315                except PgSQL.Error, details:
316                        c.close()
317                        str = "%s" %details
318                        raise DBError(str)
319
320                c.close()
321                return result
322
323        def Set(self, q_str):
324                c = self.SQL.cursor()
325                try:
326                        c.execute(q_str)
327                        result = c.oidValue
328
329                except PgSQL.Error, details:
330                        c.close()
331                        str = "%s" %details
332                        raise DBError(str)
333
334                c.close()
335                return result
336
337        def Commit(self):
338                self.SQL.commit()
339
340class DataSQLStore:
341
342        db_vars = None
343        dbc = None
344
345        def __init__( self, hostname, database ):
346
347                self.db_vars = InitVars(DataBaseName=database,
348                                User='root',
349                                Host=hostname,
350                                Password='',
351                                Dictionary='true')
352
353                try:
354                        self.dbc     = DB(self.db_vars)
355                except DBError, details:
356                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
357                        sys.exit(1)
358
359        def setDatabase(self, statement):
360                ret = self.doDatabase('set', statement)
361                return ret
362               
363        def getDatabase(self, statement):
364                ret = self.doDatabase('get', statement)
365                return ret
366
367        def doDatabase(self, type, statement):
368
369                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
370                try:
371                        if type == 'set':
372                                result = self.dbc.Set( statement )
373                                self.dbc.Commit()
374                        elif type == 'get':
375                                result = self.dbc.Get( statement )
376                               
377                except DBError, detail:
378                        operation = statement.split(' ')[0]
379                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
380                        sys.exit(1)
381
382                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
383                return result
384
385        def getJobNodeId( self, job_id, node_id ):
386
387                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
388                if len( id ) > 0:
389
390                        if len( id[0] ) > 0 and id[0] != '':
391                       
392                                return 1
393
394                return 0
395
396        def getNodeId( self, hostname ):
397
398                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
399
400                if len( id ) > 0:
401
402                        id = id[0][0]
403
404                        return id
405                else:
406                        return None
407
408        def getNodeIds( self, hostnames ):
409
410                ids = [ ]
411
412                for node in hostnames:
413
414                        id = self.getNodeId( node )
415
416                        if id:
417                                ids.append( id )
418
419                return ids
420
421        def getJobId( self, jobid ):
422
423                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
424
425                if id:
426                        id = id[0][0]
427
428                        return id
429                else:
430                        return None
431
432        def addJob( self, job_id, jobattrs ):
433
434                if not self.getJobId( job_id ):
435
436                        self.mutateJob( 'insert', job_id, jobattrs ) 
437                else:
438                        self.mutateJob( 'update', job_id, jobattrs )
439
440        def mutateJob( self, action, job_id, jobattrs ):
441
442                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
443
444                insert_col_str  = 'job_id'
445                insert_val_str  = "'%s'" %job_id
446                update_str      = None
447
448                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
449
450                ids = [ ]
451
452                for valname, value in jobattrs.items():
453
454                        if valname in job_values and value != '':
455
456                                column_name = 'job_' + valname
457
458                                if action == 'insert':
459
460                                        if not insert_col_str:
461                                                insert_col_str = column_name
462                                        else:
463                                                insert_col_str = insert_col_str + ',' + column_name
464
465                                        if not insert_val_str:
466                                                insert_val_str = value
467                                        else:
468                                                insert_val_str = insert_val_str + ",'%s'" %value
469
470                                elif action == 'update':
471                                       
472                                        if not update_str:
473                                                update_str = "%s='%s'" %(column_name, value)
474                                        else:
475                                                update_str = update_str + ",%s='%s'" %(column_name, value)
476
477                        elif valname == 'nodes' and value:
478
479                                node_valid = 1
480
481                                if len(value) == 1:
482                               
483                                        if jobattrs['status'] == 'Q':
484
485                                                node_valid = 0
486
487                                        else:
488
489                                                node_valid = 0
490
491                                                for node_char in str(value[0]):
492
493                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
494
495                                                                node_valid = 1
496
497                                if node_valid:
498
499                                        ids = self.addNodes( value, jobattrs['domain'] )
500
501                if action == 'insert':
502
503                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
504
505                elif action == 'update':
506
507                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
508
509                if len( ids ) > 0:
510                        self.addJobNodes( job_id, ids )
511
512        def addNodes( self, hostnames, domain ):
513
514                ids = [ ]
515
516                for node in hostnames:
517
518                        node    = '%s.%s' %( node, domain )
519                        id      = self.getNodeId( node )
520       
521                        if not id:
522                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
523                                id = self.getNodeId( node )
524
525                        ids.append( id )
526
527                return ids
528
529        def addJobNodes( self, jobid, nodes ):
530
531                for node in nodes:
532
533                        if not self.getJobNodeId( jobid, node ):
534
535                                self.addJobNode( jobid, node )
536
537        def addJobNode( self, jobid, nodeid ):
538
539                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
540
541        def storeJobInfo( self, jobid, jobattrs ):
542
543                self.addJob( jobid, jobattrs )
544
545        def checkStaleJobs( self ):
546
547                q = "SELECT * from jobs WHERE job_status != 'F'"
548
549                r = self.getDatabase( q )
550
551                if len( r ) == 0:
552
553                        return None
554
555                cleanjobs       = [ ]
556                timeoutjobs     = [ ]
557
558                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
559                cur_time        = time.time()
560
561                for row in r:
562
563                        job_id                  = row[0]
564                        job_requested_time      = row[4]
565                        job_status              = row[7]
566                        job_start_timestamp     = row[8]
567
568                        if job_status == 'Q' or not job_start_timestamp:
569
570                                cleanjobs.append( job_id )
571
572                        else:
573
574                                start_timestamp = int( job_start_timestamp )
575
576                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
577
578                                        if job_requested_time:
579
580                                                rtime_epoch     = reqtime2epoch( job_requested_time )
581                                        else:
582                                                rtime_epoch     = None
583                                       
584                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
585
586                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
587
588                for j in cleanjobs:
589
590                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
591                        self.setDatabase( q )
592
593                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
594
595                for j in timeoutjobs:
596
597                        ( i, s, r )             = j
598
599                        if r:
600                                new_end_timestamp       = int( s ) + r
601
602                        q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
603                        self.setDatabase( q )
604
605class RRDMutator:
606        """A class for performing RRD mutations"""
607
608        binary = None
609
610
611        def __init__( self, binary=None ):
612                """Set alternate binary if supplied"""
613
614                if binary:
615                        self.binary = binary
616
617        def create( self, filename, args ):
618                """Create a new rrd with args"""
619
620                global MODRRDTOOL
621
622                if MODRRDTOOL:
623                        return self.perform( 'create', filename, args )
624                else:
625                        return self.perform( 'create', '"' + filename + '"', args )
626
627        def update( self, filename, args ):
628                """Update a rrd with args"""
629
630                global MODRRDTOOL
631
632                if MODRRDTOOL:
633                        return self.perform( 'update', filename, args )
634                else:
635                        return self.perform( 'update', '"' + filename + '"', args )
636
637        def grabLastUpdate( self, filename ):
638                """Determine the last update time of filename rrd"""
639
640                global MODRRDTOOL
641
642                last_update = 0
643
644                if MODRRDTOOL:
645
646                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
647
648                        rrd_header      = { }
649
650                        try:
651                                rrd_header      = rrdtool.info( filename )
652                        except rrdtool.error, msg:
653                                debug_msg( 8, str( msg ) )
654
655                        if rrd_header.has_key( 'last_update' ):
656                                return last_update
657                        else:
658                                return 0
659
660                else:
661                        debug_msg( 8, self.binary + ' info ' + filename )
662
663                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
664
665                        for line in my_pipe.readlines():
666
667                                if line.find( 'last_update') != -1:
668
669                                        last_update = line.split( ' = ' )[1]
670
671                        if my_pipe:
672
673                                my_pipe.close()
674
675                        if last_update:
676                                return last_update
677                        else:
678                                return 0
679
680
681        def perform( self, action, filename, args ):
682                """Perform action on rrd filename with args"""
683
684                global MODRRDTOOL
685
686                arg_string = None
687
688                if type( args ) is not ListType:
689                        debug_msg( 8, 'Arguments needs to be of type List' )
690                        return 1
691
692                for arg in args:
693
694                        if not arg_string:
695
696                                arg_string = arg
697                        else:
698                                arg_string = arg_string + ' ' + arg
699
700                if MODRRDTOOL:
701
702                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
703
704                        try:
705                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
706
707                                if action == 'create':
708
709                                        rrdtool.create( str( filename ), *args )
710
711                                elif action == 'update':
712
713                                        rrdtool.update( str( filename ), *args )
714
715                        except rrdtool.error, msg:
716
717                                error_msg = str( msg )
718                                debug_msg( 8, error_msg )
719                                return 1
720
721                else:
722
723                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
724
725                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
726                        lines   = cmd.readlines()
727
728                        cmd.close()
729
730                        for line in lines:
731
732                                if line.find( 'ERROR' ) != -1:
733
734                                        error_msg = string.join( line.split( ' ' )[1:] )
735                                        debug_msg( 8, error_msg )
736                                        return 1
737
738                return 0
739
740class XMLProcessor:
741        """Skeleton class for XML processor's"""
742
743        def run( self ):
744                """Do main processing of XML here"""
745
746                pass
747
748class TorqueXMLProcessor( XMLProcessor ):
749        """Main class for processing XML and acting with it"""
750
751        def __init__( self, XMLSource, DataStore ):
752                """Setup initial XML connection and handlers"""
753
754                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
755                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
756                self.myXMLSource        = XMLSource
757                self.myXMLHandler       = TorqueXMLHandler( DataStore )
758                self.myXMLError         = XMLErrorHandler()
759
760                self.config             = GangliaConfigParser( GMETAD_CONF )
761
762        def run( self ):
763                """Main XML processing"""
764
765                debug_msg( 1, 'torque_xml_thread(): started.' )
766
767                while( 1 ):
768
769                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
770                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
771
772                        my_data = self.myXMLSource.getData()
773
774                        try:
775                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
776                        except socket.error, msg:
777                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
778                               
779                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
780                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
781                        time.sleep( self.config.getLowestInterval() )
782
783class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
784        """Parse Torque's jobinfo XML from our plugin"""
785
786        jobAttrs = { }
787
788        def __init__( self, datastore ):
789
790                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
791                self.ds                 = datastore
792                self.jobs_processed     = [ ]
793                self.jobs_to_store      = [ ]
794
795        def startDocument( self ):
796
797                self.heartbeat  = 0
798                self.elementct  = 0
799
800        def startElement( self, name, attrs ):
801                """
802                This XML will be all gmetric XML
803                so there will be no specific start/end element
804                just one XML statement with all info
805                """
806               
807                jobinfo = { }
808
809                self.elementct  += 1
810
811                if name == 'CLUSTER':
812
813                        self.clustername = str( attrs.get( 'NAME', "" ) )
814
815                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
816
817                        metricname = str( attrs.get( 'NAME', "" ) )
818
819                        if metricname == 'MONARCH-HEARTBEAT':
820                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
821
822                        elif metricname.find( 'MONARCH-JOB' ) != -1:
823
824                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
825                                val     = str( attrs.get( 'VAL', "" ) )
826
827                                if not job_id in self.jobs_processed:
828
829                                        self.jobs_processed.append( job_id )
830
831                                check_change = 0
832
833                                if self.jobAttrs.has_key( job_id ):
834
835                                        check_change = 1
836
837                                valinfo = val.split( ' ' )
838
839                                for myval in valinfo:
840
841                                        if len( myval.split( '=' ) ) > 1:
842
843                                                valname = myval.split( '=' )[0]
844                                                value   = myval.split( '=' )[1]
845
846                                                if valname == 'nodes':
847                                                        value = value.split( ';' )
848
849                                                jobinfo[ valname ] = value
850
851                                if check_change:
852                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
853                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
854                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
855                                                if not job_id in self.jobs_to_store:
856                                                        self.jobs_to_store.append( job_id )
857
858                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
859                                else:
860                                        self.jobAttrs[ job_id ] = jobinfo
861
862                                        if not job_id in self.jobs_to_store:
863                                                self.jobs_to_store.append( job_id )
864
865                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
866                                       
867        def endDocument( self ):
868                """When all metrics have gone, check if any jobs have finished"""
869
870                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
871
872                if self.heartbeat:
873                        for jobid, jobinfo in self.jobAttrs.items():
874
875                                # This is an old job, not in current jobinfo list anymore
876                                # it must have finished, since we _did_ get a new heartbeat
877                                #
878                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
879
880                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
881
882                                        if not jobid in self.jobs_processed:
883                                                self.jobs_processed.append( jobid )
884
885                                        self.jobAttrs[ jobid ]['status'] = 'F'
886                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
887
888                                        if not jobid in self.jobs_to_store:
889                                                self.jobs_to_store.append( jobid )
890
891                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
892
893                        for jobid in self.jobs_to_store:
894                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
895
896                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
897
898                                        if self.jobAttrs[ jobid ]['status'] == 'F':
899                                                del self.jobAttrs[ jobid ]
900
901                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
902
903                        self.jobs_processed     = [ ]
904                        self.jobs_to_store      = [ ]
905
906        def setJobAttrs( self, old, new ):
907                """
908                Set new job attributes in old, but not lose existing fields
909                if old attributes doesn't have those
910                """
911
912                for valname, value in new.items():
913                        old[ valname ] = value
914
915                return old
916               
917
918        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
919                """
920                Check if jobinfo has changed from jobattrs[jobid]
921                if it's report time is bigger than previous one
922                and it is report time is recent (equal to heartbeat)
923                """
924
925                ignore_changes = [ 'reported' ]
926
927                if jobattrs.has_key( jobid ):
928
929                        for valname, value in jobinfo.items():
930
931                                if valname not in ignore_changes:
932
933                                        if jobattrs[ jobid ].has_key( valname ):
934
935                                                if value != jobattrs[ jobid ][ valname ]:
936
937                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
938                                                                return True
939
940                                        else:
941                                                return True
942
943                return False
944
945class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
946        """Parse Ganglia's XML"""
947
948        def __init__( self, config, datastore ):
949                """Setup initial variables and gather info on existing rrd archive"""
950
951                self.config     = config
952                self.clusters   = { }
953                self.ds         = datastore
954
955                debug_msg( 1, 'Checking database..' )
956
957                global DEBUG_LEVEL
958
959                if DEBUG_LEVEL <= 2:
960                        self.ds.checkStaleJobs()
961
962                debug_msg( 1, 'Check done.' )
963                debug_msg( 1, 'Checking rrd archive..' )
964                self.gatherClusters()
965                debug_msg( 1, 'Check done.' )
966
967        def gatherClusters( self ):
968                """Find all existing clusters in archive dir"""
969
970                archive_dir     = check_dir(ARCHIVE_PATH)
971
972                hosts           = [ ]
973
974                if os.path.exists( archive_dir ):
975
976                        dirlist = os.listdir( archive_dir )
977
978                        for cfgcluster in ARCHIVE_DATASOURCES:
979
980                                if cfgcluster not in dirlist:
981
982                                        # Autocreate a directory for this cluster
983                                        # assume it is new
984                                        #
985                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
986
987                                        os.mkdir( cluster_dir )
988
989                                        dirlist.append( cfgcluster )
990
991                        for item in dirlist:
992
993                                clustername = item
994
995                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
996
997                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
998
999                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
1000
1001        def startElement( self, name, attrs ):
1002                """Memorize appropriate data from xml start tags"""
1003
1004                if name == 'GANGLIA_XML':
1005
1006                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
1007                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
1008
1009                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
1010
1011                elif name == 'GRID':
1012
1013                        self.gridName   = str( attrs.get( 'NAME', "" ) )
1014                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
1015
1016                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
1017
1018                elif name == 'CLUSTER':
1019
1020                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
1021                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
1022
1023                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
1024
1025                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
1026
1027                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
1028
1029                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
1030
1031                        self.hostName           = str( attrs.get( 'NAME', "" ) )
1032                        self.hostIp             = str( attrs.get( 'IP', "" ) )
1033                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
1034
1035                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
1036
1037                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
1038
1039                        type = str( attrs.get( 'TYPE', "" ) )
1040                       
1041                        exclude_metric = False
1042                       
1043                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
1044
1045                                orig_name = str( attrs.get( 'NAME', "" ) )
1046
1047                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1048                               
1049                                        exclude_metric = True
1050
1051                                elif re.match( ex_metricstr, orig_name ):
1052
1053                                        exclude_metric = True
1054
1055                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1056
1057                                myMetric                = { }
1058                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
1059                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
1060                                myMetric['time']        = self.hostReported
1061
1062                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
1063
1064                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
1065
1066        def storeMetrics( self ):
1067                """Store metrics of each cluster rrd handler"""
1068
1069                for clustername, rrdh in self.clusters.items():
1070
1071                        ret = rrdh.storeMetrics()
1072
1073                        if ret:
1074                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1075                                return 1
1076
1077                return 0
1078
1079class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1080
1081        def error( self, exception ):
1082                """Recoverable error"""
1083
1084                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
1085
1086        def fatalError( self, exception ):
1087                """Non-recoverable error"""
1088
1089                exception_str = str( exception )
1090
1091                # Ignore 'no element found' errors
1092                if exception_str.find( 'no element found' ) != -1:
1093                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1094                        return 0
1095
1096                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
1097                sys.exit( 1 )
1098
1099        def warning( self, exception ):
1100                """Warning"""
1101
1102                debug_msg( 0, 'Warning ' + str( exception ) )
1103
1104class XMLGatherer:
1105        """Setup a connection and file object to Ganglia's XML"""
1106
1107        s               = None
1108        fd              = None
1109        data            = None
1110        slot            = None
1111
1112        # Time since the last update
1113        #
1114        LAST_UPDATE     = 0
1115
1116        # Minimum interval between updates
1117        #
1118        MIN_UPDATE_INT  = 10
1119
1120        # Is a update occuring now
1121        #
1122        update_now      = False
1123
1124        def __init__( self, host, port ):
1125                """Store host and port for connection"""
1126
1127                self.host       = host
1128                self.port       = port
1129                self.slot       = threading.Lock()
1130
1131                self.retrieveData()
1132
1133        def retrieveData( self ):
1134                """Setup connection to XML source"""
1135
1136                self.update_now = True
1137
1138                self.slot.acquire()
1139
1140                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
1141
1142                        af, socktype, proto, canonname, sa = res
1143
1144                        try:
1145
1146                                self.s = socket.socket( af, socktype, proto )
1147
1148                        except socket.error, msg:
1149
1150                                self.s = None
1151                                continue
1152
1153                        try:
1154
1155                                self.s.connect( sa )
1156
1157                        except socket.error, msg:
1158
1159                                self.disconnect()
1160                                continue
1161
1162                        break
1163
1164                if self.s is None:
1165
1166                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1167                        self.update_now = False
1168                        sys.exit( 1 )
1169
1170                else:
1171                        #self.s.send( '\n' )
1172
1173                        my_fp                   = self.s.makefile( 'r' )
1174                        my_data                 = my_fp.readlines()
1175                        my_data                 = string.join( my_data, '' )
1176
1177                        self.data               = my_data
1178
1179                        self.LAST_UPDATE        = time.time()
1180
1181                self.slot.release()
1182
1183                self.update_now = False
1184
1185        def disconnect( self ):
1186                """Close socket"""
1187
1188                if self.s:
1189                        #self.s.shutdown( 2 )
1190                        self.s.close()
1191                        self.s = None
1192
1193        def __del__( self ):
1194                """Kill the socket before we leave"""
1195
1196                self.disconnect()
1197
1198        def reGetData( self ):
1199                """Reconnect"""
1200
1201                while self.update_now:
1202
1203                        # Must be another update in progress:
1204                        # Wait until the update is complete
1205                        #
1206                        time.sleep( 1 )
1207
1208                if self.s:
1209                        self.disconnect()
1210
1211                self.retrieveData()
1212
1213        def getData( self ):
1214
1215                """Return the XML data"""
1216
1217                # If more than MIN_UPDATE_INT seconds passed since last data update
1218                # update the XML first before returning it
1219                #
1220
1221                cur_time        = time.time()
1222
1223                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1224
1225                        self.reGetData()
1226
1227                while self.update_now:
1228
1229                        # Must be another update in progress:
1230                        # Wait until the update is complete
1231                        #
1232                        time.sleep( 1 )
1233                       
1234                return self.data
1235
1236        def makeFileDescriptor( self ):
1237                """Make file descriptor that points to our socket connection"""
1238
1239                self.reconnect()
1240
1241                if self.s:
1242                        self.fd = self.s.makefile( 'r' )
1243
1244        def getFileObject( self ):
1245                """Connect, and return a file object"""
1246
1247                self.makeFileDescriptor()
1248
1249                if self.fd:
1250                        return self.fd
1251
1252class GangliaXMLProcessor( XMLProcessor ):
1253        """Main class for processing XML and acting with it"""
1254
1255        def __init__( self, XMLSource, DataStore ):
1256                """Setup initial XML connection and handlers"""
1257
1258                self.config             = GangliaConfigParser( GMETAD_CONF )
1259
1260                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1261                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1262                self.myXMLSource        = XMLSource
1263                self.ds                 = DataStore
1264                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
1265                self.myXMLError         = XMLErrorHandler()
1266
1267        def run( self ):
1268                """Main XML processing; start a xml and storethread"""
1269
1270                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1271                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
1272
1273                while( 1 ):
1274
1275                        if not xml_thread.isAlive():
1276                                # Gather XML at the same interval as gmetad
1277
1278                                # threaded call to: self.processXML()
1279                                #
1280                                try:
1281                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1282                                        xml_thread.start()
1283                                except thread.error, msg:
1284                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1285                                        #return 1
1286
1287                        if not store_thread.isAlive():
1288                                # Store metrics every .. sec
1289
1290                                # threaded call to: self.storeMetrics()
1291                                #
1292                                try:
1293                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1294                                        store_thread.start()
1295                                except thread.error, msg:
1296                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1297                                        #return 1
1298               
1299                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1300                        time.sleep( 1 ) 
1301
1302        def storeMetrics( self ):
1303                """Store metrics retained in memory to disk"""
1304
1305                global DEBUG_LEVEL
1306
1307                # Store metrics somewhere between every 360 and 640 seconds
1308                #
1309                if DEBUG_LEVEL > 2:
1310                        #STORE_INTERVAL = 60
1311                        STORE_INTERVAL = random.randint( 360, 640 )
1312                else:
1313                        STORE_INTERVAL = random.randint( 360, 640 )
1314
1315                try:
1316                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1317                        store_metric_thread.start()
1318                except thread.error, msg:
1319                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1320                        return 1
1321
1322                debug_msg( 1, 'ganglia_store_thread(): started.' )
1323
1324                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
1325                time.sleep( STORE_INTERVAL )
1326                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
1327
1328                if store_metric_thread.isAlive():
1329
1330                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
1331                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
1332                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
1333
1334                debug_msg( 1, 'ganglia_store_thread(): finished.' )
1335
1336                return 0
1337
1338        def storeThread( self ):
1339                """Actual metric storing thread"""
1340
1341                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1342                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
1343                ret = self.myXMLHandler.storeMetrics()
1344                if ret > 0:
1345                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
1346                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1347                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
1348               
1349                return 0
1350
1351        def processXML( self ):
1352                """Process XML"""
1353
1354                try:
1355                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1356                        parsethread.start()
1357                except thread.error, msg:
1358                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1359                        return 1
1360
1361                debug_msg( 1, 'ganglia_xml_thread(): started.' )
1362
1363                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
1364                time.sleep( float( self.config.getLowestInterval() ) ) 
1365                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
1366
1367                if parsethread.isAlive():
1368
1369                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
1370                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
1371                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
1372
1373                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
1374
1375                return 0
1376
1377        def parseThread( self ):
1378                """Actual parsing thread"""
1379
1380                debug_msg( 1, 'ganglia_parse_thread(): started.' )
1381                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
1382                #self.myXMLSource = self.myXMLGatherer.getFileObject()
1383               
1384                my_data = self.myXMLSource.getData()
1385
1386                #print my_data
1387
1388                try:
1389                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1390                except socket.error, msg:
1391                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
1392
1393                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1394                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
1395
1396                return 0
1397
1398class GangliaConfigParser:
1399
1400        sources = [ ]
1401
1402        def __init__( self, config ):
1403                """Parse some stuff from our gmetad's config, such as polling interval"""
1404
1405                self.config = config
1406                self.parseValues()
1407
1408        def parseValues( self ):
1409                """Parse certain values from gmetad.conf"""
1410
1411                readcfg = open( self.config, 'r' )
1412
1413                for line in readcfg.readlines():
1414
1415                        if line.count( '"' ) > 1:
1416
1417                                if line.find( 'data_source' ) != -1 and line[0] != '#':
1418
1419                                        source          = { }
1420                                        source['name']  = line.split( '"' )[1]
1421                                        source_words    = line.split( '"' )[2].split( ' ' )
1422
1423                                        for word in source_words:
1424
1425                                                valid_interval = 1
1426
1427                                                for letter in word:
1428
1429                                                        if letter not in string.digits:
1430
1431                                                                valid_interval = 0
1432
1433                                                if valid_interval and len(word) > 0:
1434
1435                                                        source['interval'] = word
1436                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
1437       
1438                                        # No interval found, use Ganglia's default     
1439                                        if not source.has_key( 'interval' ):
1440                                                source['interval'] = 15
1441                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1442
1443                                        self.sources.append( source )
1444
1445        def getInterval( self, source_name ):
1446                """Return interval for source_name"""
1447
1448                for source in self.sources:
1449
1450                        if source['name'] == source_name:
1451
1452                                return source['interval']
1453
1454                return None
1455
1456        def getLowestInterval( self ):
1457                """Return the lowest interval of all clusters"""
1458
1459                lowest_interval = 0
1460
1461                for source in self.sources:
1462
1463                        if not lowest_interval or source['interval'] <= lowest_interval:
1464
1465                                lowest_interval = source['interval']
1466
1467                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1468                if lowest_interval:
1469                        return lowest_interval
1470                else:
1471                        return 15
1472
1473class RRDHandler:
1474        """Class for handling RRD activity"""
1475
1476        myMetrics = { }
1477        lastStored = { }
1478        timeserials = { }
1479        slot = None
1480
1481        def __init__( self, config, cluster ):
1482                """Setup initial variables"""
1483
1484                global MODRRDTOOL
1485
1486                self.block      = 0
1487                self.cluster    = cluster
1488                self.config     = config
1489                self.slot       = threading.Lock()
1490
1491                if MODRRDTOOL:
1492
1493                        self.rrdm       = RRDMutator()
1494                else:
1495                        self.rrdm       = RRDMutator( RRDTOOL )
1496
1497                global DEBUG_LEVEL
1498
1499                if DEBUG_LEVEL <= 2:
1500                        self.gatherLastUpdates()
1501
1502        def gatherLastUpdates( self ):
1503                """Populate the lastStored list, containing timestamps of all last updates"""
1504
1505                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1506
1507                hosts = [ ]
1508
1509                if os.path.exists( cluster_dir ):
1510
1511                        dirlist = os.listdir( cluster_dir )
1512
1513                        for dir in dirlist:
1514
1515                                hosts.append( dir )
1516
1517                for host in hosts:
1518
1519                        host_dir        = cluster_dir + '/' + host
1520                        dirlist         = os.listdir( host_dir )
1521
1522                        for dir in dirlist:
1523
1524                                if not self.timeserials.has_key( host ):
1525
1526                                        self.timeserials[ host ] = [ ]
1527
1528                                self.timeserials[ host ].append( dir )
1529
1530                        last_serial = self.getLastRrdTimeSerial( host )
1531
1532                        if last_serial:
1533
1534                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1535
1536                                if os.path.exists( metric_dir ):
1537
1538                                        dirlist = os.listdir( metric_dir )
1539
1540                                        for file in dirlist:
1541
1542                                                metricname = file.split( '.rrd' )[0]
1543
1544                                                if not self.lastStored.has_key( host ):
1545
1546                                                        self.lastStored[ host ] = { }
1547
1548                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1549
1550        def getClusterName( self ):
1551                """Return clustername"""
1552
1553                return self.cluster
1554
1555        def memMetric( self, host, metric ):
1556                """Store metric from host in memory"""
1557
1558                # <ATOMIC>
1559                #
1560                self.slot.acquire()
1561               
1562                if self.myMetrics.has_key( host ):
1563
1564                        if self.myMetrics[ host ].has_key( metric['name'] ):
1565
1566                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1567
1568                                        if mymetric['time'] == metric['time']:
1569
1570                                                # Allready have this metric, abort
1571                                                self.slot.release()
1572                                                return 1
1573                        else:
1574                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1575                else:
1576                        self.myMetrics[ host ]                          = { }
1577                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
1578
1579                # Push new metric onto stack
1580                # atomic code; only 1 thread at a time may access the stack
1581
1582                self.myMetrics[ host ][ metric['name'] ].append( metric )
1583
1584                self.slot.release()
1585                #
1586                # </ATOMIC>
1587
1588        def makeUpdateList( self, host, metriclist ):
1589                """
1590                Make a list of update values for rrdupdate
1591                but only those that we didn't store before
1592                """
1593
1594                update_list     = [ ]
1595                metric          = None
1596
1597                while len( metriclist ) > 0:
1598
1599                        metric = metriclist.pop( 0 )
1600
1601                        if self.checkStoreMetric( host, metric ):
1602
1603                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1604                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1605                                update_list.append( u_val )
1606
1607                return update_list
1608
1609        def checkStoreMetric( self, host, metric ):
1610                """Check if supplied metric if newer than last one stored"""
1611
1612                if self.lastStored.has_key( host ):
1613
1614                        if self.lastStored[ host ].has_key( metric['name'] ):
1615
1616                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1617
1618                                        # This is old
1619                                        return 0
1620
1621                return 1
1622
1623        def memLastUpdate( self, host, metricname, metriclist ):
1624                """
1625                Memorize the time of the latest metric from metriclist
1626                but only if it wasn't allready memorized
1627                """
1628
1629                if not self.lastStored.has_key( host ):
1630                        self.lastStored[ host ] = { }
1631
1632                last_update_time = 0
1633
1634                for metric in metriclist:
1635
1636                        if metric['name'] == metricname:
1637
1638                                if metric['time'] > last_update_time:
1639
1640                                        last_update_time = metric['time']
1641
1642                if self.lastStored[ host ].has_key( metricname ):
1643                       
1644                        if last_update_time <= self.lastStored[ host ][ metricname ]:
1645                                return 1
1646
1647                self.lastStored[ host ][ metricname ] = last_update_time
1648
1649        def storeMetrics( self ):
1650                """
1651                Store all metrics from memory to disk
1652                and do it to the RRD's in appropriate timeperiod directory
1653                """
1654
1655                debug_msg( 5, "Entering storeMetrics()")
1656
1657                count_values    = 0
1658                count_metrics   = 0
1659                count_bits      = 0
1660
1661                for hostname, mymetrics in self.myMetrics.items():     
1662
1663                        for metricname, mymetric in mymetrics.items():
1664
1665                                count_metrics += 1
1666
1667                                for dmetric in mymetric:
1668
1669                                        count_values += 1
1670
1671                                        count_bits      += len( dmetric['time'] )
1672                                        count_bits      += len( dmetric['val'] )
1673
1674                count_bytes     = count_bits / 8
1675
1676                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1677                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1678                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1679                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1680
1681                for hostname, mymetrics in self.myMetrics.items():     
1682
1683                        for metricname, mymetric in mymetrics.items():
1684
1685                                metrics_to_store = [ ]
1686
1687                                # Pop metrics from stack for storing until none is left
1688                                # atomic code: only 1 thread at a time may access myMetrics
1689
1690                                # <ATOMIC>
1691                                #
1692                                self.slot.acquire() 
1693
1694                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1695
1696                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1697
1698                                                try:
1699                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1700                                                except IndexError, msg:
1701
1702                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1703                                                        # is still len 0 when the statement is executed.
1704                                                        # Just ignore indexerror's..
1705                                                        pass
1706
1707                                self.slot.release()
1708                                #
1709                                # </ATOMIC>
1710
1711                                # Create a mapping table, each metric to the period where it should be stored
1712                                #
1713                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
1714
1715                                update_rets = [ ]
1716
1717                                for period, pmetric in metric_serial_table.items():
1718
1719                                        create_ret = self.createCheck( hostname, metricname, period )   
1720
1721                                        update_ret = self.update( hostname, metricname, period, pmetric )
1722
1723                                        if update_ret == 0:
1724
1725                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1726                                        else:
1727                                                debug_msg( 9, 'metric update failed' )
1728
1729                                        update_rets.append( create_ret )
1730                                        update_rets.append( update_ret )
1731
1732                                # Lets ignore errors here for now, we need to make sure last update time
1733                                # is correct!
1734                                #
1735                                #if not (1) in update_rets:
1736
1737                                self.memLastUpdate( hostname, metricname, metrics_to_store )
1738
1739                debug_msg( 5, "Leaving storeMetrics()")
1740
1741        def makeTimeSerial( self ):
1742                """Generate a time serial. Seconds since epoch"""
1743
1744                # Seconds since epoch
1745                mytime = int( time.time() )
1746
1747                return mytime
1748
1749        def makeRrdPath( self, host, metricname, timeserial ):
1750                """Make a RRD location/path and filename"""
1751
1752                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1753                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
1754
1755                return rrd_dir, rrd_file
1756
1757        def getLastRrdTimeSerial( self, host ):
1758                """Find the last timeserial (directory) for this host"""
1759
1760                newest_timeserial = 0
1761
1762                for dir in self.timeserials[ host ]:
1763
1764                        valid_dir = 1
1765
1766                        for letter in dir:
1767                                if letter not in string.digits:
1768                                        valid_dir = 0
1769
1770                        if valid_dir:
1771                                timeserial = dir
1772                                if timeserial > newest_timeserial:
1773                                        newest_timeserial = timeserial
1774
1775                if newest_timeserial:
1776                        return newest_timeserial
1777                else:
1778                        return 0
1779
1780        def determinePeriod( self, host, check_serial ):
1781                """Determine to which period (directory) this time(serial) belongs"""
1782
1783                period_serial = 0
1784
1785                if self.timeserials.has_key( host ):
1786
1787                        for serial in self.timeserials[ host ]:
1788
1789                                if check_serial >= serial and period_serial < serial:
1790
1791                                        period_serial = serial
1792
1793                return period_serial
1794
1795        def determineSerials( self, host, metricname, metriclist ):
1796                """
1797                Determine the correct serial and corresponding rrd to store
1798                for a list of metrics
1799                """
1800
1801                metric_serial_table = { }
1802
1803                for metric in metriclist:
1804
1805                        if metric['name'] == metricname:
1806
1807                                period          = self.determinePeriod( host, metric['time'] ) 
1808
1809                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1810
1811                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
1812
1813                                        # This one should get it's own new period
1814                                        period = metric['time']
1815
1816                                        if not self.timeserials.has_key( host ):
1817                                                self.timeserials[ host ] = [ ]
1818
1819                                        self.timeserials[ host ].append( period )
1820
1821                                if not metric_serial_table.has_key( period ):
1822
1823                                        metric_serial_table[ period ] = [ ]
1824
1825                                metric_serial_table[ period ].append( metric )
1826
1827                return metric_serial_table
1828
1829        def createCheck( self, host, metricname, timeserial ):
1830                """Check if an rrd allready exists for this metric, create if not"""
1831
1832                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1833               
1834                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
1835
1836                if not os.path.exists( rrd_dir ):
1837
1838                        try:
1839                                os.makedirs( rrd_dir )
1840
1841                        except os.OSError, msg:
1842
1843                                if msg.find( 'File exists' ) != -1:
1844
1845                                        # Ignore exists errors
1846                                        pass
1847
1848                                else:
1849
1850                                        print msg
1851                                        return
1852
1853                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
1854
1855                if not os.path.exists( rrd_file ):
1856
1857                        interval        = self.config.getInterval( self.cluster )
1858                        heartbeat       = 8 * int( interval )
1859
1860                        params          = [ ]
1861
1862                        params.append( '--step' )
1863                        params.append( str( interval ) )
1864
1865                        params.append( '--start' )
1866                        params.append( str( int( timeserial ) - 1 ) )
1867
1868                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1869                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
1870
1871                        self.rrdm.create( str(rrd_file), params )
1872
1873                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1874
1875        def update( self, host, metricname, timeserial, metriclist ):
1876                """
1877                Update rrd file for host with metricname
1878                in directory timeserial with metriclist
1879                """
1880
1881                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1882
1883                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
1884
1885                update_list             = self.makeUpdateList( host, metriclist )
1886
1887                if len( update_list ) > 0:
1888                        ret = self.rrdm.update( str(rrd_file), update_list )
1889
1890                        if ret:
1891                                return 1
1892               
1893                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
1894
1895                return 0
1896
1897def daemon():
1898        """daemonized threading"""
1899
1900        # Fork the first child
1901        #
1902        pid = os.fork()
1903
1904        if pid > 0:
1905
1906                sys.exit(0)  # end parent
1907
1908        # creates a session and sets the process group ID
1909        #
1910        os.setsid()
1911
1912        # Fork the second child
1913        #
1914        pid = os.fork()
1915
1916        if pid > 0:
1917
1918                sys.exit(0)  # end parent
1919
1920        write_pidfile()
1921
1922        # Go to the root directory and set the umask
1923        #
1924        os.chdir('/')
1925        os.umask(0)
1926
1927        sys.stdin.close()
1928        sys.stdout.close()
1929        sys.stderr.close()
1930
1931        os.open('/dev/null', os.O_RDWR)
1932        os.dup2(0, 1)
1933        os.dup2(0, 2)
1934
1935        run()
1936
1937def run():
1938        """Threading start"""
1939
1940        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1941        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
1942
1943        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1944        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
1945
1946        try:
1947                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1948                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
1949
1950                torque_xml_thread.start()
1951                ganglia_xml_thread.start()
1952               
1953        except thread.error, msg:
1954                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1955                syslog.closelog()
1956                sys.exit(1)
1957               
1958        debug_msg( 0, 'main threading started.' )
1959
1960def main():
1961        """Program startup"""
1962
1963        global DAEMONIZE, USE_SYSLOG
1964
1965        if not processArgs( sys.argv[1:] ):
1966                sys.exit( 1 )
1967
1968        if( DAEMONIZE and USE_SYSLOG ):
1969                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1970
1971        if DAEMONIZE:
1972                daemon()
1973        else:
1974                run()
1975
1976#
1977# Global functions
1978#
1979
1980def check_dir( directory ):
1981        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
1982
1983        if directory[-1] == '/':
1984                directory = directory[:-1]
1985
1986        return directory
1987
1988def reqtime2epoch( rtime ):
1989
1990        (hours, minutes, seconds )      = rtime.split( ':' )
1991
1992        etime   = int(seconds)
1993        etime   = etime + ( int(minutes) * 60 )
1994        etime   = etime + ( int(hours) * 60 * 60 )
1995
1996        return etime
1997
1998def debug_msg( level, msg ):
1999        """Only print msg if correct levels"""
2000
2001        if (not DAEMONIZE and DEBUG_LEVEL >= level):
2002                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
2003       
2004        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2005                syslog.syslog( msg )
2006
2007def printTime( ):
2008        """Print current time in human readable format"""
2009
2010        return time.strftime("%a %d %b %Y %H:%M:%S")
2011
2012def write_pidfile():
2013
2014        # Write pidfile if PIDFILE exists
2015        if PIDFILE:
2016
2017                pid     = os.getpid()
2018
2019                pidfile = open(PIDFILE, 'w')
2020
2021                pidfile.write( str( pid ) )
2022                pidfile.close()
2023
2024# Ooohh, someone started me! Let's go..
2025if __name__ == '__main__':
2026        main()
Note: See TracBrowser for help on using the repository browser.