source: trunk/jobarchived/jobarchived.py @ 265

Last change on this file since 265 was 230, checked in by bastiaans, 18 years ago

INSTALL:

  • changed pbs_python req to 2.8.2

jobarchived/jobarchived.py:

  • added svn keyword propset
  • Property svn:keywords set to Id
File size: 36.3 KB
RevLine 
[3]1#!/usr/bin/env python
[225]2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
[230]21# SVN $Id: jobarchived.py 230 2006-03-10 16:31:36Z bastiaans $
22#
[3]23
[214]24import getopt, syslog, ConfigParser, sys
[3]25
[214]26def processArgs( args ):
[6]27
[214]28        SHORT_L = 'c:'
29        LONG_L = 'config='
[169]30
[214]31        config_filename = None
[169]32
[214]33        try:
[169]34
[214]35                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
[9]36
[214]37        except getopt.error, detail:
[60]38
[214]39                print detail
40                sys.exit(1)
[9]41
[214]42        for opt, value in opts:
[60]43
[214]44                if opt in [ '--config', '-c' ]:
[13]45
[214]46                        config_filename = value
[198]47
[214]48        if not config_filename:
[60]49
[214]50                config_filename = '/etc/jobarchived.conf'
[22]51
[214]52        try:
53                return loadConfig( config_filename )
[13]54
[214]55        except ConfigParser.NoOptionError, detail:
56
57                print detail
58                sys.exit( 1 )
59
60def loadConfig( filename ):
61
62        def getlist( cfg_string ):
63
64                my_list = [ ]
65
66                for item_txt in cfg_string.split( ',' ):
67
68                        sep_char = None
69
70                        item_txt = item_txt.strip()
71
72                        for s_char in [ "'", '"' ]:
73
74                                if item_txt.find( s_char ) != -1:
75
76                                        if item_txt.count( s_char ) != 2:
77
78                                                print 'Missing quote: %s' %item_txt
79                                                sys.exit( 1 )
80
81                                        else:
82
83                                                sep_char = s_char
84                                                break
85
86                        if sep_char:
87
88                                item_txt = item_txt.split( sep_char )[1]
89
90                        my_list.append( item_txt )
91
92                return my_list
93
94        cfg = ConfigParser.ConfigParser()
95
96        cfg.read( filename )
97
[224]98        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL
[214]99
100        ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
101
102        ARCHIVE_HOURS_PER_RRD = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
103
104        DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
105
106        USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
107
108        SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
109
110        try:
111
112                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
113
114        except AttributeError, detail:
115
116                print 'Unknown syslog facility'
117                sys.exit( 1 )
118
119        GMETAD_CONF = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
120
121        ARCHIVE_XMLSOURCE = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
122
123        ARCHIVE_DATASOURCES = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
124
125        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
126
127        JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
128
129        DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
130
[224]131        RRDTOOL = cfg.get( 'DEFAULT', 'RRDTOOL' )
132
[214]133        return True
134
[17]135# What XML data types not to store
[13]136#
[17]137UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
[9]138
[47]139# Maximum time (in seconds) a parsethread may run
140#
141PARSE_TIMEOUT = 60
142
143# Maximum time (in seconds) a storethread may run
144#
145STORE_TIMEOUT = 360
146
[8]147"""
[224]148The Job Archiving Daemon
[8]149"""
150
[214]151from types import *
152
153import DBClass
154import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
155
[84]156class DataSQLStore:
157
158        db_vars = None
159        dbc = None
160
161        def __init__( self, hostname, database ):
162
163                self.db_vars = DBClass.InitVars(DataBaseName=database,
164                                User='root',
165                                Host=hostname,
166                                Password='',
167                                Dictionary='true')
168
169                try:
170                        self.dbc     = DBClass.DB(self.db_vars)
171                except DBClass.DBError, details:
[169]172                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
[84]173                        sys.exit(1)
174
175        def setDatabase(self, statement):
176                ret = self.doDatabase('set', statement)
177                return ret
178               
179        def getDatabase(self, statement):
180                ret = self.doDatabase('get', statement)
181                return ret
182
183        def doDatabase(self, type, statement):
184
185                debug_msg( 6, 'doDatabase(): %s: %s' %(type, statement) )
186                try:
187                        if type == 'set':
188                                result = self.dbc.Set( statement )
189                                self.dbc.Commit()
190                        elif type == 'get':
191                                result = self.dbc.Get( statement )
192                               
193                except DBClass.DBError, detail:
194                        operation = statement.split(' ')[0]
[169]195                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
[84]196                        sys.exit(1)
197
198                debug_msg( 6, 'doDatabase(): result: %s' %(result) )
199                return result
200
[191]201        def getJobNodeId( self, job_id, node_id ):
202
203                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
204                if len( id ) > 0:
205
206                        if len( id[0] ) > 0 and id[0] != '':
207                       
208                                return 1
209
210                return 0
211
[84]212        def getNodeId( self, hostname ):
213
[89]214                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
[84]215
[89]216                if len( id ) > 0:
[84]217
[89]218                        id = id[0][0]
219
[84]220                        return id
221                else:
222                        return None
223
224        def getNodeIds( self, hostnames ):
225
226                ids = [ ]
227
228                for node in hostnames:
229
230                        id = self.getNodeId( node )
231
232                        if id:
233                                ids.append( id )
234
235                return ids
236
237        def getJobId( self, jobid ):
238
239                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
240
241                if id:
[89]242                        id = id[0][0]
[84]243
244                        return id
245                else:
246                        return None
247
248        def addJob( self, job_id, jobattrs ):
249
250                if not self.getJobId( job_id ):
251
252                        self.mutateJob( 'insert', job_id, jobattrs ) 
253                else:
254                        self.mutateJob( 'update', job_id, jobattrs )
255
256        def mutateJob( self, action, job_id, jobattrs ):
257
258                job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
259
260                insert_col_str = 'job_id'
261                insert_val_str = "'%s'" %job_id
262                update_str = None
263
264                debug_msg( 6, 'mutateJob(): %s %s' %(action,job_id))
265
[99]266                ids = [ ]
267
[84]268                for valname, value in jobattrs.items():
269
[96]270                        if valname in job_values and value != '':
[84]271
272                                column_name = 'job_' + valname
273
274                                if action == 'insert':
275
276                                        if not insert_col_str:
277                                                insert_col_str = column_name
278                                        else:
279                                                insert_col_str = insert_col_str + ',' + column_name
280
281                                        if not insert_val_str:
282                                                insert_val_str = value
283                                        else:
284                                                insert_val_str = insert_val_str + ",'%s'" %value
285
286                                elif action == 'update':
287                                       
288                                        if not update_str:
289                                                update_str = "%s='%s'" %(column_name, value)
290                                        else:
291                                                update_str = update_str + ",%s='%s'" %(column_name, value)
292
[90]293                        elif valname == 'nodes' and value:
[84]294
[191]295                                node_valid = 1
[190]296
297                                if len(value) == 1:
298                               
[191]299                                        if jobattrs['status'] == 'Q':
[190]300
[191]301                                                node_valid = 0
[190]302
[191]303                                        else:
[190]304
[191]305                                                node_valid = 0
[190]306
[191]307                                                for node_char in str(value[0]):
[190]308
[191]309                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
[190]310
[191]311                                                                node_valid = 1
[84]312
[191]313                                if node_valid:
314
315                                        ids = self.addNodes( value, jobattrs['domain'] )
316
[84]317                if action == 'insert':
318
319                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
[86]320
[84]321                elif action == 'update':
322
[89]323                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
[84]324
[191]325                if len( ids ) > 0:
326                        self.addJobNodes( job_id, ids )
[190]327
[154]328        def addNodes( self, hostnames, domain ):
[84]329
[98]330                ids = [ ]
331
[84]332                for node in hostnames:
333
[154]334                        node = '%s.%s' %( node, domain )
[84]335                        id = self.getNodeId( node )
336       
337                        if not id:
338                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
[98]339                                id = self.getNodeId( node )
[84]340
[98]341                        ids.append( id )
342
343                return ids
344
[86]345        def addJobNodes( self, jobid, nodes ):
346
347                for node in nodes:
348
[191]349                        if not self.getJobNodeId( jobid, node ):
350
351                                self.addJobNode( jobid, node )
352
[84]353        def addJobNode( self, jobid, nodeid ):
354
355                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
356
357        def storeJobInfo( self, jobid, jobattrs ):
358
359                self.addJob( jobid, jobattrs )
360
[37]361class RRDMutator:
[63]362        """A class for performing RRD mutations"""
[37]363
[224]364        binary = RRDTOOL
[37]365
366        def __init__( self, binary=None ):
[63]367                """Set alternate binary if supplied"""
[37]368
369                if binary:
370                        self.binary = binary
371
372        def create( self, filename, args ):
[63]373                """Create a new rrd with args"""
374
[40]375                return self.perform( 'create', '"' + filename + '"', args )
[37]376
377        def update( self, filename, args ):
[63]378                """Update a rrd with args"""
379
[40]380                return self.perform( 'update', '"' + filename + '"', args )
[37]381
[42]382        def grabLastUpdate( self, filename ):
[63]383                """Determine the last update time of filename rrd"""
[42]384
385                last_update = 0
386
[53]387                debug_msg( 8, self.binary + ' info "' + filename + '"' )
388
[42]389                for line in os.popen( self.binary + ' info "' + filename + '"' ).readlines():
390
391                        if line.find( 'last_update') != -1:
392
393                                last_update = line.split( ' = ' )[1]
394
395                if last_update:
396                        return last_update
397                else:
398                        return 0
399
[40]400        def perform( self, action, filename, args ):
[63]401                """Perform action on rrd filename with args"""
[37]402
403                arg_string = None
404
[40]405                if type( args ) is not ListType:
406                        debug_msg( 8, 'Arguments needs to be of type List' )
407                        return 1
408
[37]409                for arg in args:
410
411                        if not arg_string:
412
413                                arg_string = arg
414                        else:
415                                arg_string = arg_string + ' ' + arg
416
[54]417                debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
[37]418
[146]419                cmd = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
420                lines = cmd.readlines()
421                cmd.close()
[37]422
[146]423                for line in lines:
424
[37]425                        if line.find( 'ERROR' ) != -1:
426
427                                error_msg = string.join( line.split( ' ' )[1:] )
[40]428                                debug_msg( 8, error_msg )
[37]429                                return 1
430
431                return 0
432
[78]433class XMLProcessor:
434        """Skeleton class for XML processor's"""
435
436        def run( self ):
437                """Do main processing of XML here"""
438
439                pass
440
441class TorqueXMLProcessor( XMLProcessor ):
442        """Main class for processing XML and acting with it"""
443
444        def __init__( self ):
445                """Setup initial XML connection and handlers"""
446
447                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 
448                self.myXMLSource = self.myXMLGatherer.getFileObject()
449                self.myXMLHandler = TorqueXMLHandler()
450                self.myXMLError = XMLErrorHandler()
[87]451                self.config = GangliaConfigParser( GMETAD_CONF )
[78]452
453        def run( self ):
454                """Main XML processing"""
455
[169]456                debug_msg( 1, 'torque_xml_thread(): started.' )
[87]457
[78]458                while( 1 ):
459
460                        self.myXMLSource = self.myXMLGatherer.getFileObject()
[169]461                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
[176]462
463                        try:
464                                xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
465                        except socket.error, msg:
466                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
467                               
[169]468                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
469                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
[87]470                        time.sleep( self.config.getLowestInterval() )
[78]471
[71]472class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
[63]473        """Parse Torque's jobinfo XML from our plugin"""
474
[72]475        jobAttrs = { }
476
[84]477        def __init__( self ):
478
[214]479                self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
[98]480                self.jobs_processed = [ ]
481                self.jobs_to_store = [ ]
[84]482
[183]483        def startDocument( self ):
484
485                self.heartbeat = 0
486
[63]487        def startElement( self, name, attrs ):
488                """
489                This XML will be all gmetric XML
490                so there will be no specific start/end element
491                just one XML statement with all info
492                """
493               
[73]494                jobinfo = { }
[63]495
[199]496                if name == 'CLUSTER':
[63]497
[199]498                        self.clustername = attrs.get( 'NAME', "" )
499
500                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
501
[70]502                        metricname = attrs.get( 'NAME', "" )
[63]503
504                        if metricname == 'TOGA-HEARTBEAT':
[70]505                                self.heartbeat = attrs.get( 'VAL', "" )
[63]506
[70]507                        elif metricname.find( 'TOGA-JOB' ) != -1:
[63]508
[72]509                                job_id = metricname.split( 'TOGA-JOB-' )[1]
[63]510                                val = attrs.get( 'VAL', "" )
511
[96]512                                if not job_id in self.jobs_processed:
513                                        self.jobs_processed.append( job_id )
514
[73]515                                check_change = 0
516
517                                if self.jobAttrs.has_key( job_id ):
518                                        check_change = 1
519
[63]520                                valinfo = val.split( ' ' )
521
522                                for myval in valinfo:
523
[84]524                                        if len( myval.split( '=' ) ) > 1:
[63]525
[84]526                                                valname = myval.split( '=' )[0]
527                                                value = myval.split( '=' )[1]
[70]528
[84]529                                                if valname == 'nodes':
530                                                        value = value.split( ';' )
[72]531
[84]532                                                jobinfo[ valname ] = value
533
[73]534                                if check_change:
[182]535                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
[100]536                                                self.jobAttrs[ job_id ]['stop_timestamp'] = ''
[82]537                                                self.jobAttrs[ job_id ] = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
[84]538                                                if not job_id in self.jobs_to_store:
539                                                        self.jobs_to_store.append( job_id )
540
[102]541                                                debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
[73]542                                else:
543                                        self.jobAttrs[ job_id ] = jobinfo
[84]544
545                                        if not job_id in self.jobs_to_store:
546                                                self.jobs_to_store.append( job_id )
547
[102]548                                        debug_msg( 6, 'jobinfo for job %s has changed' %job_id )
[73]549                                       
[77]550        def endDocument( self ):
[74]551                """When all metrics have gone, check if any jobs have finished"""
[72]552
[182]553                if self.heartbeat:
554                        for jobid, jobinfo in self.jobAttrs.items():
[74]555
[182]556                                # This is an old job, not in current jobinfo list anymore
557                                # it must have finished, since we _did_ get a new heartbeat
558                                #
559                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
[102]560
[184]561                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
[74]562
[182]563                                        if not jobid in self.jobs_processed:
564                                                self.jobs_processed.append( jobid )
[96]565
[182]566                                        self.jobAttrs[ jobid ]['status'] = 'F'
567                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( mytime )
[96]568
[182]569                                        if not jobid in self.jobs_to_store:
570                                                self.jobs_to_store.append( jobid )
[74]571
[182]572                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
[87]573
[182]574                        for jobid in self.jobs_to_store:
575                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'Q', 'F' ]:
[84]576
[184]577                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
578
579                                        if self.jobAttrs[ jobid ]['status'] == 'F':
580                                                del self.jobAttrs[ jobid ]
581
[182]582                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
[87]583
[182]584                        self.jobs_processed = [ ]
585                        self.jobs_to_store = [ ]
[84]586
[82]587        def setJobAttrs( self, old, new ):
588                """
589                Set new job attributes in old, but not lose existing fields
590                if old attributes doesn't have those
591                """
592
593                for valname, value in new.items():
594                        old[ valname ] = value
595
596                return old
597               
598
[73]599        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
[74]600                """
601                Check if jobinfo has changed from jobattrs[jobid]
602                if it's report time is bigger than previous one
603                and it is report time is recent (equal to heartbeat)
604                """
[72]605
[87]606                ignore_changes = [ 'reported' ]
607
[73]608                if jobattrs.has_key( jobid ):
609
610                        for valname, value in jobinfo.items():
611
[87]612                                if valname not in ignore_changes:
[73]613
[87]614                                        if jobattrs[ jobid ].has_key( valname ):
[73]615
[87]616                                                if value != jobattrs[ jobid ][ valname ]:
[73]617
[87]618                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
619                                                                return 1
[73]620
[87]621                                        else:
622                                                return 1
623
[73]624                return 0
625
[71]626class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
[63]627        """Parse Ganglia's XML"""
[3]628
[33]629        def __init__( self, config ):
[63]630                """Setup initial variables and gather info on existing rrd archive"""
631
[33]632                self.config = config
[35]633                self.clusters = { }
[169]634                debug_msg( 1, 'Checking existing toga rrd archive..' )
[44]635                self.gatherClusters()
[169]636                debug_msg( 1, 'Check done.' )
[33]637
[44]638        def gatherClusters( self ):
[63]639                """Find all existing clusters in archive dir"""
[44]640
[45]641                archive_dir = check_dir(ARCHIVE_PATH)
[44]642
643                hosts = [ ]
644
645                if os.path.exists( archive_dir ):
646
647                        dirlist = os.listdir( archive_dir )
648
649                        for item in dirlist:
650
651                                clustername = item
652
[60]653                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
[44]654
655                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
656
[6]657        def startElement( self, name, attrs ):
[63]658                """Memorize appropriate data from xml start tags"""
[3]659
[7]660                if name == 'GANGLIA_XML':
[32]661
662                        self.XMLSource = attrs.get( 'SOURCE', "" )
663                        self.gangliaVersion = attrs.get( 'VERSION', "" )
664
[12]665                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
[6]666
[7]667                elif name == 'GRID':
[32]668
669                        self.gridName = attrs.get( 'NAME', "" )
670                        self.time = attrs.get( 'LOCALTIME', "" )
671
[12]672                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
[6]673
[7]674                elif name == 'CLUSTER':
[32]675
676                        self.clusterName = attrs.get( 'NAME', "" )
677                        self.time = attrs.get( 'LOCALTIME', "" )
678
[60]679                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
[32]680
[34]681                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
[33]682
[35]683                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
[6]684
[60]685                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
[32]686
687                        self.hostName = attrs.get( 'NAME', "" )
688                        self.hostIp = attrs.get( 'IP', "" )
689                        self.hostReported = attrs.get( 'REPORTED', "" )
690
[12]691                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
[6]692
[60]693                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
[6]694
[32]695                        type = attrs.get( 'TYPE', "" )
[198]696                       
697                        exclude_metric = False
698                       
699                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
[6]700
[198]701                                orig_name = attrs.get( 'NAME', "" )     
[3]702
[198]703                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
704                               
705                                        exclude_metric = True
706
707                                elif re.match( ex_metricstr, orig_name ):
708
709                                        exclude_metric = True
710
711                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
712
[32]713                                myMetric = { }
714                                myMetric['name'] = attrs.get( 'NAME', "" )
715                                myMetric['val'] = attrs.get( 'VAL', "" )
716                                myMetric['time'] = self.hostReported
[3]717
[34]718                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
[3]719
[34]720                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
[6]721
[34]722        def storeMetrics( self ):
[63]723                """Store metrics of each cluster rrd handler"""
[9]724
[34]725                for clustername, rrdh in self.clusters.items():
[16]726
[38]727                        ret = rrdh.storeMetrics()
[9]728
[38]729                        if ret:
730                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
731                                return 1
732
733                return 0
734
[71]735class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
736
737        def error( self, exception ):
738                """Recoverable error"""
739
[169]740                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
[71]741
742        def fatalError( self, exception ):
743                """Non-recoverable error"""
744
745                exception_str = str( exception )
746
747                # Ignore 'no element found' errors
748                if exception_str.find( 'no element found' ) != -1:
[169]749                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
[71]750                        return 0
751
[170]752                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
[71]753                sys.exit( 1 )
754
755        def warning( self, exception ):
756                """Warning"""
757
758                debug_msg( 0, 'Warning ' + str( exception ) )
759
[78]760class XMLGatherer:
[63]761        """Setup a connection and file object to Ganglia's XML"""
[3]762
[8]763        s = None
[70]764        fd = None
[8]765
766        def __init__( self, host, port ):
[63]767                """Store host and port for connection"""
[8]768
[5]769                self.host = host
770                self.port = port
[33]771                self.connect()
[70]772                self.makeFileDescriptor()
[3]773
[33]774        def connect( self ):
[63]775                """Setup connection to XML source"""
[8]776
777                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
[32]778
[5]779                        af, socktype, proto, canonname, sa = res
[32]780
[5]781                        try:
[32]782
[8]783                                self.s = socket.socket( af, socktype, proto )
[32]784
[5]785                        except socket.error, msg:
[32]786
[8]787                                self.s = None
[5]788                                continue
[32]789
[5]790                        try:
[32]791
[8]792                                self.s.connect( sa )
[32]793
[5]794                        except socket.error, msg:
[32]795
[70]796                                self.disconnect()
[5]797                                continue
[32]798
[5]799                        break
[3]800
[8]801                if self.s is None:
[32]802
[170]803                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
[33]804                        sys.exit( 1 )
[5]805
[33]806        def disconnect( self ):
[63]807                """Close socket"""
[33]808
809                if self.s:
[71]810                        self.s.shutdown( 2 )
[33]811                        self.s.close()
812                        self.s = None
813
814        def __del__( self ):
[63]815                """Kill the socket before we leave"""
[33]816
817                self.disconnect()
818
[70]819        def reconnect( self ):
820                """Reconnect"""
[33]821
[38]822                if self.s:
823                        self.disconnect()
[33]824
[70]825                self.connect()
[5]826
[70]827        def makeFileDescriptor( self ):
828                """Make file descriptor that points to our socket connection"""
829
830                self.reconnect()
831
832                if self.s:
833                        self.fd = self.s.makefile( 'r' )
834
835        def getFileObject( self ):
836                """Connect, and return a file object"""
837
[78]838                self.makeFileDescriptor()
839
[70]840                if self.fd:
841                        return self.fd
842
[78]843class GangliaXMLProcessor( XMLProcessor ):
[63]844        """Main class for processing XML and acting with it"""
[5]845
[33]846        def __init__( self ):
[63]847                """Setup initial XML connection and handlers"""
[33]848
849                self.config = GangliaConfigParser( GMETAD_CONF )
850
[78]851                self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 
[70]852                self.myXMLSource = self.myXMLGatherer.getFileObject()
[78]853                self.myXMLHandler = GangliaXMLHandler( self.config )
854                self.myXMLError = XMLErrorHandler()
[73]855
[9]856        def run( self ):
[63]857                """Main XML processing; start a xml and storethread"""
[8]858
[102]859                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
860                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
[22]861
[36]862                while( 1 ):
863
[102]864                        if not xml_thread.isAlive():
[36]865                                # Gather XML at the same interval as gmetad
866
867                                # threaded call to: self.processXML()
868                                #
[169]869                                try:
870                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
871                                        xml_thread.start()
[176]872                                except thread.error, msg:
[169]873                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
874                                        #return 1
[36]875
[102]876                        if not store_thread.isAlive():
[55]877                                # Store metrics every .. sec
[36]878
[55]879                                # threaded call to: self.storeMetrics()
880                                #
[169]881                                try:
882                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
883                                        store_thread.start()
[176]884                                except thread.error, msg:
[169]885                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
886                                        #return 1
[36]887               
888                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
889                        time.sleep( 1 ) 
890
[33]891        def storeMetrics( self ):
[63]892                """Store metrics retained in memory to disk"""
[22]893
[63]894                # Store metrics somewhere between every 360 and 640 seconds
[38]895                #
[55]896                STORE_INTERVAL = random.randint( 360, 640 )
[22]897
[169]898                try:
899                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
900                        store_metric_thread.start()
[176]901                except thread.error, msg:
[169]902                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
903                        return 1
[36]904
[169]905                debug_msg( 1, 'ganglia_store_thread(): started.' )
906
907                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
[36]908                time.sleep( STORE_INTERVAL )
[169]909                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
[36]910
[102]911                if store_metric_thread.isAlive():
[36]912
[169]913                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
[136]914                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
[169]915                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
[36]916
[169]917                debug_msg( 1, 'ganglia_store_thread(): finished.' )
[36]918
919                return 0
920
[39]921        def storeThread( self ):
[63]922                """Actual metric storing thread"""
[39]923
[169]924                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
925                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
[78]926                ret = self.myXMLHandler.storeMetrics()
[176]927                if ret > 0:
928                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
[169]929                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
930                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
[39]931               
[176]932                return 0
[39]933
[8]934        def processXML( self ):
[63]935                """Process XML"""
[8]936
[169]937                try:
938                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
939                        parsethread.start()
[176]940                except thread.error, msg:
[169]941                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
942                        return 1
[8]943
[169]944                debug_msg( 1, 'ganglia_xml_thread(): started.' )
[36]945
[169]946                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
[36]947                time.sleep( float( self.config.getLowestInterval() ) ) 
[169]948                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
[36]949
950                if parsethread.isAlive():
951
[169]952                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
[47]953                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
[169]954                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
[36]955
[169]956                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
[36]957
958                return 0
959
[39]960        def parseThread( self ):
[63]961                """Actual parsing thread"""
[39]962
[169]963                debug_msg( 1, 'ganglia_parse_thread(): started.' )
964                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
[78]965                self.myXMLSource = self.myXMLGatherer.getFileObject()
[176]966
967                try:
968                        xml.sax.parse( self.myXMLSource, self.myXMLHandler, self.myXMLError )
969                except socket.error, msg:
970                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
971
[169]972                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
973                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
[39]974
[176]975                return 0
[39]976
[9]977class GangliaConfigParser:
978
[34]979        sources = [ ]
[9]980
981        def __init__( self, config ):
[63]982                """Parse some stuff from our gmetad's config, such as polling interval"""
[32]983
[9]984                self.config = config
985                self.parseValues()
986
[32]987        def parseValues( self ):
[63]988                """Parse certain values from gmetad.conf"""
[9]989
990                readcfg = open( self.config, 'r' )
991
992                for line in readcfg.readlines():
993
994                        if line.count( '"' ) > 1:
995
[10]996                                if line.find( 'data_source' ) != -1 and line[0] != '#':
[9]997
[11]998                                        source = { }
999                                        source['name'] = line.split( '"' )[1]
[9]1000                                        source_words = line.split( '"' )[2].split( ' ' )
1001
1002                                        for word in source_words:
1003
1004                                                valid_interval = 1
1005
1006                                                for letter in word:
[32]1007
[9]1008                                                        if letter not in string.digits:
[32]1009
[9]1010                                                                valid_interval = 0
1011
[10]1012                                                if valid_interval and len(word) > 0:
[32]1013
[9]1014                                                        source['interval'] = word
[12]1015                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
[33]1016       
1017                                        # No interval found, use Ganglia's default     
1018                                        if not source.has_key( 'interval' ):
1019                                                source['interval'] = 15
1020                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
[32]1021
[33]1022                                        self.sources.append( source )
[9]1023
1024        def getInterval( self, source_name ):
[63]1025                """Return interval for source_name"""
[32]1026
[9]1027                for source in self.sources:
[32]1028
[12]1029                        if source['name'] == source_name:
[32]1030
[9]1031                                return source['interval']
[32]1032
[9]1033                return None
1034
[34]1035        def getLowestInterval( self ):
[63]1036                """Return the lowest interval of all clusters"""
[34]1037
1038                lowest_interval = 0
1039
1040                for source in self.sources:
1041
1042                        if not lowest_interval or source['interval'] <= lowest_interval:
1043
1044                                lowest_interval = source['interval']
1045
1046                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1047                if lowest_interval:
1048                        return lowest_interval
1049                else:
1050                        return 15
1051
[9]1052class RRDHandler:
[63]1053        """Class for handling RRD activity"""
[9]1054
[32]1055        myMetrics = { }
[40]1056        lastStored = { }
[47]1057        timeserials = { }
[36]1058        slot = None
[32]1059
[33]1060        def __init__( self, config, cluster ):
[63]1061                """Setup initial variables"""
[78]1062
[44]1063                self.block = 0
[9]1064                self.cluster = cluster
[33]1065                self.config = config
[40]1066                self.slot = threading.Lock()
[37]1067                self.rrdm = RRDMutator()
[42]1068                self.gatherLastUpdates()
[9]1069
[42]1070        def gatherLastUpdates( self ):
[63]1071                """Populate the lastStored list, containing timestamps of all last updates"""
[42]1072
1073                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1074
1075                hosts = [ ]
1076
1077                if os.path.exists( cluster_dir ):
1078
[44]1079                        dirlist = os.listdir( cluster_dir )
[42]1080
[44]1081                        for dir in dirlist:
[42]1082
[44]1083                                hosts.append( dir )
[42]1084
1085                for host in hosts:
1086
[47]1087                        host_dir = cluster_dir + '/' + host
1088                        dirlist = os.listdir( host_dir )
1089
1090                        for dir in dirlist:
1091
1092                                if not self.timeserials.has_key( host ):
1093
1094                                        self.timeserials[ host ] = [ ]
1095
1096                                self.timeserials[ host ].append( dir )
1097
[42]1098                        last_serial = self.getLastRrdTimeSerial( host )
1099                        if last_serial:
1100
1101                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1102                                if os.path.exists( metric_dir ):
1103
[44]1104                                        dirlist = os.listdir( metric_dir )
[42]1105
[44]1106                                        for file in dirlist:
[42]1107
[44]1108                                                metricname = file.split( '.rrd' )[0]
[42]1109
[44]1110                                                if not self.lastStored.has_key( host ):
[42]1111
[44]1112                                                        self.lastStored[ host ] = { }
[42]1113
[44]1114                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
[42]1115
[32]1116        def getClusterName( self ):
[63]1117                """Return clustername"""
1118
[32]1119                return self.cluster
1120
1121        def memMetric( self, host, metric ):
[63]1122                """Store metric from host in memory"""
[32]1123
[179]1124                # <ATOMIC>
1125                #
1126                self.slot.acquire()
1127               
[34]1128                if self.myMetrics.has_key( host ):
[32]1129
[34]1130                        if self.myMetrics[ host ].has_key( metric['name'] ):
[32]1131
[34]1132                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
[32]1133
[34]1134                                        if mymetric['time'] == metric['time']:
[32]1135
[34]1136                                                # Allready have this metric, abort
[179]1137                                                self.slot.release()
[34]1138                                                return 1
1139                        else:
1140                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1141                else:
[32]1142                        self.myMetrics[ host ] = { }
[34]1143                        self.myMetrics[ host ][ metric['name'] ] = [ ]
[32]1144
[63]1145                # Push new metric onto stack
1146                # atomic code; only 1 thread at a time may access the stack
1147
[32]1148                self.myMetrics[ host ][ metric['name'] ].append( metric )
1149
[40]1150                self.slot.release()
[53]1151                #
1152                # </ATOMIC>
[40]1153
[47]1154        def makeUpdateList( self, host, metriclist ):
[63]1155                """
1156                Make a list of update values for rrdupdate
1157                but only those that we didn't store before
1158                """
[37]1159
1160                update_list = [ ]
[41]1161                metric = None
[37]1162
[47]1163                while len( metriclist ) > 0:
[37]1164
[53]1165                        metric = metriclist.pop( 0 )
[37]1166
[53]1167                        if self.checkStoreMetric( host, metric ):
1168                                update_list.append( '%s:%s' %( metric['time'], metric['val'] ) )
[40]1169
[37]1170                return update_list
1171
[49]1172        def checkStoreMetric( self, host, metric ):
[63]1173                """Check if supplied metric if newer than last one stored"""
[40]1174
1175                if self.lastStored.has_key( host ):
1176
[47]1177                        if self.lastStored[ host ].has_key( metric['name'] ):
[40]1178
[47]1179                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
[40]1180
[50]1181                                        # This is old
[40]1182                                        return 0
1183
[50]1184                return 1
1185
[54]1186        def memLastUpdate( self, host, metricname, metriclist ):
[63]1187                """
1188                Memorize the time of the latest metric from metriclist
1189                but only if it wasn't allready memorized
1190                """
[50]1191
[54]1192                if not self.lastStored.has_key( host ):
1193                        self.lastStored[ host ] = { }
1194
[50]1195                last_update_time = 0
1196
1197                for metric in metriclist:
1198
[54]1199                        if metric['name'] == metricname:
[50]1200
[54]1201                                if metric['time'] > last_update_time:
[50]1202
[54]1203                                        last_update_time = metric['time']
[40]1204
[54]1205                if self.lastStored[ host ].has_key( metricname ):
[52]1206                       
[54]1207                        if last_update_time <= self.lastStored[ host ][ metricname ]:
[52]1208                                return 1
[40]1209
[54]1210                self.lastStored[ host ][ metricname ] = last_update_time
[52]1211
[33]1212        def storeMetrics( self ):
[63]1213                """
1214                Store all metrics from memory to disk
1215                and do it to the RRD's in appropriate timeperiod directory
1216                """
[33]1217
1218                for hostname, mymetrics in self.myMetrics.items():     
1219
1220                        for metricname, mymetric in mymetrics.items():
1221
[53]1222                                metrics_to_store = [ ]
1223
[63]1224                                # Pop metrics from stack for storing until none is left
1225                                # atomic code: only 1 thread at a time may access myMetrics
1226
[53]1227                                # <ATOMIC>
[50]1228                                #
[47]1229                                self.slot.acquire() 
[33]1230
[54]1231                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1232
[54]1233                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
[53]1234
[176]1235                                                try:
1236                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1237                                                except IndexError, msg:
1238
[179]1239                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1240                                                        # is still len 0 when the statement is executed.
1241                                                        # Just ignore indexerror's..
[176]1242                                                        pass
1243
[53]1244                                self.slot.release()
1245                                #
1246                                # </ATOMIC>
1247
[47]1248                                # Create a mapping table, each metric to the period where it should be stored
1249                                #
[53]1250                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
[33]1251
[50]1252                                update_rets = [ ]
1253
[47]1254                                for period, pmetric in metric_serial_table.items():
1255
[146]1256                                        create_ret = self.createCheck( hostname, metricname, period )   
[47]1257
1258                                        update_ret = self.update( hostname, metricname, period, pmetric )
1259
1260                                        if update_ret == 0:
1261
1262                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1263                                        else:
1264                                                debug_msg( 9, 'metric update failed' )
1265
[146]1266                                        update_rets.append( create_ret )
[50]1267                                        update_rets.append( update_ret )
[47]1268
[179]1269                                # Lets ignore errors here for now, we need to make sure last update time
1270                                # is correct!
1271                                #
1272                                #if not (1) in update_rets:
[50]1273
[179]1274                                self.memLastUpdate( hostname, metricname, metrics_to_store )
[50]1275
[17]1276        def makeTimeSerial( self ):
[63]1277                """Generate a time serial. Seconds since epoch"""
[17]1278
1279                # Seconds since epoch
1280                mytime = int( time.time() )
1281
1282                return mytime
1283
[50]1284        def makeRrdPath( self, host, metricname, timeserial ):
[63]1285                """Make a RRD location/path and filename"""
[17]1286
[50]1287                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1288                rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )
[17]1289
1290                return rrd_dir, rrd_file
1291
[20]1292        def getLastRrdTimeSerial( self, host ):
[63]1293                """Find the last timeserial (directory) for this host"""
[17]1294
[19]1295                newest_timeserial = 0
1296
[47]1297                for dir in self.timeserials[ host ]:
[32]1298
[47]1299                        valid_dir = 1
[17]1300
[47]1301                        for letter in dir:
1302                                if letter not in string.digits:
1303                                        valid_dir = 0
[17]1304
[47]1305                        if valid_dir:
1306                                timeserial = dir
1307                                if timeserial > newest_timeserial:
1308                                        newest_timeserial = timeserial
[17]1309
1310                if newest_timeserial:
[18]1311                        return newest_timeserial
[17]1312                else:
1313                        return 0
1314
[47]1315        def determinePeriod( self, host, check_serial ):
[63]1316                """Determine to which period (directory) this time(serial) belongs"""
[47]1317
1318                period_serial = 0
1319
[56]1320                if self.timeserials.has_key( host ):
[47]1321
[56]1322                        for serial in self.timeserials[ host ]:
[47]1323
[56]1324                                if check_serial >= serial and period_serial < serial:
[47]1325
[56]1326                                        period_serial = serial
1327
[47]1328                return period_serial
1329
1330        def determineSerials( self, host, metricname, metriclist ):
1331                """
1332                Determine the correct serial and corresponding rrd to store
1333                for a list of metrics
1334                """
1335
1336                metric_serial_table = { }
1337
1338                for metric in metriclist:
1339
1340                        if metric['name'] == metricname:
1341
1342                                period = self.determinePeriod( host, metric['time'] )   
1343
1344                                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1345
[49]1346                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
[47]1347
1348                                        # This one should get it's own new period
1349                                        period = metric['time']
[57]1350
1351                                        if not self.timeserials.has_key( host ):
1352                                                self.timeserials[ host ] = [ ]
1353
[50]1354                                        self.timeserials[ host ].append( period )
[47]1355
1356                                if not metric_serial_table.has_key( period ):
1357
[49]1358                                        metric_serial_table[ period ] = [ ]
[47]1359
1360                                metric_serial_table[ period ].append( metric )
1361
1362                return metric_serial_table
1363
[33]1364        def createCheck( self, host, metricname, timeserial ):
[63]1365                """Check if an rrd allready exists for this metric, create if not"""
[9]1366
[35]1367                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[47]1368               
[33]1369                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[17]1370
[9]1371                if not os.path.exists( rrd_dir ):
[58]1372
1373                        try:
1374                                os.makedirs( rrd_dir )
1375
[169]1376                        except os.OSError, msg:
[58]1377
1378                                if msg.find( 'File exists' ) != -1:
1379
1380                                        # Ignore exists errors
1381                                        pass
1382
1383                                else:
1384
1385                                        print msg
1386                                        return
1387
[14]1388                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
[9]1389
[14]1390                if not os.path.exists( rrd_file ):
[9]1391
[33]1392                        interval = self.config.getInterval( self.cluster )
[47]1393                        heartbeat = 8 * int( interval )
[9]1394
[37]1395                        params = [ ]
[12]1396
[37]1397                        params.append( '--step' )
1398                        params.append( str( interval ) )
[12]1399
[37]1400                        params.append( '--start' )
[47]1401                        params.append( str( int( timeserial ) - 1 ) )
[12]1402
[37]1403                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1404                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
[13]1405
[37]1406                        self.rrdm.create( str(rrd_file), params )
1407
[14]1408                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1409
[47]1410        def update( self, host, metricname, timeserial, metriclist ):
[63]1411                """
1412                Update rrd file for host with metricname
1413                in directory timeserial with metriclist
1414                """
[9]1415
[35]1416                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
[9]1417
[33]1418                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
[18]1419
[47]1420                update_list = self.makeUpdateList( host, metriclist )
[15]1421
[41]1422                if len( update_list ) > 0:
1423                        ret = self.rrdm.update( str(rrd_file), update_list )
[32]1424
[41]1425                        if ret:
1426                                return 1
[27]1427               
[41]1428                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
[15]1429
[36]1430                return 0
1431
[169]1432def daemon():
1433        """daemonized threading"""
[8]1434
[169]1435        # Fork the first child
1436        #
1437        pid = os.fork()
1438
1439        if pid > 0:
1440
1441                sys.exit(0)  # end parent
1442
1443        # creates a session and sets the process group ID
1444        #
1445        os.setsid()
1446
1447        # Fork the second child
1448        #
1449        pid = os.fork()
1450
1451        if pid > 0:
1452
1453                sys.exit(0)  # end parent
1454
1455        # Go to the root directory and set the umask
1456        #
1457        os.chdir('/')
1458        os.umask(0)
1459
1460        sys.stdin.close()
1461        sys.stdout.close()
1462        sys.stderr.close()
1463
1464        os.open('/dev/null', 0)
1465        os.dup(0)
1466        os.dup(0)
1467
1468        run()
1469
1470def run():
1471        """Threading start"""
1472
[102]1473        myTorqueProcessor = TorqueXMLProcessor()
1474        myGangliaProcessor = GangliaXMLProcessor()
[8]1475
[169]1476        try:
[102]1477                torque_xml_thread = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1478                ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
[22]1479
[169]1480                torque_xml_thread.start()
1481                ganglia_xml_thread.start()
1482               
[176]1483        except thread.error, msg:
[169]1484                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1485                syslog.closelog()
1486                sys.exit(1)
1487               
1488        debug_msg( 0, 'main threading started.' )
[78]1489
[169]1490def main():
1491        """Program startup"""
1492
[214]1493        if not processArgs( sys.argv[1:] ):
1494                sys.exit( 1 )
1495
[169]1496        if( DAEMONIZE and USE_SYSLOG ):
1497                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1498
1499        if DAEMONIZE:
1500                daemon()
1501        else:
1502                run()
1503
1504#
[81]1505# Global functions
[169]1506#
[81]1507
[9]1508def check_dir( directory ):
[63]1509        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
[9]1510
1511        if directory[-1] == '/':
1512                directory = directory[:-1]
1513
1514        return directory
1515
[12]1516def debug_msg( level, msg ):
[169]1517        """Only print msg if correct levels"""
[12]1518
[169]1519        if (not DAEMONIZE and DEBUG_LEVEL >= level):
1520                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
1521       
1522        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1523                syslog.syslog( msg )
[12]1524
[46]1525def printTime( ):
[63]1526        """Print current time in human readable format"""
[46]1527
1528        return time.strftime("%a %d %b %Y %H:%M:%S")
1529
[63]1530# Ooohh, someone started me! Let's go..
[9]1531if __name__ == '__main__':
1532        main()
Note: See TracBrowser for help on using the repository browser.