source: trunk/jobarchived/jobarchived.py @ 435

Last change on this file since 435 was 435, checked in by bastiaans, 15 years ago

jobarchived/jobarchived.py:

  • added pidfile support

pkg/rpm/init.d/jobarchived,
pkg/rpm/jobmonarch-jobarchived.spec,
Makefile:

  • updated
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 47.3 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobarchived.py 435 2007-07-11 14:57:42Z bastiaans $
22#
23
24DEFAULT_SEARCH_PATH     = '/usr/share/jobarchived'
25
26import sys
27
28if DEFAULT_SEARCH_PATH not in sys.path:
29
30        sys.path.append( DEFAULT_SEARCH_PATH )
31
32import getopt, syslog, ConfigParser
33
34def usage():
35
36        print
37        print 'usage: jobarchived [options]'
38        print 'options:'
39        print '      --config, -c      configuration file'
40        print '      --pidfile, -p     pid file'
41        print '      --help, -h        help'
42        print
43
44def processArgs( args ):
45
46        SHORT_L = 'p:hc:'
47        LONG_L  = [ 'help', 'config=', 'pidfile=' ]
48
49        config_filename = None
50
51        global PIDFILE
52
53        PIDFILE = None
54
55        try:
56
57                opts, args = getopt.getopt( args, SHORT_L, LONG_L )
58
59        except getopt.error, detail:
60
61                print detail
62                sys.exit(1)
63
64        for opt, value in opts:
65
66                if opt in [ '--config', '-c' ]:
67
68                        config_filename = value
69
70                if opt in [ '--pidfile', '-p' ]:
71
72                        PIDFILE         = value
73
74                if opt in [ '--help', '-h' ]:
75
76                        usage()
77                        sys.exit( 0 )
78
79        if not config_filename:
80
81                config_filename = '/etc/jobarchived.conf'
82
83        try:
84                return loadConfig( config_filename )
85
86        except ConfigParser.NoOptionError, detail:
87
88                print detail
89                sys.exit( 1 )
90
91def loadConfig( filename ):
92
93        def getlist( cfg_string ):
94
95                my_list = [ ]
96
97                for item_txt in cfg_string.split( ',' ):
98
99                        sep_char = None
100
101                        item_txt = item_txt.strip()
102
103                        for s_char in [ "'", '"' ]:
104
105                                if item_txt.find( s_char ) != -1:
106
107                                        if item_txt.count( s_char ) != 2:
108
109                                                print 'Missing quote: %s' %item_txt
110                                                sys.exit( 1 )
111
112                                        else:
113
114                                                sep_char = s_char
115                                                break
116
117                        if sep_char:
118
119                                item_txt = item_txt.split( sep_char )[1]
120
121                        my_list.append( item_txt )
122
123                return my_list
124
125        cfg = ConfigParser.ConfigParser()
126
127        cfg.read( filename )
128
129        global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE, ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS, JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL
130
131        ARCHIVE_PATH            = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )
132
133        ARCHIVE_HOURS_PER_RRD   = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )
134
135        DEBUG_LEVEL             = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
136
137        USE_SYSLOG              = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
138
139        SYSLOG_LEVEL            = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
140
141        MODRRDTOOL              = False
142
143        try:
144                import rrdtool
145
146                MODRRDTOOL              = True
147
148        except ImportError:
149
150                MODRRDTOOL              = False
151
152                debug_msg( 0, "ERROR: py-rrdtool import FAILED: failing back to DEPRECATED use of rrdtool binary. This will slow down jobmond significantly!" )
153
154        try:
155
156                SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
157
158        except AttributeError, detail:
159
160                print 'Unknown syslog facility'
161                sys.exit( 1 )
162
163        GMETAD_CONF             = cfg.get( 'DEFAULT', 'GMETAD_CONF' )
164
165        ARCHIVE_XMLSOURCE       = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )
166
167        ARCHIVE_DATASOURCES     = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )
168
169        ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )
170
171        JOB_SQL_DBASE           = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )
172
173        JOB_TIMEOUT             = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )
174
175        DAEMONIZE               = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
176
177        RRDTOOL                 = cfg.get( 'DEFAULT', 'RRDTOOL' )
178
179        return True
180
181# What XML data types not to store
182#
183UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ]
184
185# Maximum time (in seconds) a parsethread may run
186#
187PARSE_TIMEOUT = 60
188
189# Maximum time (in seconds) a storethread may run
190#
191STORE_TIMEOUT = 360
192
193"""
194The Job Archiving Daemon
195"""
196
197from types import *
198
199import xml.sax, xml.sax.handler, socket, string, os, os.path, time, thread, threading, random, re
200import rrdtool
201from pyPgSQL import PgSQL
202
203# Orginal from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed
204# and added some more functions for postgres.
205#       
206#
207# Changed by: Bas van der Vlies <basv@sara.nl>
208#       
209# SARA API for Postgres Database
210#
211# Changed by: Ramon Bastiaans for Job Monarch
212#
213
214class InitVars:
215        Vars = {}
216       
217        def __init__(self, **key_arg):
218                for (key, value) in key_arg.items():
219                        if value:
220                                self.Vars[key] = value
221                        else:   
222                                self.Vars[key] = None
223                               
224        def __call__(self, *key):
225                key = "%s" % key
226                return self.Vars[key]
227               
228        def __getitem__(self, key):
229                return self.Vars[key]
230               
231        def __repr__(self):
232                return repr(self.Vars)
233               
234        def keys(self):
235                barf =  map(None, self.Vars.keys())
236                return barf
237               
238        def values(self):
239                barf =  map(None, self.Vars.values())
240                return barf
241               
242        def has_key(self, key):
243                if self.Vars.has_key(key):
244                        return 1
245                else:   
246                        return 0
247                       
248class DBError(Exception):
249        def __init__(self, msg=''):
250                self.msg = msg
251                Exception.__init__(self, msg)
252        def __repr__(self):
253                return self.msg
254        __str__ = __repr__
255
256#
257# Class to connect to a database
258# and return the queury in a list or dictionairy.
259#
260class DB:
261        def __init__(self, db_vars):
262
263                self.dict = db_vars
264
265                if self.dict.has_key('User'):
266                        self.user = self.dict['User']
267                else:
268                        self.user = 'postgres'
269
270                if self.dict.has_key('Host'):
271                        self.host = self.dict['Host']
272                else:
273                        self.host = 'localhost'
274
275                if self.dict.has_key('Password'):
276                        self.passwd = self.dict['Password']
277                else:
278                        self.passwd = ''
279
280                if self.dict.has_key('DataBaseName'):
281                        self.db = self.dict['DataBaseName']
282                else:
283                        self.db = 'uva_cluster_db'
284
285                # connect_string = 'host:port:database:user:password:
286                dsn = "%s::%s:%s:%s" %(self.host, self.db, self.user, self.passwd)
287
288                try:
289                        self.SQL = PgSQL.connect(dsn)
290                except PgSQL.Error, details:
291                        str = "%s" %details
292                        raise DBError(str)
293
294        def __repr__(self):
295                return repr(self.result)
296
297        def __nonzero__(self):
298                return not(self.result == None)
299
300        def __len__(self):
301                return len(self.result)
302
303        def __getitem__(self,i):
304                return self.result[i]
305
306        def __getslice__(self,i,j):
307                return self.result[i:j]
308
309        def Get(self, q_str):
310                c = self.SQL.cursor()
311                try:
312                        c.execute(q_str)
313                        result = c.fetchall()
314                except PgSQL.Error, details:
315                        c.close()
316                        str = "%s" %details
317                        raise DBError(str)
318
319                c.close()
320                return result
321
322        def Set(self, q_str):
323                c = self.SQL.cursor()
324                try:
325                        c.execute(q_str)
326                        result = c.oidValue
327
328                except PgSQL.Error, details:
329                        c.close()
330                        str = "%s" %details
331                        raise DBError(str)
332
333                c.close()
334                return result
335
336        def Commit(self):
337                self.SQL.commit()
338
339class DataSQLStore:
340
341        db_vars = None
342        dbc = None
343
344        def __init__( self, hostname, database ):
345
346                self.db_vars = InitVars(DataBaseName=database,
347                                User='root',
348                                Host=hostname,
349                                Password='',
350                                Dictionary='true')
351
352                try:
353                        self.dbc     = DB(self.db_vars)
354                except DBError, details:
355                        debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
356                        sys.exit(1)
357
358        def setDatabase(self, statement):
359                ret = self.doDatabase('set', statement)
360                return ret
361               
362        def getDatabase(self, statement):
363                ret = self.doDatabase('get', statement)
364                return ret
365
366        def doDatabase(self, type, statement):
367
368                debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
369                try:
370                        if type == 'set':
371                                result = self.dbc.Set( statement )
372                                self.dbc.Commit()
373                        elif type == 'get':
374                                result = self.dbc.Get( statement )
375                               
376                except DBError, detail:
377                        operation = statement.split(' ')[0]
378                        debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
379                        sys.exit(1)
380
381                debug_msg( 10, 'doDatabase(): result: %s' %(result) )
382                return result
383
384        def getJobNodeId( self, job_id, node_id ):
385
386                id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
387                if len( id ) > 0:
388
389                        if len( id[0] ) > 0 and id[0] != '':
390                       
391                                return 1
392
393                return 0
394
395        def getNodeId( self, hostname ):
396
397                id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )
398
399                if len( id ) > 0:
400
401                        id = id[0][0]
402
403                        return id
404                else:
405                        return None
406
407        def getNodeIds( self, hostnames ):
408
409                ids = [ ]
410
411                for node in hostnames:
412
413                        id = self.getNodeId( node )
414
415                        if id:
416                                ids.append( id )
417
418                return ids
419
420        def getJobId( self, jobid ):
421
422                id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )
423
424                if id:
425                        id = id[0][0]
426
427                        return id
428                else:
429                        return None
430
431        def addJob( self, job_id, jobattrs ):
432
433                if not self.getJobId( job_id ):
434
435                        self.mutateJob( 'insert', job_id, jobattrs ) 
436                else:
437                        self.mutateJob( 'update', job_id, jobattrs )
438
439        def mutateJob( self, action, job_id, jobattrs ):
440
441                job_values      = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]
442
443                insert_col_str  = 'job_id'
444                insert_val_str  = "'%s'" %job_id
445                update_str      = None
446
447                debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))
448
449                ids = [ ]
450
451                for valname, value in jobattrs.items():
452
453                        if valname in job_values and value != '':
454
455                                column_name = 'job_' + valname
456
457                                if action == 'insert':
458
459                                        if not insert_col_str:
460                                                insert_col_str = column_name
461                                        else:
462                                                insert_col_str = insert_col_str + ',' + column_name
463
464                                        if not insert_val_str:
465                                                insert_val_str = value
466                                        else:
467                                                insert_val_str = insert_val_str + ",'%s'" %value
468
469                                elif action == 'update':
470                                       
471                                        if not update_str:
472                                                update_str = "%s='%s'" %(column_name, value)
473                                        else:
474                                                update_str = update_str + ",%s='%s'" %(column_name, value)
475
476                        elif valname == 'nodes' and value:
477
478                                node_valid = 1
479
480                                if len(value) == 1:
481                               
482                                        if jobattrs['status'] == 'Q':
483
484                                                node_valid = 0
485
486                                        else:
487
488                                                node_valid = 0
489
490                                                for node_char in str(value[0]):
491
492                                                        if string.find( string.digits, node_char ) != -1 and not node_valid:
493
494                                                                node_valid = 1
495
496                                if node_valid:
497
498                                        ids = self.addNodes( value, jobattrs['domain'] )
499
500                if action == 'insert':
501
502                        self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )
503
504                elif action == 'update':
505
506                        self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )
507
508                if len( ids ) > 0:
509                        self.addJobNodes( job_id, ids )
510
511        def addNodes( self, hostnames, domain ):
512
513                ids = [ ]
514
515                for node in hostnames:
516
517                        node    = '%s.%s' %( node, domain )
518                        id      = self.getNodeId( node )
519       
520                        if not id:
521                                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
522                                id = self.getNodeId( node )
523
524                        ids.append( id )
525
526                return ids
527
528        def addJobNodes( self, jobid, nodes ):
529
530                for node in nodes:
531
532                        if not self.getJobNodeId( jobid, node ):
533
534                                self.addJobNode( jobid, node )
535
536        def addJobNode( self, jobid, nodeid ):
537
538                self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )
539
540        def storeJobInfo( self, jobid, jobattrs ):
541
542                self.addJob( jobid, jobattrs )
543
544        def checkStaleJobs( self ):
545
546                q = "SELECT * from jobs WHERE job_status != 'F'"
547
548                r = self.getDatabase( q )
549
550                if len( r ) == 0:
551
552                        return None
553
554                cleanjobs       = [ ]
555                timeoutjobs     = [ ]
556
557                jobtimeout_sec  = JOB_TIMEOUT * (60 * 60)
558                cur_time        = time.time()
559
560                for row in r:
561
562                        job_id                  = row[0]
563                        job_requested_time      = row[4]
564                        job_status              = row[7]
565                        job_start_timestamp     = row[8]
566
567                        if job_status == 'Q' or not job_start_timestamp:
568
569                                cleanjobs.append( job_id )
570
571                        else:
572
573                                start_timestamp = int( job_start_timestamp )
574
575                                if ( cur_time - start_timestamp ) > jobtimeout_sec:
576
577                                        if job_requested_time:
578
579                                                rtime_epoch     = reqtime2epoch( job_requested_time )
580                                        else:
581                                                rtime_epoch     = None
582                                       
583                                        timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )
584
585                debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )
586
587                for j in cleanjobs:
588
589                        q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
590                        self.setDatabase( q )
591
592                debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )
593
594                for j in timeoutjobs:
595
596                        ( i, s, r )             = j
597
598                        if r:
599                                new_end_timestamp       = int( s ) + r
600
601                        q = "UPDATE jobs SET job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
602                        self.setDatabase( q )
603
604class RRDMutator:
605        """A class for performing RRD mutations"""
606
607        binary = None
608
609
610        def __init__( self, binary=None ):
611                """Set alternate binary if supplied"""
612
613                if binary:
614                        self.binary = binary
615
616        def create( self, filename, args ):
617                """Create a new rrd with args"""
618
619                global MODRRDTOOL
620
621                if MODRRDTOOL:
622                        return self.perform( 'create', filename, args )
623                else:
624                        return self.perform( 'create', '"' + filename + '"', args )
625
626        def update( self, filename, args ):
627                """Update a rrd with args"""
628
629                global MODRRDTOOL
630
631                if MODRRDTOOL:
632                        return self.perform( 'update', filename, args )
633                else:
634                        return self.perform( 'update', '"' + filename + '"', args )
635
636        def grabLastUpdate( self, filename ):
637                """Determine the last update time of filename rrd"""
638
639                global MODRRDTOOL
640
641                last_update = 0
642
643                if MODRRDTOOL:
644
645                        debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )
646
647                        rrd_header      = { }
648
649                        try:
650                                rrd_header      = rrdtool.info( filename )
651                        except rrdtool.error, msg:
652                                debug_msg( 8, str( msg ) )
653
654                        if rrd_header.has_key( 'last_update' ):
655                                return last_update
656                        else:
657                                return 0
658
659                else:
660                        debug_msg( 8, self.binary + ' info ' + filename )
661
662                        my_pipe         = os.popen( self.binary + ' info "' + filename + '"' )
663
664                        for line in my_pipe.readlines():
665
666                                if line.find( 'last_update') != -1:
667
668                                        last_update = line.split( ' = ' )[1]
669
670                        if my_pipe:
671
672                                my_pipe.close()
673
674                        if last_update:
675                                return last_update
676                        else:
677                                return 0
678
679
680        def perform( self, action, filename, args ):
681                """Perform action on rrd filename with args"""
682
683                global MODRRDTOOL
684
685                arg_string = None
686
687                if type( args ) is not ListType:
688                        debug_msg( 8, 'Arguments needs to be of type List' )
689                        return 1
690
691                for arg in args:
692
693                        if not arg_string:
694
695                                arg_string = arg
696                        else:
697                                arg_string = arg_string + ' ' + arg
698
699                if MODRRDTOOL:
700
701                        debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )
702
703                        try:
704                                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )
705
706                                if action == 'create':
707
708                                        rrdtool.create( str( filename ), *args )
709
710                                elif action == 'update':
711
712                                        rrdtool.update( str( filename ), *args )
713
714                        except rrdtool.error, msg:
715
716                                error_msg = str( msg )
717                                debug_msg( 8, error_msg )
718                                return 1
719
720                else:
721
722                        debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string  )
723
724                        cmd     = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
725                        lines   = cmd.readlines()
726
727                        cmd.close()
728
729                        for line in lines:
730
731                                if line.find( 'ERROR' ) != -1:
732
733                                        error_msg = string.join( line.split( ' ' )[1:] )
734                                        debug_msg( 8, error_msg )
735                                        return 1
736
737                return 0
738
739class XMLProcessor:
740        """Skeleton class for XML processor's"""
741
742        def run( self ):
743                """Do main processing of XML here"""
744
745                pass
746
747class TorqueXMLProcessor( XMLProcessor ):
748        """Main class for processing XML and acting with it"""
749
750        def __init__( self, XMLSource, DataStore ):
751                """Setup initial XML connection and handlers"""
752
753                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
754                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
755                self.myXMLSource        = XMLSource
756                self.myXMLHandler       = TorqueXMLHandler( DataStore )
757                self.myXMLError         = XMLErrorHandler()
758
759                self.config             = GangliaConfigParser( GMETAD_CONF )
760
761        def run( self ):
762                """Main XML processing"""
763
764                debug_msg( 1, 'torque_xml_thread(): started.' )
765
766                while( 1 ):
767
768                        #self.myXMLSource = self.mXMLGatherer.getFileObject()
769                        debug_msg( 1, 'torque_xml_thread(): Parsing..' )
770
771                        my_data = self.myXMLSource.getData()
772
773                        try:
774                                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
775                        except socket.error, msg:
776                                debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
777                               
778                        debug_msg( 1, 'torque_xml_thread(): Done parsing.' )
779                        debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
780                        time.sleep( self.config.getLowestInterval() )
781
782class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
783        """Parse Torque's jobinfo XML from our plugin"""
784
785        jobAttrs = { }
786
787        def __init__( self, datastore ):
788
789                #self.ds                        = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
790                self.ds                 = datastore
791                self.jobs_processed     = [ ]
792                self.jobs_to_store      = [ ]
793
794        def startDocument( self ):
795
796                self.heartbeat  = 0
797                self.elementct  = 0
798
799        def startElement( self, name, attrs ):
800                """
801                This XML will be all gmetric XML
802                so there will be no specific start/end element
803                just one XML statement with all info
804                """
805               
806                jobinfo = { }
807
808                self.elementct  += 1
809
810                if name == 'CLUSTER':
811
812                        self.clustername = str( attrs.get( 'NAME', "" ) )
813
814                elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:
815
816                        metricname = str( attrs.get( 'NAME', "" ) )
817
818                        if metricname == 'MONARCH-HEARTBEAT':
819                                self.heartbeat = str( attrs.get( 'VAL', "" ) )
820
821                        elif metricname.find( 'MONARCH-JOB' ) != -1:
822
823                                job_id  = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
824                                val     = str( attrs.get( 'VAL', "" ) )
825
826                                if not job_id in self.jobs_processed:
827
828                                        self.jobs_processed.append( job_id )
829
830                                check_change = 0
831
832                                if self.jobAttrs.has_key( job_id ):
833
834                                        check_change = 1
835
836                                valinfo = val.split( ' ' )
837
838                                for myval in valinfo:
839
840                                        if len( myval.split( '=' ) ) > 1:
841
842                                                valname = myval.split( '=' )[0]
843                                                value   = myval.split( '=' )[1]
844
845                                                if valname == 'nodes':
846                                                        value = value.split( ';' )
847
848                                                jobinfo[ valname ] = value
849
850                                if check_change:
851                                        if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
852                                                self.jobAttrs[ job_id ]['stop_timestamp']       = ''
853                                                self.jobAttrs[ job_id ]                         = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
854                                                if not job_id in self.jobs_to_store:
855                                                        self.jobs_to_store.append( job_id )
856
857                                                debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
858                                else:
859                                        self.jobAttrs[ job_id ] = jobinfo
860
861                                        if not job_id in self.jobs_to_store:
862                                                self.jobs_to_store.append( job_id )
863
864                                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
865                                       
866        def endDocument( self ):
867                """When all metrics have gone, check if any jobs have finished"""
868
869                debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )
870
871                if self.heartbeat:
872                        for jobid, jobinfo in self.jobAttrs.items():
873
874                                # This is an old job, not in current jobinfo list anymore
875                                # it must have finished, since we _did_ get a new heartbeat
876                                #
877                                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )
878
879                                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):
880
881                                        if not jobid in self.jobs_processed:
882                                                self.jobs_processed.append( jobid )
883
884                                        self.jobAttrs[ jobid ]['status'] = 'F'
885                                        self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )
886
887                                        if not jobid in self.jobs_to_store:
888                                                self.jobs_to_store.append( jobid )
889
890                        debug_msg( 1, 'torque_xml_thread(): Storing..' )
891
892                        for jobid in self.jobs_to_store:
893                                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:
894
895                                        self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )
896
897                                        if self.jobAttrs[ jobid ]['status'] == 'F':
898                                                del self.jobAttrs[ jobid ]
899
900                        debug_msg( 1, 'torque_xml_thread(): Done storing.' )
901
902                        self.jobs_processed     = [ ]
903                        self.jobs_to_store      = [ ]
904
905        def setJobAttrs( self, old, new ):
906                """
907                Set new job attributes in old, but not lose existing fields
908                if old attributes doesn't have those
909                """
910
911                for valname, value in new.items():
912                        old[ valname ] = value
913
914                return old
915               
916
917        def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
918                """
919                Check if jobinfo has changed from jobattrs[jobid]
920                if it's report time is bigger than previous one
921                and it is report time is recent (equal to heartbeat)
922                """
923
924                ignore_changes = [ 'reported' ]
925
926                if jobattrs.has_key( jobid ):
927
928                        for valname, value in jobinfo.items():
929
930                                if valname not in ignore_changes:
931
932                                        if jobattrs[ jobid ].has_key( valname ):
933
934                                                if value != jobattrs[ jobid ][ valname ]:
935
936                                                        if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
937                                                                return True
938
939                                        else:
940                                                return True
941
942                return False
943
944class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
945        """Parse Ganglia's XML"""
946
947        def __init__( self, config, datastore ):
948                """Setup initial variables and gather info on existing rrd archive"""
949
950                self.config     = config
951                self.clusters   = { }
952                self.ds         = datastore
953
954                debug_msg( 1, 'Checking database..' )
955
956                global DEBUG_LEVEL
957
958                if DEBUG_LEVEL <= 2:
959                        self.ds.checkStaleJobs()
960
961                debug_msg( 1, 'Check done.' )
962                debug_msg( 1, 'Checking rrd archive..' )
963                self.gatherClusters()
964                debug_msg( 1, 'Check done.' )
965
966        def gatherClusters( self ):
967                """Find all existing clusters in archive dir"""
968
969                archive_dir     = check_dir(ARCHIVE_PATH)
970
971                hosts           = [ ]
972
973                if os.path.exists( archive_dir ):
974
975                        dirlist = os.listdir( archive_dir )
976
977                        for cfgcluster in ARCHIVE_DATASOURCES:
978
979                                if cfgcluster not in dirlist:
980
981                                        # Autocreate a directory for this cluster
982                                        # assume it is new
983                                        #
984                                        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )
985
986                                        os.mkdir( cluster_dir )
987
988                                        dirlist.append( cfgcluster )
989
990                        for item in dirlist:
991
992                                clustername = item
993
994                                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:
995
996                                        self.clusters[ clustername ] = RRDHandler( self.config, clustername )
997
998                debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )
999
1000        def startElement( self, name, attrs ):
1001                """Memorize appropriate data from xml start tags"""
1002
1003                if name == 'GANGLIA_XML':
1004
1005                        self.XMLSource          = str( attrs.get( 'SOURCE', "" ) )
1006                        self.gangliaVersion     = str( attrs.get( 'VERSION', "" ) )
1007
1008                        debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )
1009
1010                elif name == 'GRID':
1011
1012                        self.gridName   = str( attrs.get( 'NAME', "" ) )
1013                        self.time       = str( attrs.get( 'LOCALTIME', "" ) )
1014
1015                        debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )
1016
1017                elif name == 'CLUSTER':
1018
1019                        self.clusterName        = str( attrs.get( 'NAME', "" ) )
1020                        self.time               = str( attrs.get( 'LOCALTIME', "" ) )
1021
1022                        if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:
1023
1024                                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )
1025
1026                                debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )
1027
1028                elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:     
1029
1030                        self.hostName           = str( attrs.get( 'NAME', "" ) )
1031                        self.hostIp             = str( attrs.get( 'IP', "" ) )
1032                        self.hostReported       = str( attrs.get( 'REPORTED', "" ) )
1033
1034                        debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )
1035
1036                elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:
1037
1038                        type = str( attrs.get( 'TYPE', "" ) )
1039                       
1040                        exclude_metric = False
1041                       
1042                        for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:
1043
1044                                orig_name = str( attrs.get( 'NAME', "" ) )
1045
1046                                if string.lower( orig_name ) == string.lower( ex_metricstr ):
1047                               
1048                                        exclude_metric = True
1049
1050                                elif re.match( ex_metricstr, orig_name ):
1051
1052                                        exclude_metric = True
1053
1054                        if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:
1055
1056                                myMetric                = { }
1057                                myMetric['name']        = str( attrs.get( 'NAME', "" ) )
1058                                myMetric['val']         = str( attrs.get( 'VAL', "" ) )
1059                                myMetric['time']        = self.hostReported
1060
1061                                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )
1062
1063                                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )
1064
1065        def storeMetrics( self ):
1066                """Store metrics of each cluster rrd handler"""
1067
1068                for clustername, rrdh in self.clusters.items():
1069
1070                        ret = rrdh.storeMetrics()
1071
1072                        if ret:
1073                                debug_msg( 9, 'An error occured while storing metrics for cluster %s' %clustername )
1074                                return 1
1075
1076                return 0
1077
1078class XMLErrorHandler( xml.sax.handler.ErrorHandler ):
1079
1080        def error( self, exception ):
1081                """Recoverable error"""
1082
1083                debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )
1084
1085        def fatalError( self, exception ):
1086                """Non-recoverable error"""
1087
1088                exception_str = str( exception )
1089
1090                # Ignore 'no element found' errors
1091                if exception_str.find( 'no element found' ) != -1:
1092                        debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
1093                        return 0
1094
1095                debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
1096                sys.exit( 1 )
1097
1098        def warning( self, exception ):
1099                """Warning"""
1100
1101                debug_msg( 0, 'Warning ' + str( exception ) )
1102
1103class XMLGatherer:
1104        """Setup a connection and file object to Ganglia's XML"""
1105
1106        s               = None
1107        fd              = None
1108        data            = None
1109        slot            = None
1110
1111        # Time since the last update
1112        #
1113        LAST_UPDATE     = 0
1114
1115        # Minimum interval between updates
1116        #
1117        MIN_UPDATE_INT  = 10
1118
1119        # Is a update occuring now
1120        #
1121        update_now      = False
1122
1123        def __init__( self, host, port ):
1124                """Store host and port for connection"""
1125
1126                self.host       = host
1127                self.port       = port
1128                self.slot       = threading.Lock()
1129
1130                self.retrieveData()
1131
1132        def retrieveData( self ):
1133                """Setup connection to XML source"""
1134
1135                self.update_now = True
1136
1137                self.slot.acquire()
1138
1139                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
1140
1141                        af, socktype, proto, canonname, sa = res
1142
1143                        try:
1144
1145                                self.s = socket.socket( af, socktype, proto )
1146
1147                        except socket.error, msg:
1148
1149                                self.s = None
1150                                continue
1151
1152                        try:
1153
1154                                self.s.connect( sa )
1155
1156                        except socket.error, msg:
1157
1158                                self.disconnect()
1159                                continue
1160
1161                        break
1162
1163                if self.s is None:
1164
1165                        debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
1166                        self.update_now = False
1167                        sys.exit( 1 )
1168
1169                else:
1170                        #self.s.send( '\n' )
1171
1172                        my_fp                   = self.s.makefile( 'r' )
1173                        my_data                 = my_fp.readlines()
1174                        my_data                 = string.join( my_data, '' )
1175
1176                        self.data               = my_data
1177
1178                        self.LAST_UPDATE        = time.time()
1179
1180                self.slot.release()
1181
1182                self.update_now = False
1183
1184        def disconnect( self ):
1185                """Close socket"""
1186
1187                if self.s:
1188                        #self.s.shutdown( 2 )
1189                        self.s.close()
1190                        self.s = None
1191
1192        def __del__( self ):
1193                """Kill the socket before we leave"""
1194
1195                self.disconnect()
1196
1197        def reGetData( self ):
1198                """Reconnect"""
1199
1200                while self.update_now:
1201
1202                        # Must be another update in progress:
1203                        # Wait until the update is complete
1204                        #
1205                        time.sleep( 1 )
1206
1207                if self.s:
1208                        self.disconnect()
1209
1210                self.retrieveData()
1211
1212        def getData( self ):
1213
1214                """Return the XML data"""
1215
1216                # If more than MIN_UPDATE_INT seconds passed since last data update
1217                # update the XML first before returning it
1218                #
1219
1220                cur_time        = time.time()
1221
1222                if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:
1223
1224                        self.reGetData()
1225
1226                while self.update_now:
1227
1228                        # Must be another update in progress:
1229                        # Wait until the update is complete
1230                        #
1231                        time.sleep( 1 )
1232                       
1233                return self.data
1234
1235        def makeFileDescriptor( self ):
1236                """Make file descriptor that points to our socket connection"""
1237
1238                self.reconnect()
1239
1240                if self.s:
1241                        self.fd = self.s.makefile( 'r' )
1242
1243        def getFileObject( self ):
1244                """Connect, and return a file object"""
1245
1246                self.makeFileDescriptor()
1247
1248                if self.fd:
1249                        return self.fd
1250
1251class GangliaXMLProcessor( XMLProcessor ):
1252        """Main class for processing XML and acting with it"""
1253
1254        def __init__( self, XMLSource, DataStore ):
1255                """Setup initial XML connection and handlers"""
1256
1257                self.config             = GangliaConfigParser( GMETAD_CONF )
1258
1259                #self.myXMLGatherer     = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1260                #self.myXMLSource       = self.myXMLGatherer.getFileObject()
1261                self.myXMLSource        = XMLSource
1262                self.ds                 = DataStore
1263                self.myXMLHandler       = GangliaXMLHandler( self.config, self.ds )
1264                self.myXMLError         = XMLErrorHandler()
1265
1266        def run( self ):
1267                """Main XML processing; start a xml and storethread"""
1268
1269                xml_thread = threading.Thread( None, self.processXML, 'xmlthread' )
1270                store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )
1271
1272                while( 1 ):
1273
1274                        if not xml_thread.isAlive():
1275                                # Gather XML at the same interval as gmetad
1276
1277                                # threaded call to: self.processXML()
1278                                #
1279                                try:
1280                                        xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
1281                                        xml_thread.start()
1282                                except thread.error, msg:
1283                                        debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
1284                                        #return 1
1285
1286                        if not store_thread.isAlive():
1287                                # Store metrics every .. sec
1288
1289                                # threaded call to: self.storeMetrics()
1290                                #
1291                                try:
1292                                        store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
1293                                        store_thread.start()
1294                                except thread.error, msg:
1295                                        debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
1296                                        #return 1
1297               
1298                        # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
1299                        time.sleep( 1 ) 
1300
1301        def storeMetrics( self ):
1302                """Store metrics retained in memory to disk"""
1303
1304                global DEBUG_LEVEL
1305
1306                # Store metrics somewhere between every 360 and 640 seconds
1307                #
1308                if DEBUG_LEVEL > 2:
1309                        #STORE_INTERVAL = 60
1310                        STORE_INTERVAL = random.randint( 360, 640 )
1311                else:
1312                        STORE_INTERVAL = random.randint( 360, 640 )
1313
1314                try:
1315                        store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
1316                        store_metric_thread.start()
1317                except thread.error, msg:
1318                        debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
1319                        return 1
1320
1321                debug_msg( 1, 'ganglia_store_thread(): started.' )
1322
1323                debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
1324                time.sleep( STORE_INTERVAL )
1325                debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )
1326
1327                if store_metric_thread.isAlive():
1328
1329                        debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
1330                        store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
1331                        debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )
1332
1333                debug_msg( 1, 'ganglia_store_thread(): finished.' )
1334
1335                return 0
1336
1337        def storeThread( self ):
1338                """Actual metric storing thread"""
1339
1340                debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
1341                debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
1342                ret = self.myXMLHandler.storeMetrics()
1343                if ret > 0:
1344                        debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
1345                debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
1346                debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )
1347               
1348                return 0
1349
1350        def processXML( self ):
1351                """Process XML"""
1352
1353                try:
1354                        parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
1355                        parsethread.start()
1356                except thread.error, msg:
1357                        debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
1358                        return 1
1359
1360                debug_msg( 1, 'ganglia_xml_thread(): started.' )
1361
1362                debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
1363                time.sleep( float( self.config.getLowestInterval() ) ) 
1364                debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )
1365
1366                if parsethread.isAlive():
1367
1368                        debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
1369                        parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
1370                        debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )
1371
1372                debug_msg( 1, 'ganglia_xml_thread(): finished.' )
1373
1374                return 0
1375
1376        def parseThread( self ):
1377                """Actual parsing thread"""
1378
1379                debug_msg( 1, 'ganglia_parse_thread(): started.' )
1380                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
1381                #self.myXMLSource = self.myXMLGatherer.getFileObject()
1382               
1383                my_data = self.myXMLSource.getData()
1384
1385                #print my_data
1386
1387                try:
1388                        xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
1389                except socket.error, msg:
1390                        debug_msg( 0, 'ERROR: Socket error in connect to datasource!: %s' %msg )
1391
1392                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )
1393                debug_msg( 1, 'ganglia_parse_thread(): finished.' )
1394
1395                return 0
1396
1397class GangliaConfigParser:
1398
1399        sources = [ ]
1400
1401        def __init__( self, config ):
1402                """Parse some stuff from our gmetad's config, such as polling interval"""
1403
1404                self.config = config
1405                self.parseValues()
1406
1407        def parseValues( self ):
1408                """Parse certain values from gmetad.conf"""
1409
1410                readcfg = open( self.config, 'r' )
1411
1412                for line in readcfg.readlines():
1413
1414                        if line.count( '"' ) > 1:
1415
1416                                if line.find( 'data_source' ) != -1 and line[0] != '#':
1417
1418                                        source          = { }
1419                                        source['name']  = line.split( '"' )[1]
1420                                        source_words    = line.split( '"' )[2].split( ' ' )
1421
1422                                        for word in source_words:
1423
1424                                                valid_interval = 1
1425
1426                                                for letter in word:
1427
1428                                                        if letter not in string.digits:
1429
1430                                                                valid_interval = 0
1431
1432                                                if valid_interval and len(word) > 0:
1433
1434                                                        source['interval'] = word
1435                                                        debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )
1436       
1437                                        # No interval found, use Ganglia's default     
1438                                        if not source.has_key( 'interval' ):
1439                                                source['interval'] = 15
1440                                                debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )
1441
1442                                        self.sources.append( source )
1443
1444        def getInterval( self, source_name ):
1445                """Return interval for source_name"""
1446
1447                for source in self.sources:
1448
1449                        if source['name'] == source_name:
1450
1451                                return source['interval']
1452
1453                return None
1454
1455        def getLowestInterval( self ):
1456                """Return the lowest interval of all clusters"""
1457
1458                lowest_interval = 0
1459
1460                for source in self.sources:
1461
1462                        if not lowest_interval or source['interval'] <= lowest_interval:
1463
1464                                lowest_interval = source['interval']
1465
1466                # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
1467                if lowest_interval:
1468                        return lowest_interval
1469                else:
1470                        return 15
1471
1472class RRDHandler:
1473        """Class for handling RRD activity"""
1474
1475        myMetrics = { }
1476        lastStored = { }
1477        timeserials = { }
1478        slot = None
1479
1480        def __init__( self, config, cluster ):
1481                """Setup initial variables"""
1482
1483                self.block      = 0
1484                self.cluster    = cluster
1485                self.config     = config
1486                self.slot       = threading.Lock()
1487                self.rrdm       = RRDMutator( RRDTOOL )
1488
1489                global DEBUG_LEVEL
1490
1491                if DEBUG_LEVEL <= 2:
1492                        self.gatherLastUpdates()
1493
1494        def gatherLastUpdates( self ):
1495                """Populate the lastStored list, containing timestamps of all last updates"""
1496
1497                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
1498
1499                hosts = [ ]
1500
1501                if os.path.exists( cluster_dir ):
1502
1503                        dirlist = os.listdir( cluster_dir )
1504
1505                        for dir in dirlist:
1506
1507                                hosts.append( dir )
1508
1509                for host in hosts:
1510
1511                        host_dir        = cluster_dir + '/' + host
1512                        dirlist         = os.listdir( host_dir )
1513
1514                        for dir in dirlist:
1515
1516                                if not self.timeserials.has_key( host ):
1517
1518                                        self.timeserials[ host ] = [ ]
1519
1520                                self.timeserials[ host ].append( dir )
1521
1522                        last_serial = self.getLastRrdTimeSerial( host )
1523
1524                        if last_serial:
1525
1526                                metric_dir = cluster_dir + '/' + host + '/' + last_serial
1527
1528                                if os.path.exists( metric_dir ):
1529
1530                                        dirlist = os.listdir( metric_dir )
1531
1532                                        for file in dirlist:
1533
1534                                                metricname = file.split( '.rrd' )[0]
1535
1536                                                if not self.lastStored.has_key( host ):
1537
1538                                                        self.lastStored[ host ] = { }
1539
1540                                                self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )
1541
1542        def getClusterName( self ):
1543                """Return clustername"""
1544
1545                return self.cluster
1546
1547        def memMetric( self, host, metric ):
1548                """Store metric from host in memory"""
1549
1550                # <ATOMIC>
1551                #
1552                self.slot.acquire()
1553               
1554                if self.myMetrics.has_key( host ):
1555
1556                        if self.myMetrics[ host ].has_key( metric['name'] ):
1557
1558                                for mymetric in self.myMetrics[ host ][ metric['name'] ]:
1559
1560                                        if mymetric['time'] == metric['time']:
1561
1562                                                # Allready have this metric, abort
1563                                                self.slot.release()
1564                                                return 1
1565                        else:
1566                                self.myMetrics[ host ][ metric['name'] ] = [ ]
1567                else:
1568                        self.myMetrics[ host ]                          = { }
1569                        self.myMetrics[ host ][ metric['name'] ]        = [ ]
1570
1571                # Push new metric onto stack
1572                # atomic code; only 1 thread at a time may access the stack
1573
1574                self.myMetrics[ host ][ metric['name'] ].append( metric )
1575
1576                self.slot.release()
1577                #
1578                # </ATOMIC>
1579
1580        def makeUpdateList( self, host, metriclist ):
1581                """
1582                Make a list of update values for rrdupdate
1583                but only those that we didn't store before
1584                """
1585
1586                update_list     = [ ]
1587                metric          = None
1588
1589                while len( metriclist ) > 0:
1590
1591                        metric = metriclist.pop( 0 )
1592
1593                        if self.checkStoreMetric( host, metric ):
1594
1595                                u_val   = str( metric['time'] ) + ':' + str( metric['val'] )
1596                                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
1597                                update_list.append( u_val )
1598
1599                return update_list
1600
1601        def checkStoreMetric( self, host, metric ):
1602                """Check if supplied metric if newer than last one stored"""
1603
1604                if self.lastStored.has_key( host ):
1605
1606                        if self.lastStored[ host ].has_key( metric['name'] ):
1607
1608                                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:
1609
1610                                        # This is old
1611                                        return 0
1612
1613                return 1
1614
1615        def memLastUpdate( self, host, metricname, metriclist ):
1616                """
1617                Memorize the time of the latest metric from metriclist
1618                but only if it wasn't allready memorized
1619                """
1620
1621                if not self.lastStored.has_key( host ):
1622                        self.lastStored[ host ] = { }
1623
1624                last_update_time = 0
1625
1626                for metric in metriclist:
1627
1628                        if metric['name'] == metricname:
1629
1630                                if metric['time'] > last_update_time:
1631
1632                                        last_update_time = metric['time']
1633
1634                if self.lastStored[ host ].has_key( metricname ):
1635                       
1636                        if last_update_time <= self.lastStored[ host ][ metricname ]:
1637                                return 1
1638
1639                self.lastStored[ host ][ metricname ] = last_update_time
1640
1641        def storeMetrics( self ):
1642                """
1643                Store all metrics from memory to disk
1644                and do it to the RRD's in appropriate timeperiod directory
1645                """
1646
1647                debug_msg( 5, "Entering storeMetrics()")
1648
1649                count_values    = 0
1650                count_metrics   = 0
1651                count_bits      = 0
1652
1653                for hostname, mymetrics in self.myMetrics.items():     
1654
1655                        for metricname, mymetric in mymetrics.items():
1656
1657                                count_metrics += 1
1658
1659                                for dmetric in mymetric:
1660
1661                                        count_values += 1
1662
1663                                        count_bits      += len( dmetric['time'] )
1664                                        count_bits      += len( dmetric['val'] )
1665
1666                count_bytes     = count_bits / 8
1667
1668                debug_msg( 5, "size of cluster '" + self.cluster + "': " + 
1669                        str( len( self.myMetrics.keys() ) ) + " hosts " + 
1670                        str( count_metrics ) + " metrics " + str( count_values ) + " values " +
1671                        str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )
1672
1673                for hostname, mymetrics in self.myMetrics.items():     
1674
1675                        for metricname, mymetric in mymetrics.items():
1676
1677                                metrics_to_store = [ ]
1678
1679                                # Pop metrics from stack for storing until none is left
1680                                # atomic code: only 1 thread at a time may access myMetrics
1681
1682                                # <ATOMIC>
1683                                #
1684                                self.slot.acquire() 
1685
1686                                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1687
1688                                        if len( self.myMetrics[ hostname ][ metricname ] ) > 0:
1689
1690                                                try:
1691                                                        metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
1692                                                except IndexError, msg:
1693
1694                                                        # Somehow sometimes myMetrics[ hostname ][ metricname ]
1695                                                        # is still len 0 when the statement is executed.
1696                                                        # Just ignore indexerror's..
1697                                                        pass
1698
1699                                self.slot.release()
1700                                #
1701                                # </ATOMIC>
1702
1703                                # Create a mapping table, each metric to the period where it should be stored
1704                                #
1705                                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )
1706
1707                                update_rets = [ ]
1708
1709                                for period, pmetric in metric_serial_table.items():
1710
1711                                        create_ret = self.createCheck( hostname, metricname, period )   
1712
1713                                        update_ret = self.update( hostname, metricname, period, pmetric )
1714
1715                                        if update_ret == 0:
1716
1717                                                debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
1718                                        else:
1719                                                debug_msg( 9, 'metric update failed' )
1720
1721                                        update_rets.append( create_ret )
1722                                        update_rets.append( update_ret )
1723
1724                                # Lets ignore errors here for now, we need to make sure last update time
1725                                # is correct!
1726                                #
1727                                #if not (1) in update_rets:
1728
1729                                self.memLastUpdate( hostname, metricname, metrics_to_store )
1730
1731                debug_msg( 5, "Leaving storeMetrics()")
1732
1733        def makeTimeSerial( self ):
1734                """Generate a time serial. Seconds since epoch"""
1735
1736                # Seconds since epoch
1737                mytime = int( time.time() )
1738
1739                return mytime
1740
1741        def makeRrdPath( self, host, metricname, timeserial ):
1742                """Make a RRD location/path and filename"""
1743
1744                rrd_dir         = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
1745                rrd_file        = '%s/%s.rrd'   %( rrd_dir, metricname )
1746
1747                return rrd_dir, rrd_file
1748
1749        def getLastRrdTimeSerial( self, host ):
1750                """Find the last timeserial (directory) for this host"""
1751
1752                newest_timeserial = 0
1753
1754                for dir in self.timeserials[ host ]:
1755
1756                        valid_dir = 1
1757
1758                        for letter in dir:
1759                                if letter not in string.digits:
1760                                        valid_dir = 0
1761
1762                        if valid_dir:
1763                                timeserial = dir
1764                                if timeserial > newest_timeserial:
1765                                        newest_timeserial = timeserial
1766
1767                if newest_timeserial:
1768                        return newest_timeserial
1769                else:
1770                        return 0
1771
1772        def determinePeriod( self, host, check_serial ):
1773                """Determine to which period (directory) this time(serial) belongs"""
1774
1775                period_serial = 0
1776
1777                if self.timeserials.has_key( host ):
1778
1779                        for serial in self.timeserials[ host ]:
1780
1781                                if check_serial >= serial and period_serial < serial:
1782
1783                                        period_serial = serial
1784
1785                return period_serial
1786
1787        def determineSerials( self, host, metricname, metriclist ):
1788                """
1789                Determine the correct serial and corresponding rrd to store
1790                for a list of metrics
1791                """
1792
1793                metric_serial_table = { }
1794
1795                for metric in metriclist:
1796
1797                        if metric['name'] == metricname:
1798
1799                                period          = self.determinePeriod( host, metric['time'] ) 
1800
1801                                archive_secs    = ARCHIVE_HOURS_PER_RRD * (60 * 60)
1802
1803                                if (int( metric['time'] ) - int( period ) ) > archive_secs:
1804
1805                                        # This one should get it's own new period
1806                                        period = metric['time']
1807
1808                                        if not self.timeserials.has_key( host ):
1809                                                self.timeserials[ host ] = [ ]
1810
1811                                        self.timeserials[ host ].append( period )
1812
1813                                if not metric_serial_table.has_key( period ):
1814
1815                                        metric_serial_table[ period ] = [ ]
1816
1817                                metric_serial_table[ period ].append( metric )
1818
1819                return metric_serial_table
1820
1821        def createCheck( self, host, metricname, timeserial ):
1822                """Check if an rrd allready exists for this metric, create if not"""
1823
1824                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1825               
1826                rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )
1827
1828                if not os.path.exists( rrd_dir ):
1829
1830                        try:
1831                                os.makedirs( rrd_dir )
1832
1833                        except os.OSError, msg:
1834
1835                                if msg.find( 'File exists' ) != -1:
1836
1837                                        # Ignore exists errors
1838                                        pass
1839
1840                                else:
1841
1842                                        print msg
1843                                        return
1844
1845                        debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )
1846
1847                if not os.path.exists( rrd_file ):
1848
1849                        interval        = self.config.getInterval( self.cluster )
1850                        heartbeat       = 8 * int( interval )
1851
1852                        params          = [ ]
1853
1854                        params.append( '--step' )
1855                        params.append( str( interval ) )
1856
1857                        params.append( '--start' )
1858                        params.append( str( int( timeserial ) - 1 ) )
1859
1860                        params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
1861                        params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )
1862
1863                        self.rrdm.create( str(rrd_file), params )
1864
1865                        debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
1866
1867        def update( self, host, metricname, timeserial, metriclist ):
1868                """
1869                Update rrd file for host with metricname
1870                in directory timeserial with metriclist
1871                """
1872
1873                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
1874
1875                rrd_dir, rrd_file       = self.makeRrdPath( host, metricname, timeserial )
1876
1877                update_list             = self.makeUpdateList( host, metriclist )
1878
1879                if len( update_list ) > 0:
1880                        ret = self.rrdm.update( str(rrd_file), update_list )
1881
1882                        if ret:
1883                                return 1
1884               
1885                        debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )
1886
1887                return 0
1888
1889def daemon():
1890        """daemonized threading"""
1891
1892        # Fork the first child
1893        #
1894        pid = os.fork()
1895
1896        if pid > 0:
1897
1898                sys.exit(0)  # end parent
1899
1900        # creates a session and sets the process group ID
1901        #
1902        os.setsid()
1903
1904        # Fork the second child
1905        #
1906        pid = os.fork()
1907
1908        if pid > 0:
1909
1910                sys.exit(0)  # end parent
1911
1912        write_pidfile()
1913
1914        # Go to the root directory and set the umask
1915        #
1916        os.chdir('/')
1917        os.umask(0)
1918
1919        sys.stdin.close()
1920        sys.stdout.close()
1921        sys.stderr.close()
1922
1923        os.open('/dev/null', os.O_RDWR)
1924        os.dup2(0, 1)
1925        os.dup2(0, 2)
1926
1927        run()
1928
1929def run():
1930        """Threading start"""
1931
1932        myXMLSource             = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
1933        myDataStore             = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
1934
1935        myTorqueProcessor       = TorqueXMLProcessor( myXMLSource, myDataStore )
1936        myGangliaProcessor      = GangliaXMLProcessor( myXMLSource, myDataStore )
1937
1938        try:
1939                torque_xml_thread       = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
1940                ganglia_xml_thread      = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )
1941
1942                torque_xml_thread.start()
1943                ganglia_xml_thread.start()
1944               
1945        except thread.error, msg:
1946                debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
1947                syslog.closelog()
1948                sys.exit(1)
1949               
1950        debug_msg( 0, 'main threading started.' )
1951
1952def main():
1953        """Program startup"""
1954
1955        global DAEMONIZE, USE_SYSLOG
1956
1957        if not processArgs( sys.argv[1:] ):
1958                sys.exit( 1 )
1959
1960        if( DAEMONIZE and USE_SYSLOG ):
1961                syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1962
1963        if DAEMONIZE:
1964                daemon()
1965        else:
1966                run()
1967
1968#
1969# Global functions
1970#
1971
1972def check_dir( directory ):
1973        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
1974
1975        if directory[-1] == '/':
1976                directory = directory[:-1]
1977
1978        return directory
1979
1980def reqtime2epoch( rtime ):
1981
1982        (hours, minutes, seconds )      = rtime.split( ':' )
1983
1984        etime   = int(seconds)
1985        etime   = etime + ( int(minutes) * 60 )
1986        etime   = etime + ( int(hours) * 60 * 60 )
1987
1988        return etime
1989
1990def debug_msg( level, msg ):
1991        """Only print msg if correct levels"""
1992
1993        if (not DAEMONIZE and DEBUG_LEVEL >= level):
1994                sys.stderr.write( printTime() + ' - ' + msg + '\n' )
1995       
1996        if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1997                syslog.syslog( msg )
1998
1999def printTime( ):
2000        """Print current time in human readable format"""
2001
2002        return time.strftime("%a %d %b %Y %H:%M:%S")
2003
2004def write_pidfile():
2005
2006        # Write pidfile if PIDFILE exists
2007        if PIDFILE:
2008
2009                pid     = os.getpid()
2010
2011                pidfile = open(PIDFILE, 'w')
2012
2013                pidfile.write( str( pid ) )
2014                pidfile.close()
2015
2016# Ooohh, someone started me! Let's go..
2017if __name__ == '__main__':
2018        main()
Note: See TracBrowser for help on using the repository browser.