source: trunk/jobarchived/jobarchived.py @ 377

Last change on this file since 377 was 375, checked in by bastiaans, 17 years ago

jobarchived/jobarchived.py:

  • backwards compatible for rrdtool pipes, use rrdtool module automaticly if available
  • Property svn:keywords set to Id
File size: 42.8 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 375 2007-06-13 13:54:25Z bastiaans $
22#
23
24DEFAULT_SEARCH_PATH     = '/usr/share/jobarchived'
25
26import sys
27
28if DEFAULT_SEARCH_PATH not in sys.path:
29
30        sys.path.append( DEFAULT_SEARCH_PATH )
31
32import getopt, syslog, ConfigParser
33
34def processArgs( args ):
35
36        SHORT_L = 'c:'
37        LONG_L  = 'config='
38
39        config_filename = None
40
41        try:
42
43                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
44
45        except getopt.error, detail:
46
47                print detail
48                sys.exit(1)
49
50        for opt, value in opts:
51
52                if opt in [ '--config', '-c' ]:
53
54                        config_filename = value
55
56        if not config_filename:
57
58                config_filename = '/etc/jobarchived.conf'
59
60        try:
61                return loadConfig( config_filename )
62
63        except ConfigParser.NoOptionError, detail:
64
65                print detail
66                sys.exit( 1 )
67
68def loadConfig( filename ):
69
70        def getlist( cfg_string ):
71
72                my_list = [ ]
73
74                for item_txt in cfg_string.split( ',' ):
75
76                        sep_char = None
77
78                        item_txt = item_txt.strip()
79
80                        for s_char in [ "'", '"' ]:
81
82                                if item_txt.find( s_char ) != -1:
83
84                                        if item_txt.count( s_char ) != 2:
85
86                                                print 'Missing quote: %s' %item_txt
87                                                sys.exit( 1 )
88
89                                        else:
90
91                                                sep_char = s_char
92                                                break
93
94                        if sep_char:
95
96                                item_txt = item_txt.split( sep_char )[1]
97
98                        my_list.append( item_txt )
99
100                return my_list
101
102        cfg = ConfigParser.ConfigParser()
103
104        cfg.read( filename )
105
106        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
107
108        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
109
110        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
111
112        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
113
114        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
115
116        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
117
118        MODRRDTOOL              = False
119
120        try:
121                import rrdtool
122
123                MODRRDTOOL              = True
124
125        except ImportError:
126
127                MODRRDTOOL              = False
128
129                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobmond significantly!" )
130
131        try:
132
133                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
134
135        except AttributeError, detail:
136
137                print 'Unknown syslog facility'
138                sys.exit( 1 )
139
140        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
141
142        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
143
144        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
145
146        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
147
148        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
149
150        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
151
152        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
153
154        RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
155
156        return True
157
158# What XML data types not to store
159#
160UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
161
162# Maximum time (in seconds) a parsethread may run
163#
164PARSE_TIMEOUT = 60
165
166# Maximum time (in seconds) a storethread may run
167#
168STORE_TIMEOUT = 360
169
170"""
171The Job Archiving Daemon
172"""
173
174from types import *
175
176import DBClass
177import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
178import rrdtool
179
180class DataSQLStore:
181
182        db_vars = None
183        dbc = None
184
185        def __init__( self, hostname, database ):
186
187                self.db_vars = DBClass.InitVars(DataBaseName=database,
188                                User='root',
189                                Host=hostname,
190                                Password='',
191                                Dictionary='true')
192
193                try:
194                        self.dbc     = DBClass.DB(self.db_vars)
195                except DBClass.DBError, details:
196                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
197                        sys.exit(1)
198
199        def setDatabase(self, statement):
200                ret = self.doDatabase('set', statement)
201                return ret
202               
203        def getDatabase(self, statement):
204                ret = self.doDatabase('get', statement)
205                return ret
206
207        def doDatabase(self, type, statement):
208
209                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
210                try:
211                        if type == 'set':
212                                result = self.dbc.Set( statement )
213                                self.dbc.Commit()
214                        elif type == 'get':
215                                result = self.dbc.Get( statement )
216                               
217                except DBClass.DBError, detail:
218                        operation = statement.split(' ')[0]
219                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
220                        sys.exit(1)
221
222                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
223                return result
224
225        def getJobNodeId( self, job_id, node_id ):
226
227                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
228                if len( id ) > 0:
229
230                        if len( id[0] ) > 0 and id[0] != '':
231                       
232                                return 1
233
234                return 0
235
236        def getNodeId( self, hostname ):
237
238                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
239
240                if len( id ) > 0:
241
242                        id = id[0][0]
243
244                        return id
245                else:
246                        return None
247
248        def getNodeIds( self, hostnames ):
249
250                ids = [ ]
251
252                for node in hostnames:
253
254                        id = self.getNodeId( node )
255
256                        if id:
257                                ids.append( id )
258
259                return ids
260
261        def getJobId( self, jobid ):
262
263                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
264
265                if id:
266                        id = id[0][0]
267
268                        return id
269                else:
270                        return None
271
272        def addJob( self, job_id, jobattrs ):
273
274                if not self.getJobId( job_id ):
275
276                        self.mutateJob( 'insert', job_id, jobattrs ) 
277                else:
278                        self.mutateJob( 'update', job_id, jobattrs )
279
280        def mutateJob( self, action, job_id, jobattrs ):
281
282                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
283
284                insert_col_str  = 'job_id'
285                insert_val_str  = "'%s'" %job_id
286                update_str      = None
287
288                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
289
290                ids = [ ]
291
292                for valname, value in jobattrs.items():
293
294                        if valname in job_values and value != '':
295
296                                column_name = 'job_' + valname
297
298                                if action == 'insert':
299
300                                        if not insert_col_str:
301                                                insert_col_str = column_name
302                                        else:
303                                                insert_col_str = insert_col_str + ',' + column_name
304
305                                        if not insert_val_str:
306                                                insert_val_str = value
307                                        else:
308                                                insert_val_str = insert_val_str + ",'%s'" %value
309
310                                elif action == 'update':
311                                       
312                                        if not update_str:
313                                                update_str = "%s='%s'" %(column_name, value)
314                                        else:
315                                                update_str = update_str + ",%s='%s'" %(column_name, value)
316
317                        elif valname == 'nodes' and value:
318
319                                node_valid = 1
320
321                                if len(value) == 1:
322                               
323                                        if jobattrs['status'] == 'Q':
324
325                                                node_valid = 0
326
327                                        else:
328
329                                                node_valid = 0
330
331                                                for node_char in str(value[0]):
332
333                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
334
335                                                                node_valid = 1
336
337                                if node_valid:
338
339                                        ids = self.addNodes( value, jobattrs['domain'] )
340
341                if action == 'insert':
342
343                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
344
345                elif action == 'update':
346
347                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
348
349                if len( ids ) > 0:
350                        self.addJobNodes( job_id, ids )
351
352        def addNodes( self, hostnames, domain ):
353
354                ids = [ ]
355
356                for node in hostnames:
357
358                        node    = '%s.%s' %( node, domain )
359                        id      = self.getNodeId( node )
360       
361                        if not id:
362                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
363                                id = self.getNodeId( node )
364
365                        ids.append( id )
366
367                return ids
368
369        def addJobNodes( self, jobid, nodes ):
370
371                for node in nodes:
372
373                        if not self.getJobNodeId( jobid, node ):
374
375                                self.addJobNode( jobid, node )
376
377        def addJobNode( self, jobid, nodeid ):
378
379                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
380
381        def storeJobInfo( self, jobid, jobattrs ):
382
383                self.addJob( jobid, jobattrs )
384
385        def checkStaleJobs( self ):
386
387                q = "SELECT * from jobs WHERE job_status != 'F'"
388
389                r = self.getDatabase( q )
390
391                if len( r ) == 0:
392
393                        return None
394
395                cleanjobs       = [ ]
396                timeoutjobs     = [ ]
397
398                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
399                cur_time        = time.time()
400
401                for row in r:
402
403                        job_id                  = row[0]
404                        job_requested_time      = row[4]
405                        job_status              = row[7]
406                        job_start_timestamp     = row[8]
407
408                        if job_status == 'Q' or not job_start_timestamp:
409
410                                cleanjobs.append( job_id )
411
412                        else:
413
414                                start_timestamp = int( job_start_timestamp )
415
416                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
417
418                                        if job_requested_time:
419
420                                                rtime_epoch     = reqtime2epoch( job_requested_time )
421                                        else:
422                                                rtime_epoch     = None
423                                       
424                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
425
426                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
427
428                for j in cleanjobs:
429
430                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
431                        self.setDatabase( q )
432
433                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
434
435                for j in timeoutjobs:
436
437                        ( i, s, r )             = j
438
439                        if r:
440                                new_end_timestamp       = int( s ) + r
441
442                        q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
443                        self.setDatabase( q )
444
445class RRDMutator:
446        """A class for performing RRD mutations"""
447
448        binary = None
449
450
451        def __init__( self, binary=None ):
452                """Set alternate binary if supplied"""
453
454                if binary:
455                        self.binary = binary
456
457        def create( self, filename, args ):
458                """Create a new rrd with args"""
459
460                global MODRRDTOOL
461
462                if MODRRDTOOL:
463                        return self.perform( 'create', filename, args )
464                else:
465                        return self.perform( 'create', '"' + filename + '"', args )
466
467        def update( self, filename, args ):
468                """Update a rrd with args"""
469
470                global MODRRDTOOL
471
472                if MODRRDTOOL:
473                        return self.perform( 'update', filename, args )
474                else:
475                        return self.perform( 'update', '"' + filename + '"', args )
476
477        def grabLastUpdate( self, filename ):
478                """Determine the last update time of filename rrd"""
479
480                global MODRRDTOOL
481
482                last_update = 0
483
484                if MODRRDTOOL:
485
486                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
487
488                        rrd_header      = { }
489
490                        try:
491                                rrd_header      = rrdtool.info( filename )
492                        except rrdtool.error, msg:
493                                debug_msg( 8, str( msg ) )
494
495                        if rrd_header.has_key( 'last_update' ):
496                                return last_update
497                        else:
498                                return 0
499
500                else:
501                        debug_msg( 8, self.binary + ' info ' + filename )
502
503                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
504
505                        for line in my_pipe.readlines():
506
507                                if line.find( 'last_update') != -1:
508
509                                        last_update = line.split( ' = ' )[1]
510
511                        if my_pipe:
512
513                                my_pipe.close()
514
515                        if last_update:
516                                return last_update
517                        else:
518                                return 0
519
520
521        def perform( self, action, filename, args ):
522                """Perform action on rrd filename with args"""
523
524                global MODRRDTOOL
525
526                arg_string = None
527
528                if type( args ) is not ListType:
529                        debug_msg( 8, 'Arguments needs to be of type List' )
530                        return 1
531
532                for arg in args:
533
534                        if not arg_string:
535
536                                arg_string = arg
537                        else:
538                                arg_string = arg_string + ' ' + arg
539
540                if MODRRDTOOL:
541
542                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
543
544                        try:
545                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
546
547                                if action == 'create':
548
549                                        rrdtool.create( str( filename ), *args )
550
551                                elif action == 'update':
552
553                                        rrdtool.update( str( filename ), *args )
554
555                        except rrdtool.error, msg:
556
557                                error_msg = str( msg )
558                                debug_msg( 8, error_msg )
559                                return 1
560
561                else:
562
563                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
564
565                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
566                        lines   = cmd.readlines()
567
568                        cmd.close()
569
570                        for line in lines:
571
572                                if line.find( 'ERROR' ) != -1:
573
574                                        error_msg = string.join( line.split( ' ' )[1:] )
575                                        debug_msg( 8, error_msg )
576                                        return 1
577
578                return 0
579
580class XMLProcessor:
581        """Skeleton class for XML processor's"""
582
583        def run( self ):
584                """Do main processing of XML here"""
585
586                pass
587
588class TorqueXMLProcessor( XMLProcessor ):
589        """Main class for processing XML and acting with it"""
590
591        def __init__( self, XMLSource, DataStore ):
592                """Setup initial XML connection and handlers"""
593
594                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
595                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
596                self.myXMLSource        = XMLSource
597                self.myXMLHandler       = TorqueXMLHandler( DataStore )
598                self.myXMLError         = XMLErrorHandler()
599
600                self.config             = GangliaConfigParser( GMETAD_CONF )
601
602        def run( self ):
603                """Main XML processing"""
604
605                debug_msg( 1, 'torque_xml_thread(): started.' )
606
607                while( 1 ):
608
609                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
610                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
611
612                        my_data = self.myXMLSource.getData()
613
614                        try:
615                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
616                        except socket.error, msg:
617                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
618                               
619                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
620                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
621                        time.sleep( self.config.getLowestInterval() )
622
623class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
624        """Parse Torque's jobinfo XML from our plugin"""
625
626        jobAttrs = { }
627
628        def __init__( self, datastore ):
629
630                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
631                self.ds                 = datastore
632                self.jobs_processed     = [ ]
633                self.jobs_to_store      = [ ]
634
635        def startDocument( self ):
636
637                self.heartbeat  = 0
638                self.elementct  = 0
639
640        def startElement( self, name, attrs ):
641                """
642                This XML will be all gmetric XML
643                so there will be no specific start/end element
644                just one XML statement with all info
645                """
646               
647                jobinfo = { }
648
649                self.elementct  += 1
650
651                if name == 'CLUSTER':
652
653                        self.clustername = str( attrs.get( 'NAME', "" ) )
654
655                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
656
657                        metricname = str( attrs.get( 'NAME', "" ) )
658
659                        if metricname == 'MONARCH-HEARTBEAT':
660                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
661
662                        elif metricname.find( 'MONARCH-JOB' ) != -1:
663
664                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
665                                val     = str( attrs.get( 'VAL', "" ) )
666
667                                if not job_id in self.jobs_processed:
668
669                                        self.jobs_processed.append( job_id )
670
671                                check_change = 0
672
673                                if self.jobAttrs.has_key( job_id ):
674
675                                        check_change = 1
676
677                                valinfo = val.split( ' ' )
678
679                                for myval in valinfo:
680
681                                        if len( myval.split( '=' ) ) > 1:
682
683                                                valname = myval.split( '=' )[0]
684                                                value   = myval.split( '=' )[1]
685
686                                                if valname == 'nodes':
687                                                        value = value.split( ';' )
688
689                                                jobinfo[ valname ] = value
690
691                                if check_change:
692                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
693                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
694                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
695                                                if not job_id in self.jobs_to_store:
696                                                        self.jobs_to_store.append( job_id )
697
698                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
699                                else:
700                                        self.jobAttrs[ job_id ] = jobinfo
701
702                                        if not job_id in self.jobs_to_store:
703                                                self.jobs_to_store.append( job_id )
704
705                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
706                                       
707        def endDocument( self ):
708                """When all metrics have gone, check if any jobs have finished"""
709
710                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
711
712                if self.heartbeat:
713                        for jobid, jobinfo in self.jobAttrs.items():
714
715                                # This is an old job, not in current jobinfo list anymore
716                                # it must have finished, since we _did_ get a new heartbeat
717                                #
718                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
719
720                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
721
722                                        if not jobid in self.jobs_processed:
723                                                self.jobs_processed.append( jobid )
724
725                                        self.jobAttrs[ jobid ]['status'] = 'F'
726                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
727
728                                        if not jobid in self.jobs_to_store:
729                                                self.jobs_to_store.append( jobid )
730
731                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
732
733                        for jobid in self.jobs_to_store:
734                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
735
736                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
737
738                                        if self.jobAttrs[ jobid ]['status'] == 'F':
739                                                del self.jobAttrs[ jobid ]
740
741                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
742
743                        self.jobs_processed     = [ ]
744                        self.jobs_to_store      = [ ]
745
746        def setJobAttrs( self, old, new ):
747                """
748                Set new job attributes in old, but not lose existing fields
749                if old attributes doesn't have those
750                """
751
752                for valname, value in new.items():
753                        old[ valname ] = value
754
755                return old
756               
757
758        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
759                """
760                Check if jobinfo has changed from jobattrs[jobid]
761                if it's report time is bigger than previous one
762                and it is report time is recent (equal to heartbeat)
763                """
764
765                ignore_changes = [ 'reported' ]
766
767                if jobattrs.has_key( jobid ):
768
769                        for valname, value in jobinfo.items():
770
771                                if valname not in ignore_changes:
772
773                                        if jobattrs[ jobid ].has_key( valname ):
774
775                                                if value != jobattrs[ jobid ][ valname ]:
776
777                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
778                                                                return True
779
780                                        else:
781                                                return True
782
783                return False
784
785class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
786        """Parse Ganglia's XML"""
787
788        def __init__( self, config, datastore ):
789                """Setup initial variables and gather info on existing rrd archive"""
790
791                self.config     = config
792                self.clusters   = { }
793                self.ds         = datastore
794
795                debug_msg( 1, 'Checking database..' )
796
797                global DEBUG_LEVEL
798
799                if DEBUG_LEVEL <= 2:
800                        self.ds.checkStaleJobs()
801
802                debug_msg( 1, 'Check done.' )
803                debug_msg( 1, 'Checking rrd archive..' )
804                self.gatherClusters()
805                debug_msg( 1, 'Check done.' )
806
807        def gatherClusters( self ):
808                """Find all existing clusters in archive dir"""
809
810                archive_dir     = check_dir(ARCHIVE_PATH)
811
812                hosts           = [ ]
813
814                if os.path.exists( archive_dir ):
815
816                        dirlist = os.listdir( archive_dir )
817
818                        for cfgcluster in ARCHIVE_DATASOURCES:
819
820                                if cfgcluster not in dirlist:
821
822                                        # Autocreate a directory for this cluster
823                                        # assume it is new
824                                        #
825                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
826
827                                        os.mkdir( cluster_dir )
828
829                                        dirlist.append( cfgcluster )
830
831                        for item in dirlist:
832
833                                clustername = item
834
835                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
836
837                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
838
839                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
840
841        def startElement( self, name, attrs ):
842                """Memorize appropriate data from xml start tags"""
843
844                if name == 'GANGLIA_XML':
845
846                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
847                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
848
849                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
850
851                elif name == 'GRID':
852
853                        self.gridName   = str( attrs.get( 'NAME', "" ) )
854                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
855
856                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
857
858                elif name == 'CLUSTER':
859
860                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
861                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
862
863                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
864
865                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
866
867                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
868
869                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
870
871                        self.hostName           = str( attrs.get( 'NAME', "" ) )
872                        self.hostIp             = str( attrs.get( 'IP', "" ) )
873                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
874
875                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
876
877                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
878
879                        type = str( attrs.get( 'TYPE', "" ) )
880                       
881                        exclude_metric = False
882                       
883                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
884
885                                orig_name = str( attrs.get( 'NAME', "" ) )
886
887                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
888                               
889                                        exclude_metric = True
890
891                                elif re.match( ex_metricstr, orig_name ):
892
893                                        exclude_metric = True
894
895                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
896
897                                myMetric                = { }
898                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
899                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
900                                myMetric['time']        = self.hostReported
901
902                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
903
904                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
905
906        def storeMetrics( self ):
907                """Store metrics of each cluster rrd handler"""
908
909                for clustername, rrdh in self.clusters.items():
910
911                        ret = rrdh.storeMetrics()
912
913                        if ret:
914                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
915                                return 1
916
917                return 0
918
919class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
920
921        def error( self, exception ):
922                """Recoverable error"""
923
924                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
925
926        def fatalError( self, exception ):
927                """Non-recoverable error"""
928
929                exception_str = str( exception )
930
931                # Ignore 'no element found' errors
932                if exception_str.find( 'no element found' ) != -1:
933                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
934                        return 0
935
936                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
937                sys.exit( 1 )
938
939        def warning( self, exception ):
940                """Warning"""
941
942                debug_msg( 0, 'Warning ' + str( exception ) )
943
944class XMLGatherer:
945        """Setup a connection and file object to Ganglia's XML"""
946
947        s               = None
948        fd              = None
949        data            = None
950        slot            = None
951
952        # Time since the last update
953        #
954        LAST_UPDATE     = 0
955
956        # Minimum interval between updates
957        #
958        MIN_UPDATE_INT  = 10
959
960        # Is a update occuring now
961        #
962        update_now      = False
963
964        def __init__( self, host, port ):
965                """Store host and port for connection"""
966
967                self.host       = host
968                self.port       = port
969                self.slot       = threading.Lock()
970
971                self.retrieveData()
972
973        def retrieveData( self ):
974                """Setup connection to XML source"""
975
976                self.update_now = True
977
978                self.slot.acquire()
979
980                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
981
982                        af, socktype, proto, canonname, sa = res
983
984                        try:
985
986                                self.s = socket.socket( af, socktype, proto )
987
988                        except socket.error, msg:
989
990                                self.s = None
991                                continue
992
993                        try:
994
995                                self.s.connect( sa )
996
997                        except socket.error, msg:
998
999                                self.disconnect()
1000                                continue
1001
1002                        break
1003
1004                if self.s is None:
1005
1006                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1007                        self.update_now = False
1008                        sys.exit( 1 )
1009
1010                else:
1011                        #self.s.send( '\n' )
1012
1013                        my_fp                   = self.s.makefile( 'r' )
1014                        my_data                 = my_fp.readlines()
1015                        my_data                 = string.join( my_data, '' )
1016
1017                        self.data               = my_data
1018
1019                        self.LAST_UPDATE        = time.time()
1020
1021                self.slot.release()
1022
1023                self.update_now = False
1024
1025        def disconnect( self ):
1026                """Close socket"""
1027
1028                if self.s:
1029                        #self.s.shutdown( 2 )
1030                        self.s.close()
1031                        self.s = None
1032
1033        def __del__( self ):
1034                """Kill the socket before we leave"""
1035
1036                self.disconnect()
1037
1038        def reGetData( self ):
1039                """Reconnect"""
1040
1041                while self.update_now:
1042
1043                        # Must be another update in progress:
1044                        # Wait until the update is complete
1045                        #
1046                        time.sleep( 1 )
1047
1048                if self.s:
1049                        self.disconnect()
1050
1051                self.retrieveData()
1052
1053        def getData( self ):
1054
1055                """Return the XML data"""
1056
1057                # If more than MIN_UPDATE_INT seconds passed since last data update
1058                # update the XML first before returning it
1059                #
1060
1061                cur_time        = time.time()
1062
1063                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1064
1065                        self.reGetData()
1066
1067                while self.update_now:
1068
1069                        # Must be another update in progress:
1070                        # Wait until the update is complete
1071                        #
1072                        time.sleep( 1 )
1073                       
1074                return self.data
1075
1076        def makeFileDescriptor( self ):
1077                """Make file descriptor that points to our socket connection"""
1078
1079                self.reconnect()
1080
1081                if self.s:
1082                        self.fd = self.s.makefile( 'r' )
1083
1084        def getFileObject( self ):
1085                """Connect, and return a file object"""
1086
1087                self.makeFileDescriptor()
1088
1089                if self.fd:
1090                        return self.fd
1091
1092class GangliaXMLProcessor( XMLProcessor ):
1093        """Main class for processing XML and acting with it"""
1094
1095        def __init__( self, XMLSource, DataStore ):
1096                """Setup initial XML connection and handlers"""
1097
1098                self.config             = GangliaConfigParser( GMETAD_CONF )
1099
1100                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1101                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1102                self.myXMLSource        = XMLSource
1103                self.ds                 = DataStore
1104                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
1105                self.myXMLError         = XMLErrorHandler()
1106
1107        def run( self ):
1108                """Main XML processing; start a xml and storethread"""
1109
1110                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1111                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
1112
1113                while( 1 ):
1114
1115                        if not xml_thread.isAlive():
1116                                # Gather XML at the same interval as gmetad
1117
1118                                # threaded call to: self.processXML()
1119                                #
1120                                try:
1121                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1122                                        xml_thread.start()
1123                                except thread.error, msg:
1124                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1125                                        #return 1
1126
1127                        if not store_thread.isAlive():
1128                                # Store metrics every .. sec
1129
1130                                # threaded call to: self.storeMetrics()
1131                                #
1132                                try:
1133                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1134                                        store_thread.start()
1135                                except thread.error, msg:
1136                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1137                                        #return 1
1138               
1139                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1140                        time.sleep( 1 ) 
1141
1142        def storeMetrics( self ):
1143                """Store metrics retained in memory to disk"""
1144
1145                global DEBUG_LEVEL
1146
1147                # Store metrics somewhere between every 360 and 640 seconds
1148                #
1149                if DEBUG_LEVEL > 2:
1150                        #STORE_INTERVAL = 60
1151                        STORE_INTERVAL = random.randint( 360, 640 )
1152                else:
1153                        STORE_INTERVAL = random.randint( 360, 640 )
1154
1155                try:
1156                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1157                        store_metric_thread.start()
1158                except thread.error, msg:
1159                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1160                        return 1
1161
1162                debug_msg( 1, 'ganglia_store_thread(): started.' )
1163
1164                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
1165                time.sleep( STORE_INTERVAL )
1166                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
1167
1168                if store_metric_thread.isAlive():
1169
1170                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
1171                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
1172                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
1173
1174                debug_msg( 1, 'ganglia_store_thread(): finished.' )
1175
1176                return 0
1177
1178        def storeThread( self ):
1179                """Actual metric storing thread"""
1180
1181                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1182                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
1183                ret = self.myXMLHandler.storeMetrics()
1184                if ret > 0:
1185                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
1186                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1187                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
1188               
1189                return 0
1190
1191        def processXML( self ):
1192                """Process XML"""
1193
1194                try:
1195                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1196                        parsethread.start()
1197                except thread.error, msg:
1198                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1199                        return 1
1200
1201                debug_msg( 1, 'ganglia_xml_thread(): started.' )
1202
1203                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
1204                time.sleep( float( self.config.getLowestInterval() ) ) 
1205                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
1206
1207                if parsethread.isAlive():
1208
1209                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
1210                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
1211                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
1212
1213                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
1214
1215                return 0
1216
1217        def parseThread( self ):
1218                """Actual parsing thread"""
1219
1220                debug_msg( 1, 'ganglia_parse_thread(): started.' )
1221                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
1222                #self.myXMLSource = self.myXMLGatherer.getFileObject()
1223               
1224                my_data = self.myXMLSource.getData()
1225
1226                #print my_data
1227
1228                try:
1229                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1230                except socket.error, msg:
1231                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
1232
1233                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1234                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
1235
1236                return 0
1237
1238class GangliaConfigParser:
1239
1240        sources = [ ]
1241
1242        def __init__( self, config ):
1243                """Parse some stuff from our gmetad's config, such as polling interval"""
1244
1245                self.config = config
1246                self.parseValues()
1247
1248        def parseValues( self ):
1249                """Parse certain values from gmetad.conf"""
1250
1251                readcfg = open( self.config, 'r' )
1252
1253                for line in readcfg.readlines():
1254
1255                        if line.count( '"' ) > 1:
1256
1257                                if line.find( 'data_source' ) != -1 and line[0] != '#':
1258
1259                                        source          = { }
1260                                        source['name']  = line.split( '"' )[1]
1261                                        source_words    = line.split( '"' )[2].split( ' ' )
1262
1263                                        for word in source_words:
1264
1265                                                valid_interval = 1
1266
1267                                                for letter in word:
1268
1269                                                        if letter not in string.digits:
1270
1271                                                                valid_interval = 0
1272
1273                                                if valid_interval and len(word) > 0:
1274
1275                                                        source['interval'] = word
1276                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
1277       
1278                                        # No interval found, use Ganglia's default     
1279                                        if not source.has_key( 'interval' ):
1280                                                source['interval'] = 15
1281                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1282
1283                                        self.sources.append( source )
1284
1285        def getInterval( self, source_name ):
1286                """Return interval for source_name"""
1287
1288                for source in self.sources:
1289
1290                        if source['name'] == source_name:
1291
1292                                return source['interval']
1293
1294                return None
1295
1296        def getLowestInterval( self ):
1297                """Return the lowest interval of all clusters"""
1298
1299                lowest_interval = 0
1300
1301                for source in self.sources:
1302
1303                        if not lowest_interval or source['interval'] <= lowest_interval:
1304
1305                                lowest_interval = source['interval']
1306
1307                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1308                if lowest_interval:
1309                        return lowest_interval
1310                else:
1311                        return 15
1312
1313class RRDHandler:
1314        """Class for handling RRD activity"""
1315
1316        myMetrics = { }
1317        lastStored = { }
1318        timeserials = { }
1319        slot = None
1320
1321        def __init__( self, config, cluster ):
1322                """Setup initial variables"""
1323
1324                self.block      = 0
1325                self.cluster    = cluster
1326                self.config     = config
1327                self.slot       = threading.Lock()
1328                self.rrdm       = RRDMutator( RRDTOOL )
1329
1330                global DEBUG_LEVEL
1331
1332                if DEBUG_LEVEL <= 2:
1333                        self.gatherLastUpdates()
1334
1335        def gatherLastUpdates( self ):
1336                """Populate the lastStored list, containing timestamps of all last updates"""
1337
1338                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1339
1340                hosts = [ ]
1341
1342                if os.path.exists( cluster_dir ):
1343
1344                        dirlist = os.listdir( cluster_dir )
1345
1346                        for dir in dirlist:
1347
1348                                hosts.append( dir )
1349
1350                for host in hosts:
1351
1352                        host_dir        = cluster_dir + '/' + host
1353                        dirlist         = os.listdir( host_dir )
1354
1355                        for dir in dirlist:
1356
1357                                if not self.timeserials.has_key( host ):
1358
1359                                        self.timeserials[ host ] = [ ]
1360
1361                                self.timeserials[ host ].append( dir )
1362
1363                        last_serial = self.getLastRrdTimeSerial( host )
1364
1365                        if last_serial:
1366
1367                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1368
1369                                if os.path.exists( metric_dir ):
1370
1371                                        dirlist = os.listdir( metric_dir )
1372
1373                                        for file in dirlist:
1374
1375                                                metricname = file.split( '.rrd' )[0]
1376
1377                                                if not self.lastStored.has_key( host ):
1378
1379                                                        self.lastStored[ host ] = { }
1380
1381                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1382
1383        def getClusterName( self ):
1384                """Return clustername"""
1385
1386                return self.cluster
1387
1388        def memMetric( self, host, metric ):
1389                """Store metric from host in memory"""
1390
1391                # <ATOMIC>
1392                #
1393                self.slot.acquire()
1394               
1395                if self.myMetrics.has_key( host ):
1396
1397                        if self.myMetrics[ host ].has_key( metric['name'] ):
1398
1399                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1400
1401                                        if mymetric['time'] == metric['time']:
1402
1403                                                # Allready have this metric, abort
1404                                                self.slot.release()
1405                                                return 1
1406                        else:
1407                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1408                else:
1409                        self.myMetrics[ host ]                          = { }
1410                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
1411
1412                # Push new metric onto stack
1413                # atomic code; only 1 thread at a time may access the stack
1414
1415                self.myMetrics[ host ][ metric['name'] ].append( metric )
1416
1417                self.slot.release()
1418                #
1419                # </ATOMIC>
1420
1421        def makeUpdateList( self, host, metriclist ):
1422                """
1423                Make a list of update values for rrdupdate
1424                but only those that we didn't store before
1425                """
1426
1427                update_list     = [ ]
1428                metric          = None
1429
1430                while len( metriclist ) > 0:
1431
1432                        metric = metriclist.pop( 0 )
1433
1434                        if self.checkStoreMetric( host, metric ):
1435
1436                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1437                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1438                                update_list.append( u_val )
1439
1440                return update_list
1441
1442        def checkStoreMetric( self, host, metric ):
1443                """Check if supplied metric if newer than last one stored"""
1444
1445                if self.lastStored.has_key( host ):
1446
1447                        if self.lastStored[ host ].has_key( metric['name'] ):
1448
1449                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1450
1451                                        # This is old
1452                                        return 0
1453
1454                return 1
1455
1456        def memLastUpdate( self, host, metricname, metriclist ):
1457                """
1458                Memorize the time of the latest metric from metriclist
1459                but only if it wasn't allready memorized
1460                """
1461
1462                if not self.lastStored.has_key( host ):
1463                        self.lastStored[ host ] = { }
1464
1465                last_update_time = 0
1466
1467                for metric in metriclist:
1468
1469                        if metric['name'] == metricname:
1470
1471                                if metric['time'] > last_update_time:
1472
1473                                        last_update_time = metric['time']
1474
1475                if self.lastStored[ host ].has_key( metricname ):
1476                       
1477                        if last_update_time <= self.lastStored[ host ][ metricname ]:
1478                                return 1
1479
1480                self.lastStored[ host ][ metricname ] = last_update_time
1481
1482        def storeMetrics( self ):
1483                """
1484                Store all metrics from memory to disk
1485                and do it to the RRD's in appropriate timeperiod directory
1486                """
1487
1488                debug_msg( 5, "Entering storeMetrics()")
1489
1490                count_values    = 0
1491                count_metrics   = 0
1492                count_bits      = 0
1493
1494                for hostname, mymetrics in self.myMetrics.items():     
1495
1496                        for metricname, mymetric in mymetrics.items():
1497
1498                                count_metrics += 1
1499
1500                                for dmetric in mymetric:
1501
1502                                        count_values += 1
1503
1504                                        count_bits      += len( dmetric['time'] )
1505                                        count_bits      += len( dmetric['val'] )
1506
1507                count_bytes     = count_bits / 8
1508
1509                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1510                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1511                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1512                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1513
1514                for hostname, mymetrics in self.myMetrics.items():     
1515
1516                        for metricname, mymetric in mymetrics.items():
1517
1518                                metrics_to_store = [ ]
1519
1520                                # Pop metrics from stack for storing until none is left
1521                                # atomic code: only 1 thread at a time may access myMetrics
1522
1523                                # <ATOMIC>
1524                                #
1525                                self.slot.acquire() 
1526
1527                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1528
1529                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1530
1531                                                try:
1532                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1533                                                except IndexError, msg:
1534
1535                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1536                                                        # is still len 0 when the statement is executed.
1537                                                        # Just ignore indexerror's..
1538                                                        pass
1539
1540                                self.slot.release()
1541                                #
1542                                # </ATOMIC>
1543
1544                                # Create a mapping table, each metric to the period where it should be stored
1545                                #
1546                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
1547
1548                                update_rets = [ ]
1549
1550                                for period, pmetric in metric_serial_table.items():
1551
1552                                        create_ret = self.createCheck( hostname, metricname, period )   
1553
1554                                        update_ret = self.update( hostname, metricname, period, pmetric )
1555
1556                                        if update_ret == 0:
1557
1558                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1559                                        else:
1560                                                debug_msg( 9, 'metric update failed' )
1561
1562                                        update_rets.append( create_ret )
1563                                        update_rets.append( update_ret )
1564
1565                                # Lets ignore errors here for now, we need to make sure last update time
1566                                # is correct!
1567                                #
1568                                #if not (1) in update_rets:
1569
1570                                self.memLastUpdate( hostname, metricname, metrics_to_store )
1571
1572                debug_msg( 5, "Leaving storeMetrics()")
1573
1574        def makeTimeSerial( self ):
1575                """Generate a time serial. Seconds since epoch"""
1576
1577                # Seconds since epoch
1578                mytime = int( time.time() )
1579
1580                return mytime
1581
1582        def makeRrdPath( self, host, metricname, timeserial ):
1583                """Make a RRD location/path and filename"""
1584
1585                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1586                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
1587
1588                return rrd_dir, rrd_file
1589
1590        def getLastRrdTimeSerial( self, host ):
1591                """Find the last timeserial (directory) for this host"""
1592
1593                newest_timeserial = 0
1594
1595                for dir in self.timeserials[ host ]:
1596
1597                        valid_dir = 1
1598
1599                        for letter in dir:
1600                                if letter not in string.digits:
1601                                        valid_dir = 0
1602
1603                        if valid_dir:
1604                                timeserial = dir
1605                                if timeserial > newest_timeserial:
1606                                        newest_timeserial = timeserial
1607
1608                if newest_timeserial:
1609                        return newest_timeserial
1610                else:
1611                        return 0
1612
1613        def determinePeriod( self, host, check_serial ):
1614                """Determine to which period (directory) this time(serial) belongs"""
1615
1616                period_serial = 0
1617
1618                if self.timeserials.has_key( host ):
1619
1620                        for serial in self.timeserials[ host ]:
1621
1622                                if check_serial >= serial and period_serial < serial:
1623
1624                                        period_serial = serial
1625
1626                return period_serial
1627
1628        def determineSerials( self, host, metricname, metriclist ):
1629                """
1630                Determine the correct serial and corresponding rrd to store
1631                for a list of metrics
1632                """
1633
1634                metric_serial_table = { }
1635
1636                for metric in metriclist:
1637
1638                        if metric['name'] == metricname:
1639
1640                                period          = self.determinePeriod( host, metric['time'] ) 
1641
1642                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1643
1644                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
1645
1646                                        # This one should get it's own new period
1647                                        period = metric['time']
1648
1649                                        if not self.timeserials.has_key( host ):
1650                                                self.timeserials[ host ] = [ ]
1651
1652                                        self.timeserials[ host ].append( period )
1653
1654                                if not metric_serial_table.has_key( period ):
1655
1656                                        metric_serial_table[ period ] = [ ]
1657
1658                                metric_serial_table[ period ].append( metric )
1659
1660                return metric_serial_table
1661
1662        def createCheck( self, host, metricname, timeserial ):
1663                """Check if an rrd allready exists for this metric, create if not"""
1664
1665                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1666               
1667                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
1668
1669                if not os.path.exists( rrd_dir ):
1670
1671                        try:
1672                                os.makedirs( rrd_dir )
1673
1674                        except os.OSError, msg:
1675
1676                                if msg.find( 'File exists' ) != -1:
1677
1678                                        # Ignore exists errors
1679                                        pass
1680
1681                                else:
1682
1683                                        print msg
1684                                        return
1685
1686                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
1687
1688                if not os.path.exists( rrd_file ):
1689
1690                        interval        = self.config.getInterval( self.cluster )
1691                        heartbeat       = 8 * int( interval )
1692
1693                        params          = [ ]
1694
1695                        params.append( '--step' )
1696                        params.append( str( interval ) )
1697
1698                        params.append( '--start' )
1699                        params.append( str( int( timeserial ) - 1 ) )
1700
1701                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1702                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
1703
1704                        self.rrdm.create( str(rrd_file), params )
1705
1706                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1707
1708        def update( self, host, metricname, timeserial, metriclist ):
1709                """
1710                Update rrd file for host with metricname
1711                in directory timeserial with metriclist
1712                """
1713
1714                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1715
1716                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
1717
1718                update_list             = self.makeUpdateList( host, metriclist )
1719
1720                if len( update_list ) > 0:
1721                        ret = self.rrdm.update( str(rrd_file), update_list )
1722
1723                        if ret:
1724                                return 1
1725               
1726                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
1727
1728                return 0
1729
1730def daemon():
1731        """daemonized threading"""
1732
1733        # Fork the first child
1734        #
1735        pid = os.fork()
1736
1737        if pid > 0:
1738
1739                sys.exit(0)  # end parent
1740
1741        # creates a session and sets the process group ID
1742        #
1743        os.setsid()
1744
1745        # Fork the second child
1746        #
1747        pid = os.fork()
1748
1749        if pid > 0:
1750
1751                sys.exit(0)  # end parent
1752
1753        # Go to the root directory and set the umask
1754        #
1755        os.chdir('/')
1756        os.umask(0)
1757
1758        sys.stdin.close()
1759        sys.stdout.close()
1760        sys.stderr.close()
1761
1762        os.open('/dev/null', os.O_RDWR)
1763        os.dup2(0, 1)
1764        os.dup2(0, 2)
1765
1766        run()
1767
1768def run():
1769        """Threading start"""
1770
1771        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1772        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
1773
1774        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1775        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
1776
1777        try:
1778                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1779                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
1780
1781                torque_xml_thread.start()
1782                ganglia_xml_thread.start()
1783               
1784        except thread.error, msg:
1785                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1786                syslog.closelog()
1787                sys.exit(1)
1788               
1789        debug_msg( 0, 'main threading started.' )
1790
1791def main():
1792        """Program startup"""
1793
1794        global DAEMONIZE, USE_SYSLOG
1795
1796        if not processArgs( sys.argv[1:] ):
1797                sys.exit( 1 )
1798
1799        if( DAEMONIZE and USE_SYSLOG ):
1800                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1801
1802        if DAEMONIZE:
1803                daemon()
1804        else:
1805                run()
1806
1807#
1808# Global functions
1809#
1810
1811def check_dir( directory ):
1812        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
1813
1814        if directory[-1] == '/':
1815                directory = directory[:-1]
1816
1817        return directory
1818
1819def reqtime2epoch( rtime ):
1820
1821        (hours, minutes, seconds )      = rtime.split( ':' )
1822
1823        etime   = int(seconds)
1824        etime   = etime + ( int(minutes) * 60 )
1825        etime   = etime + ( int(hours) * 60 * 60 )
1826
1827        return etime
1828
1829def debug_msg( level, msg ):
1830        """Only print msg if correct levels"""
1831
1832        if (not DAEMONIZE and DEBUG_LEVEL >= level):
1833                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
1834       
1835        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1836                syslog.syslog( msg )
1837
1838def printTime( ):
1839        """Print current time in human readable format"""
1840
1841        return time.strftime("%a %d %b %Y %H:%M:%S")
1842
1843# Ooohh, someone started me! Let's go..
1844if __name__ == '__main__':
1845        main()
Note: See TracBrowser for help on using the repository browser.