Changeset 63


Ignore:
Timestamp:
04/14/05 09:57:20 (17 years ago)
Author:
bastiaans
Message:

daemon/togad.py:

Miscellanious code cleanup and additional pydocs

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/daemon/togad.py

    r62 r63  
    99import os.path
    1010import time
    11 import re
    1211import threading
    13 import mutex
    1412import random
    1513from types import *
     
    3028GMETAD_CONF = '/etc/gmetad.conf'
    3129
    32 # Wether or not to maintain a archive of all ganglia node data
    33 # Note: This will require a significant amount of hd space
    34 #       depending on your cluster size
    35 #
    36 ARCHIVE = 0
    37 
    3830# Where to grab XML data from
    3931# Normally: local gmetad (port 8651)
    4032#
    41 ARCHIVE_SOURCE = "localhost:8651"
     33# Syntax: <hostname>:<port>
     34#
     35ARCHIVE_XMLSOURCE = "localhost:8651"
    4236
    4337# List of data_source names to archive for
    4438#
     39# Syntax: [ "<clustername>", "<clustername>" ]
     40#
    4541ARCHIVE_DATASOURCES = [ "LISA Cluster" ]
    4642
     
    5349ARCHIVE_HOURS_PER_RRD = 12
    5450
    55 # Wether or not to run a seperate Toga jobinfo server
    56 #
    57 TOGA_SERVER = 1
    58 
    59 # On what interfaces to listen
    60 #
    61 TOGA_SERVER_IP = [ '127.0.0.1' ]
    62 
    63 # On what port to listen
    64 #
    65 TOGA_SERVER_PORT = 9048
    66 
    67 # Toga's SQL dbase name to use
    68 #
    69 TOGA_SERVER_SQL_DBASE = "toga"
     51# Toga's SQL dbase to use
     52#
     53# Syntax: <hostname>/<database>
     54#
     55TOGA_SQL_DBASE = "localhost/toga"
    7056
    7157# Wether or not to run as a daemon in background
     
    8167###
    8268# You'll only want to change anything below here unless you
    83 # know what you are doing (i.e. your name is Ramon Bastiaans)
     69# know what you are doing (i.e. your name is Ramon Bastiaans :D )
    8470###
    8571
     
    10086"""
    10187
    102 class TogaServer:
    103 
    104         sockets = [ ]
    105 
    106 
    107 
    108         def __init__( self ):
    109 
    110                 s = None
    111                 for host in TOGA_SERVER_IP:
    112 
    113                         for res in socket.getaddrinfo( host, TOGA_SERVER_PORT, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE ):
    114 
    115                                 af, socktype, proto, canonname, sa = res
    116 
    117                                 try:
    118 
    119                                         s = socket.socket( af, socktype, proto )
    120 
    121                                 except socket.error, msg:
    122 
    123                                         s = None
    124                                         continue
    125 
    126                                 try:
    127                                         s.bind( sa )
    128                                         s.listen( 1 )
    129 
    130                                 except socket.error, msg:
    131 
    132                                         s.close()
    133                                         s = None
    134                                         continue
    135                                         break
    136 
    137                                 if not self.s:
    138 
    139                                         debug_msg( 6, 'Could not open socket' )
    140                                         return None
    141 
    142                                 else:
    143 
    144                                         self.sockets.append( s )
    145 
    146         def run( self ):
    147 
    148                 for s in self.sockets:
    149 
    150                         while( 1 ):
    151 
    152                                 conn, addr = s.accept()
    153                                 pid = os.fork()
    154 
    155                                 if pid == 0:
    156 
    157                                         debug_msg( 6, 'New connection to %s' %addr[0] )
    158 
    159                                         leesme = conn.makefile( 'r' )
    160                                         for line in leesme.readlines():
    161                                                 print line
    162                                        
    163                                         conn.close()
    164                                         conn.shutdown( 2 )
    165 
    166                                         sys.exit( 0 )
    167 
    16888class RRDMutator:
    169         "A class for handling .rrd mutations"
     89        """A class for performing RRD mutations"""
    17090
    17191        binary = '/usr/bin/rrdtool'
    17292
    17393        def __init__( self, binary=None ):
     94                """Set alternate binary if supplied"""
    17495
    17596                if binary:
     
    17798
    17899        def create( self, filename, args ):
     100                """Create a new rrd with args"""
     101
    179102                return self.perform( 'create', '"' + filename + '"', args )
    180103
    181104        def update( self, filename, args ):
     105                """Update a rrd with args"""
     106
    182107                return self.perform( 'update', '"' + filename + '"', args )
    183108
    184109        def grabLastUpdate( self, filename ):
     110                """Determine the last update time of filename rrd"""
    185111
    186112                last_update = 0
     
    200126
    201127        def perform( self, action, filename, args ):
     128                """Perform action on rrd filename with args"""
    202129
    203130                arg_string = None
     
    227154                return 0
    228155
     156class TorqueXMLHandler( ContentHandler ):
     157        """Parse Torque's jobinfo XML from our plugin"""
     158
     159        def __init__( self ):
     160
     161                pass
     162
     163        def startElement( self, name, attrs ):
     164                """
     165                This XML will be all gmetric XML
     166                so there will be no specific start/end element
     167                just one XML statement with all info
     168                """
     169               
     170                heartbeat = 0
     171
     172                if name == 'METRIC':
     173
     174                        metricname = attrss.get( 'NAME', "" )
     175
     176                        if metricname == 'TOGA-HEARTBEAT':
     177
     178                                if not self.heartbeat:
     179                                        self.heartbeat = attrs.get( 'VAL', "" )
     180
     181                        if metricname.find( 'TOGA-JOB' ) != -1:
     182
     183                                job_id = name.split( 'TOGA-JOB-' )[1]
     184                                val = attrs.get( 'VAL', "" )
     185
     186                                valinfo = val.split( ' ' )
     187
     188                                for myval in valinfo:
     189
     190                                        name = valinfo.split( '=' )[0]
     191                                        value = valinfo.split( '=' )[1]
     192
    229193class GangliaXMLHandler( ContentHandler ):
    230         "Parse Ganglia's XML"
     194        """Parse Ganglia's XML"""
    231195
    232196        def __init__( self, config ):
     197                """Setup initial variables and gather info on existing rrd archive"""
     198
    233199                self.config = config
    234200                self.clusters = { }
     
    238204
    239205        def gatherClusters( self ):
     206                """Find all existing clusters in archive dir"""
    240207
    241208                archive_dir = check_dir(ARCHIVE_PATH)
     
    256223
    257224        def startElement( self, name, attrs ):
    258                 "Store appropriate data from xml start tags"
     225                """Memorize appropriate data from xml start tags"""
    259226
    260227                if name == 'GANGLIA_XML':
     
    307274
    308275        def storeMetrics( self ):
     276                """Store metrics of each cluster rrd handler"""
    309277
    310278                for clustername, rrdh in self.clusters.items():
     
    319287
    320288class GangliaXMLGatherer:
    321         "Setup a connection and file object to Ganglia's XML"
     289        """Setup a connection and file object to Ganglia's XML"""
    322290
    323291        s = None
    324292
    325293        def __init__( self, host, port ):
    326                 "Store host and port for connection"
     294                """Store host and port for connection"""
    327295
    328296                self.host = host
     
    331299
    332300        def connect( self ):
    333                 "Setup connection to XML source"
     301                """Setup connection to XML source"""
    334302
    335303                for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):
     
    364332
    365333        def disconnect( self ):
    366                 "Close socket"
     334                """Close socket"""
    367335
    368336                if self.s:
     
    371339
    372340        def __del__( self ):
    373                 "Kill the socket before we leave"
     341                """Kill the socket before we leave"""
    374342
    375343                self.disconnect()
    376344
    377345        def getFileObject( self ):
    378                 "Connect, and return a file object"
     346                """Connect, and return a file object"""
    379347
    380348                if self.s:
     
    388356
    389357class GangliaXMLProcessor:
     358        """Main class for processing XML and acting with it"""
    390359
    391360        def __init__( self ):
    392                 "Setup initial XML connection and handlers"
     361                """Setup initial XML connection and handlers"""
    393362
    394363                self.config = GangliaConfigParser( GMETAD_CONF )
    395364
    396                 self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_SOURCE.split( ':' )[0], ARCHIVE_SOURCE.split( ':' )[1] )
     365                self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    397366                self.myParser = make_parser()   
    398367                self.myHandler = GangliaXMLHandler( self.config )
     
    400369
    401370        def daemon( self ):
    402                 "Run as daemon forever"
    403 
    404                 self.DAEMON = 1
     371                """Run as daemon forever"""
    405372
    406373                # Fork the first child
     
    440407
    441408        def printTime( self ):
    442                 "Print current time in human readable format"
     409                """Print current time in human readable format for logging"""
    443410
    444411                return time.strftime("%a %d %b %Y %H:%M:%S")
    445412
    446413        def run( self ):
    447                 "Main thread"
     414                """Main XML processing; start a xml and storethread"""
    448415
    449416                xmlthread = threading.Thread( None, self.processXML, 'xmlthread' )
     
    472439
    473440        def storeMetrics( self ):
    474                 "Store metrics retained in memory to disk"
     441                """Store metrics retained in memory to disk"""
    475442
    476443                debug_msg( 7, self.printTime() + ' - storethread(): started.' )
    477444
    478                 # Store metrics somewhere between every 60 and 180 seconds
     445                # Store metrics somewhere between every 360 and 640 seconds
    479446                #
    480447                STORE_INTERVAL = random.randint( 360, 640 )
     
    498465
    499466        def storeThread( self ):
     467                """Actual metric storing thread"""
    500468
    501469                debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' )
     
    508476
    509477        def processXML( self ):
    510                 "Process XML"
     478                """Process XML"""
    511479
    512480                debug_msg( 7, self.printTime() + ' - xmlthread(): started.' )
     
    530498
    531499        def parseThread( self ):
     500                """Actual parsing thread"""
    532501
    533502                debug_msg( 7, self.printTime() + ' - parsethread(): started.' )
     
    544513
    545514        def __init__( self, config ):
     515                """Parse some stuff from our gmetad's config, such as polling interval"""
    546516
    547517                self.config = config
     
    549519
    550520        def parseValues( self ):
    551                 "Parse certain values from gmetad.conf"
     521                """Parse certain values from gmetad.conf"""
    552522
    553523                readcfg = open( self.config, 'r' )
     
    586556
    587557        def getInterval( self, source_name ):
     558                """Return interval for source_name"""
    588559
    589560                for source in self.sources:
     
    596567
    597568        def getLowestInterval( self ):
     569                """Return the lowest interval of all clusters"""
    598570
    599571                lowest_interval = 0
     
    612584
    613585class RRDHandler:
     586        """Class for handling RRD activity"""
    614587
    615588        myMetrics = { }
     
    619592
    620593        def __init__( self, config, cluster ):
     594                """Setup initial variables"""
    621595                self.block = 0
    622596                self.cluster = cluster
     
    627601
    628602        def gatherLastUpdates( self ):
    629                 "Populate the lastStored list, containing timestamps of all last updates"
     603                """Populate the lastStored list, containing timestamps of all last updates"""
    630604
    631605                cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )
     
    673647
    674648        def getClusterName( self ):
     649                """Return clustername"""
     650
    675651                return self.cluster
    676652
    677653        def memMetric( self, host, metric ):
     654                """Store metric from host in memory"""
    678655
    679656                if self.myMetrics.has_key( host ):
     
    693670                        self.myMetrics[ host ][ metric['name'] ] = [ ]
    694671
     672                # Push new metric onto stack
     673                # atomic code; only 1 thread at a time may access the stack
     674
    695675                # <ATOMIC>
    696676                #
     
    704684
    705685        def makeUpdateList( self, host, metriclist ):
     686                """
     687                Make a list of update values for rrdupdate
     688                but only those that we didn't store before
     689                """
    706690
    707691                update_list = [ ]
     
    718702
    719703        def checkStoreMetric( self, host, metric ):
     704                """Check if supplied metric if newer than last one stored"""
    720705
    721706                if self.lastStored.has_key( host ):
     
    731716
    732717        def memLastUpdate( self, host, metricname, metriclist ):
     718                """
     719                Memorize the time of the latest metric from metriclist
     720                but only if it wasn't allready memorized
     721                """
    733722
    734723                if not self.lastStored.has_key( host ):
     
    753742
    754743        def storeMetrics( self ):
     744                """
     745                Store all metrics from memory to disk
     746                and do it to the RRD's in appropriate timeperiod directory
     747                """
    755748
    756749                for hostname, mymetrics in self.myMetrics.items():     
     
    759752
    760753                                metrics_to_store = [ ]
     754
     755                                # Pop metrics from stack for storing until none is left
     756                                # atomic code: only 1 thread at a time may access myMetrics
    761757
    762758                                # <ATOMIC>
     
    798794
    799795        def makeTimeSerial( self ):
    800                 "Generate a time serial. Seconds since epoch"
     796                """Generate a time serial. Seconds since epoch"""
    801797
    802798                # Seconds since epoch
     
    806802
    807803        def makeRrdPath( self, host, metricname, timeserial ):
    808                 """
    809                 Make a RRD location/path and filename
    810                 If a metric or timeserial are supplied the complete locations
    811                 will be made, else just the host directory
    812                 """
     804                """Make a RRD location/path and filename"""
    813805
    814806                rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
     
    818810
    819811        def getLastRrdTimeSerial( self, host ):
    820                 """
    821                 Find the last timeserial (directory) for this host
    822                 This is determined once every host
    823                 """
     812                """Find the last timeserial (directory) for this host"""
    824813
    825814                newest_timeserial = 0
     
    844833
    845834        def determinePeriod( self, host, check_serial ):
     835                """Determine to which period (directory) this time(serial) belongs"""
    846836
    847837                period_serial = 0
     
    892882
    893883        def createCheck( self, host, metricname, timeserial ):
    894                 "Check if an .rrd allready exists for this metric, create if not"
     884                """Check if an rrd allready exists for this metric, create if not"""
    895885
    896886                debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
     
    938928
    939929        def update( self, host, metricname, timeserial, metriclist ):
     930                """
     931                Update rrd file for host with metricname
     932                in directory timeserial with metriclist
     933                """
    940934
    941935                debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )
     
    956950
    957951def main():
    958         "Program startup"
    959 
    960         if TOGA_SERVER:
    961        
    962                 myServer = TogaServer()
    963 
    964                 if DAEMONIZE:
    965                         myServer.daemon()
    966                 else:
    967                         myServer.run()
    968 
    969         if ARCHIVE:
    970 
    971                 myProcessor = GangliaXMLProcessor()
    972 
    973                 if DAEMONIZE:
    974                         myProcessor.daemon()
    975                 else:
    976                         myProcessor.run()
     952        """Program startup"""
     953
     954        myProcessor = GangliaXMLProcessor()
     955
     956        if DAEMONIZE:
     957                myProcessor.daemon()
     958        else:
     959                myProcessor.run()
    977960
    978961def check_dir( directory ):
    979         "Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"
     962        """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""
    980963
    981964        if directory[-1] == '/':
     
    985968
    986969def debug_msg( level, msg ):
     970        """Only print msg if it is not below our debug level"""
    987971
    988972        if (DEBUG_LEVEL >= level):
     
    990974
    991975def printTime( ):
    992         "Print current time in human readable format"
     976        """Print current time in human readable format"""
    993977
    994978        return time.strftime("%a %d %b %Y %H:%M:%S")
    995979
    996 # Let's go
     980# Ooohh, someone started me! Let's go..
    997981if __name__ == '__main__':
    998982        main()
Note: See TracChangeset for help on using the changeset viewer.