Changeset 63 for trunk/daemon/togad.py
- Timestamp:
- 04/14/05 09:57:20 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r62 r63 9 9 import os.path 10 10 import time 11 import re12 11 import threading 13 import mutex14 12 import random 15 13 from types import * … … 30 28 GMETAD_CONF = '/etc/gmetad.conf' 31 29 32 # Wether or not to maintain a archive of all ganglia node data33 # Note: This will require a significant amount of hd space34 # depending on your cluster size35 #36 ARCHIVE = 037 38 30 # Where to grab XML data from 39 31 # Normally: local gmetad (port 8651) 40 32 # 41 ARCHIVE_SOURCE = "localhost:8651" 33 # Syntax: <hostname>:<port> 34 # 35 ARCHIVE_XMLSOURCE = "localhost:8651" 42 36 43 37 # List of data_source names to archive for 44 38 # 39 # Syntax: [ "<clustername>", "<clustername>" ] 40 # 45 41 ARCHIVE_DATASOURCES = [ "LISA Cluster" ] 46 42 … … 53 49 ARCHIVE_HOURS_PER_RRD = 12 54 50 55 # Wether or not to run a seperate Toga jobinfo server 56 # 57 TOGA_SERVER = 1 58 59 # On what interfaces to listen 60 # 61 TOGA_SERVER_IP = [ '127.0.0.1' ] 62 63 # On what port to listen 64 # 65 TOGA_SERVER_PORT = 9048 66 67 # Toga's SQL dbase name to use 68 # 69 TOGA_SERVER_SQL_DBASE = "toga" 51 # Toga's SQL dbase to use 52 # 53 # Syntax: <hostname>/<database> 54 # 55 TOGA_SQL_DBASE = "localhost/toga" 70 56 71 57 # Wether or not to run as a daemon in background … … 81 67 ### 82 68 # You'll only want to change anything below here unless you 83 # know what you are doing (i.e. your name is Ramon Bastiaans )69 # know what you are doing (i.e. your name is Ramon Bastiaans :D ) 84 70 ### 85 71 … … 100 86 """ 101 87 102 class TogaServer:103 104 sockets = [ ]105 106 107 108 def __init__( self ):109 110 s = None111 for host in TOGA_SERVER_IP:112 113 for res in socket.getaddrinfo( host, TOGA_SERVER_PORT, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE ):114 115 af, socktype, proto, canonname, sa = res116 117 try:118 119 s = socket.socket( af, socktype, proto )120 121 except socket.error, msg:122 123 s = None124 continue125 126 try:127 s.bind( sa )128 s.listen( 1 )129 130 except socket.error, msg:131 132 s.close()133 s = None134 continue135 break136 137 if not self.s:138 139 debug_msg( 6, 'Could not open socket' )140 return None141 142 else:143 144 self.sockets.append( s )145 146 def run( self ):147 148 for s in self.sockets:149 150 while( 1 ):151 152 conn, addr = s.accept()153 pid = os.fork()154 155 if pid == 0:156 157 debug_msg( 6, 'New connection to %s' %addr[0] )158 159 leesme = conn.makefile( 'r' )160 for line in leesme.readlines():161 print line162 163 conn.close()164 conn.shutdown( 2 )165 166 sys.exit( 0 )167 168 88 class RRDMutator: 169 " A class for handling .rrd mutations"89 """A class for performing RRD mutations""" 170 90 171 91 binary = '/usr/bin/rrdtool' 172 92 173 93 def __init__( self, binary=None ): 94 """Set alternate binary if supplied""" 174 95 175 96 if binary: … … 177 98 178 99 def create( self, filename, args ): 100 """Create a new rrd with args""" 101 179 102 return self.perform( 'create', '"' + filename + '"', args ) 180 103 181 104 def update( self, filename, args ): 105 """Update a rrd with args""" 106 182 107 return self.perform( 'update', '"' + filename + '"', args ) 183 108 184 109 def grabLastUpdate( self, filename ): 110 """Determine the last update time of filename rrd""" 185 111 186 112 last_update = 0 … … 200 126 201 127 def perform( self, action, filename, args ): 128 """Perform action on rrd filename with args""" 202 129 203 130 arg_string = None … … 227 154 return 0 228 155 156 class TorqueXMLHandler( ContentHandler ): 157 """Parse Torque's jobinfo XML from our plugin""" 158 159 def __init__( self ): 160 161 pass 162 163 def startElement( self, name, attrs ): 164 """ 165 This XML will be all gmetric XML 166 so there will be no specific start/end element 167 just one XML statement with all info 168 """ 169 170 heartbeat = 0 171 172 if name == 'METRIC': 173 174 metricname = attrss.get( 'NAME', "" ) 175 176 if metricname == 'TOGA-HEARTBEAT': 177 178 if not self.heartbeat: 179 self.heartbeat = attrs.get( 'VAL', "" ) 180 181 if metricname.find( 'TOGA-JOB' ) != -1: 182 183 job_id = name.split( 'TOGA-JOB-' )[1] 184 val = attrs.get( 'VAL', "" ) 185 186 valinfo = val.split( ' ' ) 187 188 for myval in valinfo: 189 190 name = valinfo.split( '=' )[0] 191 value = valinfo.split( '=' )[1] 192 229 193 class GangliaXMLHandler( ContentHandler ): 230 " Parse Ganglia's XML"194 """Parse Ganglia's XML""" 231 195 232 196 def __init__( self, config ): 197 """Setup initial variables and gather info on existing rrd archive""" 198 233 199 self.config = config 234 200 self.clusters = { } … … 238 204 239 205 def gatherClusters( self ): 206 """Find all existing clusters in archive dir""" 240 207 241 208 archive_dir = check_dir(ARCHIVE_PATH) … … 256 223 257 224 def startElement( self, name, attrs ): 258 " Store appropriate data from xml start tags"225 """Memorize appropriate data from xml start tags""" 259 226 260 227 if name == 'GANGLIA_XML': … … 307 274 308 275 def storeMetrics( self ): 276 """Store metrics of each cluster rrd handler""" 309 277 310 278 for clustername, rrdh in self.clusters.items(): … … 319 287 320 288 class GangliaXMLGatherer: 321 " Setup a connection and file object to Ganglia's XML"289 """Setup a connection and file object to Ganglia's XML""" 322 290 323 291 s = None 324 292 325 293 def __init__( self, host, port ): 326 " Store host and port for connection"294 """Store host and port for connection""" 327 295 328 296 self.host = host … … 331 299 332 300 def connect( self ): 333 " Setup connection to XML source"301 """Setup connection to XML source""" 334 302 335 303 for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ): … … 364 332 365 333 def disconnect( self ): 366 " Close socket"334 """Close socket""" 367 335 368 336 if self.s: … … 371 339 372 340 def __del__( self ): 373 " Kill the socket before we leave"341 """Kill the socket before we leave""" 374 342 375 343 self.disconnect() 376 344 377 345 def getFileObject( self ): 378 " Connect, and return a file object"346 """Connect, and return a file object""" 379 347 380 348 if self.s: … … 388 356 389 357 class GangliaXMLProcessor: 358 """Main class for processing XML and acting with it""" 390 359 391 360 def __init__( self ): 392 " Setup initial XML connection and handlers"361 """Setup initial XML connection and handlers""" 393 362 394 363 self.config = GangliaConfigParser( GMETAD_CONF ) 395 364 396 self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_ SOURCE.split( ':' )[0], ARCHIVE_SOURCE.split( ':' )[1] )365 self.myXMLGatherer = GangliaXMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] ) 397 366 self.myParser = make_parser() 398 367 self.myHandler = GangliaXMLHandler( self.config ) … … 400 369 401 370 def daemon( self ): 402 "Run as daemon forever" 403 404 self.DAEMON = 1 371 """Run as daemon forever""" 405 372 406 373 # Fork the first child … … 440 407 441 408 def printTime( self ): 442 " Print current time in human readable format"409 """Print current time in human readable format for logging""" 443 410 444 411 return time.strftime("%a %d %b %Y %H:%M:%S") 445 412 446 413 def run( self ): 447 " Main thread"414 """Main XML processing; start a xml and storethread""" 448 415 449 416 xmlthread = threading.Thread( None, self.processXML, 'xmlthread' ) … … 472 439 473 440 def storeMetrics( self ): 474 " Store metrics retained in memory to disk"441 """Store metrics retained in memory to disk""" 475 442 476 443 debug_msg( 7, self.printTime() + ' - storethread(): started.' ) 477 444 478 # Store metrics somewhere between every 60 and 180 seconds445 # Store metrics somewhere between every 360 and 640 seconds 479 446 # 480 447 STORE_INTERVAL = random.randint( 360, 640 ) … … 498 465 499 466 def storeThread( self ): 467 """Actual metric storing thread""" 500 468 501 469 debug_msg( 7, self.printTime() + ' - storemetricthread(): started.' ) … … 508 476 509 477 def processXML( self ): 510 " Process XML"478 """Process XML""" 511 479 512 480 debug_msg( 7, self.printTime() + ' - xmlthread(): started.' ) … … 530 498 531 499 def parseThread( self ): 500 """Actual parsing thread""" 532 501 533 502 debug_msg( 7, self.printTime() + ' - parsethread(): started.' ) … … 544 513 545 514 def __init__( self, config ): 515 """Parse some stuff from our gmetad's config, such as polling interval""" 546 516 547 517 self.config = config … … 549 519 550 520 def parseValues( self ): 551 " Parse certain values from gmetad.conf"521 """Parse certain values from gmetad.conf""" 552 522 553 523 readcfg = open( self.config, 'r' ) … … 586 556 587 557 def getInterval( self, source_name ): 558 """Return interval for source_name""" 588 559 589 560 for source in self.sources: … … 596 567 597 568 def getLowestInterval( self ): 569 """Return the lowest interval of all clusters""" 598 570 599 571 lowest_interval = 0 … … 612 584 613 585 class RRDHandler: 586 """Class for handling RRD activity""" 614 587 615 588 myMetrics = { } … … 619 592 620 593 def __init__( self, config, cluster ): 594 """Setup initial variables""" 621 595 self.block = 0 622 596 self.cluster = cluster … … 627 601 628 602 def gatherLastUpdates( self ): 629 " Populate the lastStored list, containing timestamps of all last updates"603 """Populate the lastStored list, containing timestamps of all last updates""" 630 604 631 605 cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster ) … … 673 647 674 648 def getClusterName( self ): 649 """Return clustername""" 650 675 651 return self.cluster 676 652 677 653 def memMetric( self, host, metric ): 654 """Store metric from host in memory""" 678 655 679 656 if self.myMetrics.has_key( host ): … … 693 670 self.myMetrics[ host ][ metric['name'] ] = [ ] 694 671 672 # Push new metric onto stack 673 # atomic code; only 1 thread at a time may access the stack 674 695 675 # <ATOMIC> 696 676 # … … 704 684 705 685 def makeUpdateList( self, host, metriclist ): 686 """ 687 Make a list of update values for rrdupdate 688 but only those that we didn't store before 689 """ 706 690 707 691 update_list = [ ] … … 718 702 719 703 def checkStoreMetric( self, host, metric ): 704 """Check if supplied metric if newer than last one stored""" 720 705 721 706 if self.lastStored.has_key( host ): … … 731 716 732 717 def memLastUpdate( self, host, metricname, metriclist ): 718 """ 719 Memorize the time of the latest metric from metriclist 720 but only if it wasn't allready memorized 721 """ 733 722 734 723 if not self.lastStored.has_key( host ): … … 753 742 754 743 def storeMetrics( self ): 744 """ 745 Store all metrics from memory to disk 746 and do it to the RRD's in appropriate timeperiod directory 747 """ 755 748 756 749 for hostname, mymetrics in self.myMetrics.items(): … … 759 752 760 753 metrics_to_store = [ ] 754 755 # Pop metrics from stack for storing until none is left 756 # atomic code: only 1 thread at a time may access myMetrics 761 757 762 758 # <ATOMIC> … … 798 794 799 795 def makeTimeSerial( self ): 800 " Generate a time serial. Seconds since epoch"796 """Generate a time serial. Seconds since epoch""" 801 797 802 798 # Seconds since epoch … … 806 802 807 803 def makeRrdPath( self, host, metricname, timeserial ): 808 """ 809 Make a RRD location/path and filename 810 If a metric or timeserial are supplied the complete locations 811 will be made, else just the host directory 812 """ 804 """Make a RRD location/path and filename""" 813 805 814 806 rrd_dir = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial ) … … 818 810 819 811 def getLastRrdTimeSerial( self, host ): 820 """ 821 Find the last timeserial (directory) for this host 822 This is determined once every host 823 """ 812 """Find the last timeserial (directory) for this host""" 824 813 825 814 newest_timeserial = 0 … … 844 833 845 834 def determinePeriod( self, host, check_serial ): 835 """Determine to which period (directory) this time(serial) belongs""" 846 836 847 837 period_serial = 0 … … 892 882 893 883 def createCheck( self, host, metricname, timeserial ): 894 " Check if an .rrd allready exists for this metric, create if not"884 """Check if an rrd allready exists for this metric, create if not""" 895 885 896 886 debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) ) … … 938 928 939 929 def update( self, host, metricname, timeserial, metriclist ): 930 """ 931 Update rrd file for host with metricname 932 in directory timeserial with metriclist 933 """ 940 934 941 935 debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) ) … … 956 950 957 951 def main(): 958 "Program startup" 959 960 if TOGA_SERVER: 961 962 myServer = TogaServer() 963 964 if DAEMONIZE: 965 myServer.daemon() 966 else: 967 myServer.run() 968 969 if ARCHIVE: 970 971 myProcessor = GangliaXMLProcessor() 972 973 if DAEMONIZE: 974 myProcessor.daemon() 975 else: 976 myProcessor.run() 952 """Program startup""" 953 954 myProcessor = GangliaXMLProcessor() 955 956 if DAEMONIZE: 957 myProcessor.daemon() 958 else: 959 myProcessor.run() 977 960 978 961 def check_dir( directory ): 979 " Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"962 """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'""" 980 963 981 964 if directory[-1] == '/': … … 985 968 986 969 def debug_msg( level, msg ): 970 """Only print msg if it is not below our debug level""" 987 971 988 972 if (DEBUG_LEVEL >= level): … … 990 974 991 975 def printTime( ): 992 " Print current time in human readable format"976 """Print current time in human readable format""" 993 977 994 978 return time.strftime("%a %d %b %Y %H:%M:%S") 995 979 996 # Let's go980 # Ooohh, someone started me! Let's go.. 997 981 if __name__ == '__main__': 998 982 main()
Note: See TracChangeset
for help on using the changeset viewer.