Changeset 770
Timestamp: 03/28/13 16:41:54
Files: 1 edited
branches/0.4/jobarchived/jobarchived.py
(diff: r500 → r770)

import getopt, syslog, ConfigParser, sys

-VERSION='0.3.1'
+VERSION='0.4+SVN'

def usage( ver ):

    print 'jobarchived %s' %VERSION

    if ver:
        return 0

    print
    print 'Purpose:'
    print '  The Job Archive Daemon (jobarchived) stores batch job information in a SQL database'
    print '  and node statistics in a RRD archive'
    print
    print 'Usage:   jobarchived [OPTIONS]'
    print
    print '  -c, --config=FILE   The configuration file to use (default: /etc/jobarchived.conf)'
    print '  -p, --pidfile=FILE  Use pid file to store the process id'
    print '  -h, --help          Print help and exit'
    print '  -v, --version       Print version and exit'
    print

def processArgs( args ):

    SHORT_L = 'p:hvc:'
    LONG_L  = [ 'help', 'config=', 'pidfile=', 'version' ]

    config_filename = '/etc/jobarchived.conf'

    global PIDFILE

    PIDFILE = None

    try:
…
            config_filename = value

        if opt in [ '--pidfile', '-p' ]:

            PIDFILE = value

        if opt in [ '--help', '-h' ]:

            usage( False )
            sys.exit( 0 )

        if opt in [ '--version', '-v' ]:

            usage( True )
            sys.exit( 0 )

    try:
        return loadConfig( config_filename )

    except ConfigParser.NoOptionError, detail:

        print detail
        sys.exit( 1 )
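For context, loadConfig() below reads everything from the [DEFAULT] section of that configuration file. A minimal illustrative /etc/jobarchived.conf might look as follows; only the option names are taken from the code, every value and path here is made up:

    [DEFAULT]
    DAEMONIZE               : 1
    USE_SYSLOG              : 1
    SYSLOG_LEVEL            : 0
    SYSLOG_FACILITY         : DAEMON
    DEBUG_LEVEL             : 1
    GMETAD_CONF             : /etc/ganglia/gmetad.conf
    ARCHIVE_XMLSOURCE       : localhost:8651
    ARCHIVE_DATASOURCES     : "mycluster"
    ARCHIVE_PATH            : /var/lib/jobarchived
    ARCHIVE_HOURS_PER_RRD   : 12
    ARCHIVE_EXCLUDE_METRICS : "multicpu_.*"
    JOB_SQL_DBASE           : localhost/jobarchive
    JOB_TIMEOUT             : 72
    RRDTOOL                 : /usr/bin/rrdtool

ARCHIVE_XMLSOURCE is split on ':' into host and port, and JOB_SQL_DBASE on '/' into SQL host and database name (see run() near the bottom of the file); JOB_TIMEOUT is in hours.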
def loadConfig( filename ):

    def getlist( cfg_string ):

        my_list = [ ]

        for item_txt in cfg_string.split( ',' ):

            sep_char = None

            item_txt = item_txt.strip()

            for s_char in [ "'", '"' ]:

                if item_txt.find( s_char ) != -1:

                    if item_txt.count( s_char ) != 2:

                        print 'Missing quote: %s' %item_txt
                        sys.exit( 1 )

                    else:

                        sep_char = s_char
                        break

            if sep_char:

                item_txt = item_txt.split( sep_char )[1]

            my_list.append( item_txt )

        return my_list

    cfg = ConfigParser.ConfigParser()

    cfg.read( filename )

    global DEBUG_LEVEL, USE_SYSLOG, SYSLOG_LEVEL, SYSLOG_FACILITY, GMETAD_CONF, ARCHIVE_XMLSOURCE
    global ARCHIVE_DATASOURCES, ARCHIVE_PATH, ARCHIVE_HOURS_PER_RRD, ARCHIVE_EXCLUDE_METRICS
    global JOB_SQL_DBASE, DAEMONIZE, RRDTOOL, JOB_TIMEOUT, MODRRDTOOL

    ARCHIVE_PATH = cfg.get( 'DEFAULT', 'ARCHIVE_PATH' )

    ARCHIVE_HOURS_PER_RRD = cfg.getint( 'DEFAULT', 'ARCHIVE_HOURS_PER_RRD' )

    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )

    USE_SYSLOG = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )

    SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )

    MODRRDTOOL = False

    try:
        global rrdtool
        import rrdtool

        MODRRDTOOL = True

    except ImportError:

        MODRRDTOOL = False

        print "ERROR: py-rrdtool import FAILED: falling back to DEPRECATED use of rrdtool binary. This will slow down jobarchived significantly!"

        RRDTOOL = cfg.get( 'DEFAULT', 'RRDTOOL' )

    try:

        SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )

    except AttributeError, detail:

        print 'Unknown syslog facility'
        sys.exit( 1 )

    GMETAD_CONF = cfg.get( 'DEFAULT', 'GMETAD_CONF' )

    ARCHIVE_XMLSOURCE = cfg.get( 'DEFAULT', 'ARCHIVE_XMLSOURCE' )

    ARCHIVE_DATASOURCES = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_DATASOURCES' ) )

    ARCHIVE_EXCLUDE_METRICS = getlist( cfg.get( 'DEFAULT', 'ARCHIVE_EXCLUDE_METRICS' ) )

    JOB_SQL_DBASE = cfg.get( 'DEFAULT', 'JOB_SQL_DBASE' )

    JOB_TIMEOUT = cfg.getint( 'DEFAULT', 'JOB_TIMEOUT' )

    DAEMONIZE = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )

    return True

# What XML data types not to store
…

try:
    from pyPgSQL import PgSQL

except ImportError, details:

    print "FATAL ERROR: pyPgSQL python module not found"
    sys.exit( 1 )
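The nested getlist() helper above is what turns the quoted, comma-separated values of ARCHIVE_DATASOURCES and ARCHIVE_EXCLUDE_METRICS into Python lists. A quick sketch of its behaviour, with an illustrative input:

    #   getlist( ' "clusterA", "clusterB", clusterC ' )
    #   -> [ 'clusterA', 'clusterB', 'clusterC' ]
    #
    # Each item is stripped of whitespace and of one pair of matching quotes;
    # an item whose quote character occurs other than exactly twice aborts
    # with 'Missing quote'.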
# Original from Andre van der Vlies <andre@vandervlies.xs4all.nl> for MySQL. Changed …
…

class DataSQLStore:

    db_vars = None
    dbc     = None

    def __init__( self, hostname, database ):

        self.db_vars = InitVars(DataBaseName=database,
                    User='root',
                    Host=hostname,
                    Password='',
                    Dictionary='true')

        try:
            self.dbc = DB(self.db_vars)
        except DBError, details:
            debug_msg( 0, 'FATAL ERROR: Unable to connect to database!: ' +str(details) )
            sys.exit(1)

    def setDatabase(self, statement):
        ret = self.doDatabase('set', statement)
        return ret

    def getDatabase(self, statement):
        ret = self.doDatabase('get', statement)
        return ret

    def doDatabase(self, type, statement):

        debug_msg( 10, 'doDatabase(): %s: %s' %(type, statement) )
        try:
            if type == 'set':
                result = self.dbc.Set( statement )
                self.dbc.Commit()
            elif type == 'get':
                result = self.dbc.Get( statement )

        except DBError, detail:
            operation = statement.split(' ')[0]
            debug_msg( 0, 'FATAL ERROR: ' +operation+ ' on database failed while doing ['+statement+'] full msg: '+str(detail) )
            sys.exit(1)

        debug_msg( 10, 'doDatabase(): result: %s' %(result) )
        return result

    def getJobNodeId( self, job_id, node_id ):

        id = self.getDatabase( "SELECT job_id,node_id FROM job_nodes WHERE job_id = '%s' AND node_id = '%s'" %(job_id, node_id) )
        if len( id ) > 0:

            if len( id[0] ) > 0 and id[0] != '':

                return 1

        return 0

    def getNodeId( self, hostname ):

        id = self.getDatabase( "SELECT node_id FROM nodes WHERE node_hostname = '%s'" %hostname )

        if len( id ) > 0:

            id = id[0][0]

            return id
        else:
            return None

    def getNodeIds( self, hostnames ):

        ids = [ ]

        for node in hostnames:

            id = self.getNodeId( node )

            if id:
                ids.append( id )

        return ids
    def getJobId( self, jobid ):

        id = self.getDatabase( "SELECT job_id FROM jobs WHERE job_id = '%s'" %jobid )

        if id:
            id = id[0][0]

            return id
        else:
            return None

    def addJob( self, job_id, jobattrs ):

        if not self.getJobId( job_id ):

            self.mutateJob( 'insert', job_id, jobattrs )
        else:
            self.mutateJob( 'update', job_id, jobattrs )

    def mutateJob( self, action, job_id, jobattrs ):

        job_values = [ 'name', 'queue', 'owner', 'requested_time', 'requested_memory', 'ppn', 'status', 'start_timestamp', 'stop_timestamp' ]

        insert_col_str = 'job_id'
        insert_val_str = "'%s'" %job_id
        update_str     = None

        debug_msg( 10, 'mutateJob(): %s %s' %(action,job_id))

        ids = [ ]

        for valname, value in jobattrs.items():

            if valname in job_values and value != '':

                column_name = 'job_' + valname

                if action == 'insert':

                    if not insert_col_str:
                        insert_col_str = column_name
                    else:
                        insert_col_str = insert_col_str + ',' + column_name

                    if not insert_val_str:
                        insert_val_str = value
                    else:
                        insert_val_str = insert_val_str + ",'%s'" %value

                elif action == 'update':

                    if not update_str:
                        update_str = "%s='%s'" %(column_name, value)
                    else:
                        update_str = update_str + ",%s='%s'" %(column_name, value)

            elif valname == 'nodes' and value:

                node_valid = 1

                if len(value) == 1:

                    if jobattrs['status'] == 'Q':

                        node_valid = 0

                    else:

                        node_valid = 0

                        for node_char in str(value[0]):

                            if string.find( string.digits, node_char ) != -1 and not node_valid:

                                node_valid = 1

                if node_valid:

                    ids = self.addNodes( value, jobattrs['domain'] )

        if action == 'insert':

            self.setDatabase( "INSERT INTO jobs ( %s ) VALUES ( %s )" %( insert_col_str, insert_val_str ) )

        elif action == 'update':

            self.setDatabase( "UPDATE jobs SET %s WHERE job_id=%s" %(update_str, job_id) )

        if len( ids ) > 0:
            self.addJobNodes( job_id, ids )

    def addNodes( self, hostnames, domain ):

        ids = [ ]

        for node in hostnames:

            node = '%s.%s' %( node, domain )
            id   = self.getNodeId( node )

            if not id:
                self.setDatabase( "INSERT INTO nodes ( node_hostname ) VALUES ( '%s' )" %node )
                id = self.getNodeId( node )

            ids.append( id )

        return ids

    def addJobNodes( self, jobid, nodes ):

        for node in nodes:

            if not self.getJobNodeId( jobid, node ):

                self.addJobNode( jobid, node )

    def addJobNode( self, jobid, nodeid ):

        self.setDatabase( "INSERT INTO job_nodes (job_id,node_id) VALUES ( %s,%s )" %(jobid, nodeid) )

    def storeJobInfo( self, jobid, jobattrs ):

        self.addJob( jobid, jobattrs )

    def checkStaleJobs( self ):

        # Locate all jobs in the database that are not set to finished
        #
        q = "SELECT * from jobs WHERE job_status != 'F'"

        r = self.getDatabase( q )

        if len( r ) == 0:

            return None

        cleanjobs   = [ ]
        timeoutjobs = [ ]

        jobtimeout_sec = JOB_TIMEOUT * (60 * 60)
        cur_time       = time.time()

        for row in r:

            job_id              = row[0]
            job_requested_time  = row[4]
            job_status          = row[7]
            job_start_timestamp = row[8]

            # If it was set to queued and we didn't see it started
            # there's no point in keeping it around
            #
            if job_status == 'Q' or not job_start_timestamp:

                cleanjobs.append( job_id )
            else:

                start_timestamp = int( job_start_timestamp )

                # If it was set to running longer than JOB_TIMEOUT
                # close the job: it probably finished while we were not running
                #
                if ( cur_time - start_timestamp ) > jobtimeout_sec:

                    if job_requested_time:

                        rtime_epoch = reqtime2epoch( job_requested_time )
                    else:
                        rtime_epoch = None

                    timeoutjobs.append( (job_id, job_start_timestamp, rtime_epoch) )

        debug_msg( 1, 'Found ' + str( len( cleanjobs ) ) + ' stale jobs in database: deleting entries' )

        # Purge these from database
        #
        for j in cleanjobs:

            q = "DELETE FROM jobs WHERE job_id = '" + str( j ) + "'"
            self.setDatabase( q )

        debug_msg( 1, 'Found ' + str( len( timeoutjobs ) ) + ' timed out jobs in database: closing entries' )

        # Close these jobs in the database
        # update the stop_timestamp to: start_timestamp + requested wallclock
        # and set state: finished
        #
        for j in timeoutjobs:

            ( i, s, r ) = j

            if r:
                new_end_timestamp = int( s ) + r

                q = "UPDATE jobs SET job_status='F',job_stop_timestamp = '" + str( new_end_timestamp ) + "' WHERE job_id = '" + str(i) + "'"
                self.setDatabase( q )
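To illustrate what DataSQLStore actually sends to the database: for a hypothetical job id and attribute dict (all values invented), mutateJob( 'insert', ... ) concatenates the column and value strings and would emit roughly the following, with column order following dict iteration order:

    #   mutateJob( 'insert', '2674',
    #              { 'name': 'run42', 'queue': 'batch', 'owner': 'alice',
    #                'status': 'R', 'start_timestamp': '1364481714' } )
    #
    #   INSERT INTO jobs ( job_id,job_name,job_queue,job_owner,job_status,job_start_timestamp )
    #   VALUES ( '2674','run42','batch','alice','R','1364481714' )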
class RRDMutator:
    """A class for performing RRD mutations"""

    binary = None

    def __init__( self, binary=None ):
        """Set alternate binary if supplied"""

        if binary:
            self.binary = binary

    def create( self, filename, args ):
        """Create a new rrd with args"""

        global MODRRDTOOL

        if MODRRDTOOL:
            return self.perform( 'create', filename, args )
        else:
            return self.perform( 'create', '"' + filename + '"', args )

    def update( self, filename, args ):
        """Update a rrd with args"""

        global MODRRDTOOL

        if MODRRDTOOL:
            return self.perform( 'update', filename, args )
        else:
            return self.perform( 'update', '"' + filename + '"', args )

    def grabLastUpdate( self, filename ):
        """Determine the last update time of filename rrd"""

        global MODRRDTOOL

        last_update = 0

        # Use the py-rrdtool module if it's available on this system
        #
        if MODRRDTOOL:

            debug_msg( 8, 'rrdtool.info( ' + filename + ' )' )

            rrd_header = { }

            try:
                rrd_header = rrdtool.info( filename )
            except rrdtool.error, msg:
                debug_msg( 8, str( msg ) )

            if rrd_header.has_key( 'last_update' ):
                return last_update
            else:
                return 0

        # For backwards compatibility: use the rrdtool binary if py-rrdtool is unavailable
        # DEPRECATED (slow!)
        #
        else:
            debug_msg( 8, self.binary + ' info ' + filename )

            my_pipe = os.popen( self.binary + ' info "' + filename + '"' )

            for line in my_pipe.readlines():

                if line.find( 'last_update') != -1:

                    last_update = line.split( ' = ' )[1]

            if my_pipe:

                my_pipe.close()

            if last_update:
                return last_update
            else:
                return 0


    def perform( self, action, filename, args ):
        """Perform action on rrd filename with args"""

        global MODRRDTOOL

        arg_string = None

        if type( args ) is not ListType:
            debug_msg( 8, 'Arguments need to be of type List' )
            return 1

        for arg in args:

            if not arg_string:

                arg_string = arg
            else:
                arg_string = arg_string + ' ' + arg

        if MODRRDTOOL:

            debug_msg( 8, 'rrdtool.' + action + "( " + filename + ' ' + arg_string + ")" )

            try:
                debug_msg( 8, "filename '" + str(filename) + "' type "+ str(type(filename)) + " args " + str( args ) )

                if action == 'create':

                    rrdtool.create( str( filename ), *args )

                elif action == 'update':

                    rrdtool.update( str( filename ), *args )

            except rrdtool.error, msg:

                error_msg = str( msg )
                debug_msg( 8, error_msg )
                return 1

        else:

            debug_msg( 8, self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )

            cmd   = os.popen( self.binary + ' ' + action + ' ' + filename + ' ' + arg_string )
            lines = cmd.readlines()

            cmd.close()

            for line in lines:

                if line.find( 'ERROR' ) != -1:

                    error_msg = string.join( line.split( ' ' )[1:] )
                    debug_msg( 8, error_msg )
                    return 1

        return 0

class XMLProcessor:
    """Skeleton class for XML processors"""

    def run( self ):
        """Do main processing of XML here"""

        pass
class TorqueXMLProcessor( XMLProcessor ):
    """Main class for processing XML and acting with it"""

    def __init__( self, XMLSource, DataStore ):
        """Setup initial XML connection and handlers"""

        self.myXMLSource  = XMLSource
        self.myXMLHandler = TorqueXMLHandler( DataStore )
        self.myXMLError   = XMLErrorHandler()

        self.config = GangliaConfigParser( GMETAD_CONF )

    def run( self ):
        """Main XML processing"""

        debug_msg( 1, 'torque_xml_thread(): started.' )

        while( 1 ):

            #self.myXMLSource = self.mXMLGatherer.getFileObject()
            debug_msg( 1, 'torque_xml_thread(): Retrieving XML data..' )

            my_data = self.myXMLSource.getData()

            debug_msg( 1, 'torque_xml_thread(): Done retrieving.' )

            if my_data:
                debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )

                xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )

                debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )

            debug_msg( 1, 'torque_xml_thread(): Sleeping.. (%ss)' %(str( self.config.getLowestInterval() ) ) )
            time.sleep( self.config.getLowestInterval() )
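TorqueXMLHandler below consumes gmetric-style METRIC elements published by the monarch job plugin. A hypothetical metric it would accept (attribute values invented): the job id is taken from the metric NAME, and VAL is a space-separated list of key=value pairs, with the nodes value further split on ';':

    #   <METRIC NAME="MONARCH-JOB-2674-0"
    #           VAL="name=run42 queue=batch owner=alice nodes=node01;node02"/>
    #
    #   -> job_id '2674', jobinfo['nodes'] == [ 'node01', 'node02' ]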
class TorqueXMLHandler( xml.sax.handler.ContentHandler ):
    """Parse Torque's jobinfo XML from our plugin"""

    jobAttrs = { }

    def __init__( self, datastore ):

        #self.ds = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )
        self.ds             = datastore
        self.jobs_processed = [ ]
        self.jobs_to_store  = [ ]

    def startDocument( self ):

        self.heartbeat = 0
        self.elementct = 0

    def startElement( self, name, attrs ):
        """
        This XML will be all gmetric XML
        so there will be no specific start/end element
        just one XML statement with all info
        """

        jobinfo = { }

        self.elementct += 1

        if name == 'CLUSTER':

            self.clustername = str( attrs.get( 'NAME', "" ) )

        elif name == 'METRIC' and self.clustername in ARCHIVE_DATASOURCES:

            metricname = str( attrs.get( 'NAME', "" ) )

            if metricname == 'MONARCH-HEARTBEAT':
                self.heartbeat = str( attrs.get( 'VAL', "" ) )

            elif metricname.find( 'MONARCH-JOB' ) != -1:

                job_id = metricname.split( 'MONARCH-JOB-' )[1].split( '-' )[0]
                val    = str( attrs.get( 'VAL', "" ) )

                if not job_id in self.jobs_processed:

                    self.jobs_processed.append( job_id )

                check_change = 0

                if self.jobAttrs.has_key( job_id ):

                    check_change = 1

                valinfo = val.split( ' ' )

                for myval in valinfo:

                    if len( myval.split( '=' ) ) > 1:

                        valname = myval.split( '=' )[0]
                        value   = myval.split( '=' )[1]

                        if valname == 'nodes':
                            value = value.split( ';' )

                        jobinfo[ valname ] = value

                if check_change:
                    if self.jobinfoChanged( self.jobAttrs, job_id, jobinfo ) and self.jobAttrs[ job_id ]['status'] in [ 'R', 'Q' ]:
                        self.jobAttrs[ job_id ]['stop_timestamp'] = ''
                        self.jobAttrs[ job_id ]                   = self.setJobAttrs( self.jobAttrs[ job_id ], jobinfo )
                        if not job_id in self.jobs_to_store:
                            self.jobs_to_store.append( job_id )

                        debug_msg( 10, 'jobinfo for job %s has changed' %job_id )
                else:
                    self.jobAttrs[ job_id ] = jobinfo

                    if not job_id in self.jobs_to_store:
                        self.jobs_to_store.append( job_id )

                    debug_msg( 10, 'jobinfo for job %s has changed' %job_id )

    def endDocument( self ):
        """When all metrics have gone, check if any jobs have finished"""

        debug_msg( 1, "XML: Processed "+str(self.elementct)+ " elements - found "+str(len(self.jobs_to_store))+" (updated) jobs" )

        if self.heartbeat:
            for jobid, jobinfo in self.jobAttrs.items():

                # This is an old job, not in current jobinfo list anymore
                # it must have finished, since we _did_ get a new heartbeat
                #
                mytime = int( jobinfo['reported'] ) + int( jobinfo['poll_interval'] )

                if (mytime < self.heartbeat) and (jobid not in self.jobs_processed) and (jobinfo['status'] == 'R'):

                    if not jobid in self.jobs_processed:
                        self.jobs_processed.append( jobid )

                    self.jobAttrs[ jobid ]['status']         = 'F'
                    self.jobAttrs[ jobid ]['stop_timestamp'] = str( self.heartbeat )

                    if not jobid in self.jobs_to_store:
                        self.jobs_to_store.append( jobid )

            debug_msg( 1, 'torque_xml_thread(): Storing..' )

            for jobid in self.jobs_to_store:
                if self.jobAttrs[ jobid ]['status'] in [ 'R', 'F' ]:

                    self.ds.storeJobInfo( jobid, self.jobAttrs[ jobid ] )

                    if self.jobAttrs[ jobid ]['status'] == 'F':
                        del self.jobAttrs[ jobid ]

            debug_msg( 1, 'torque_xml_thread(): Done storing.' )

            self.jobs_processed = [ ]
            self.jobs_to_store  = [ ]

    def setJobAttrs( self, old, new ):
        """
        Set new job attributes in old, without losing
        existing fields that new doesn't have
        """

        for valname, value in new.items():
            old[ valname ] = value

        return old


    def jobinfoChanged( self, jobattrs, jobid, jobinfo ):
        """
        Check if jobinfo has changed from jobattrs[jobid]:
        its report time must be bigger than the previous one
        and recent (equal to the heartbeat)
        """

        ignore_changes = [ 'reported' ]

        if jobattrs.has_key( jobid ):

            for valname, value in jobinfo.items():

                if valname not in ignore_changes:

                    if jobattrs[ jobid ].has_key( valname ):

                        if value != jobattrs[ jobid ][ valname ]:

                            if jobinfo['reported'] > jobattrs[ jobid ][ 'reported' ] and jobinfo['reported'] == self.heartbeat:
                                return True

                    else:
                        return True

        return False
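A worked example of the finished-job test in endDocument(), with invented numbers: a job that last reported at timestamp 1000 with a poll_interval of 60 is expected to report again by 1000 + 60 = 1060. If the MONARCH-HEARTBEAT says 1100 and the job was not seen this pass, then 1060 < 1100, so the job is marked 'F' with stop_timestamp 1100.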
class GangliaXMLHandler( xml.sax.handler.ContentHandler ):
    """Parse Ganglia's XML"""

    def __init__( self, config, datastore ):
        """Setup initial variables and gather info on existing rrd archive"""

        self.config   = config
        self.clusters = { }
        self.ds       = datastore

        debug_msg( 1, 'Checking database..' )

        global DEBUG_LEVEL

        if DEBUG_LEVEL <= 2:
            self.ds.checkStaleJobs()

        debug_msg( 1, 'Check done.' )
        debug_msg( 1, 'Checking rrd archive..' )
        self.gatherClusters()
        debug_msg( 1, 'Check done.' )

    def gatherClusters( self ):
        """Find all existing clusters in archive dir"""

        archive_dir = check_dir(ARCHIVE_PATH)

        hosts = [ ]

        if os.path.exists( archive_dir ):

            dirlist = os.listdir( archive_dir )

            for cfgcluster in ARCHIVE_DATASOURCES:

                if cfgcluster not in dirlist:

                    # Autocreate a directory for this cluster
                    # assume it is new
                    #
                    cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), cfgcluster )

                    os.mkdir( cluster_dir )

                    dirlist.append( cfgcluster )

            for item in dirlist:

                clustername = item

                if not self.clusters.has_key( clustername ) and clustername in ARCHIVE_DATASOURCES:

                    self.clusters[ clustername ] = RRDHandler( self.config, clustername )

            debug_msg( 9, "Found "+str(len(self.clusters.keys()))+" clusters" )

    def startElement( self, name, attrs ):
        """Memorize appropriate data from xml start tags"""

        if name == 'GANGLIA_XML':

            self.XMLSource      = str( attrs.get( 'SOURCE', "" ) )
            self.gangliaVersion = str( attrs.get( 'VERSION', "" ) )

            debug_msg( 10, 'Found XML data: source %s version %s' %( self.XMLSource, self.gangliaVersion ) )

        elif name == 'GRID':

            self.gridName = str( attrs.get( 'NAME', "" ) )
            self.time     = str( attrs.get( 'LOCALTIME', "" ) )

            debug_msg( 10, '`-Grid found: %s' %( self.gridName ) )

        elif name == 'CLUSTER':

            self.clusterName = str( attrs.get( 'NAME', "" ) )
            self.time        = str( attrs.get( 'LOCALTIME', "" ) )

            if not self.clusters.has_key( self.clusterName ) and self.clusterName in ARCHIVE_DATASOURCES:

                self.clusters[ self.clusterName ] = RRDHandler( self.config, self.clusterName )

            debug_msg( 10, ' |-Cluster found: %s' %( self.clusterName ) )

        elif name == 'HOST' and self.clusterName in ARCHIVE_DATASOURCES:

            self.hostName     = str( attrs.get( 'NAME', "" ) )
            self.hostIp       = str( attrs.get( 'IP', "" ) )
            self.hostReported = str( attrs.get( 'REPORTED', "" ) )

            debug_msg( 10, ' | |-Host found: %s - ip %s reported %s' %( self.hostName, self.hostIp, self.hostReported ) )

        elif name == 'METRIC' and self.clusterName in ARCHIVE_DATASOURCES:

            type = str( attrs.get( 'TYPE', "" ) )

            exclude_metric = False

            for ex_metricstr in ARCHIVE_EXCLUDE_METRICS:

                orig_name = str( attrs.get( 'NAME', "" ) )

                if string.lower( orig_name ) == string.lower( ex_metricstr ):

                    exclude_metric = True

                elif re.match( ex_metricstr, orig_name ):

                    exclude_metric = True

            if type not in UNSUPPORTED_ARCHIVE_TYPES and not exclude_metric:

                myMetric         = { }
                myMetric['name'] = str( attrs.get( 'NAME', "" ) )
                myMetric['val']  = str( attrs.get( 'VAL', "" ) )
                myMetric['time'] = self.hostReported
                self.clusters[ self.clusterName ].memMetric( self.hostName, myMetric )

                debug_msg( 11, ' | | |-metric: %s:%s' %( myMetric['name'], myMetric['val'] ) )

    def storeMetrics( self ):
        """Store metrics of each cluster rrd handler"""

        for clustername, rrdh in self.clusters.items():

            ret = rrdh.storeMetrics()

            if ret:
                debug_msg( 9, 'An error occurred while storing metrics for cluster %s' %clustername )
                return 1

        return 0

class XMLErrorHandler( xml.sax.handler.ErrorHandler ):

    def error( self, exception ):
        """Recoverable error"""

        debug_msg( 0, 'Recoverable XML error ' + str( exception ) + ' ignored.' )

    def fatalError( self, exception ):
        """Non-recoverable error"""

        exception_str = str( exception )

        # Ignore 'no element found' errors
        if exception_str.find( 'no element found' ) != -1:
            debug_msg( 0, 'No XML data found: Socket not (re)connected or datasource not available.' )
            return 0

        debug_msg( 0, 'FATAL ERROR: Non-recoverable XML error ' + str( exception ) )
        sys.exit( 1 )

    def warning( self, exception ):
        """Warning"""

        debug_msg( 0, 'Warning ' + str( exception ) )
class XMLGatherer:
    """Setup a connection and file object to Ganglia's XML"""

    s    = None
    fd   = None
    data = None
    slot = None

    # Time since the last update
    #
    LAST_UPDATE = 0

    # Minimum interval between updates
    #
    MIN_UPDATE_INT = 10

    # Is an update occurring now
    #
    update_now = False

    def __init__( self, host, port ):
        """Store host and port for connection"""

        self.host = host
        self.port = port
        self.slot = threading.Lock()

        self.retrieveData()

    def retrieveData( self ):
        """Setup connection to XML source"""

        self.update_now = True

        self.slot.acquire()

        self.data = None

        for res in socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM ):

            af, socktype, proto, canonname, sa = res

            try:

                self.s = socket.socket( af, socktype, proto )

            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:

                self.s = None
                continue

            try:

                self.s.connect( sa )

            except ( socket.error, socket.gaierror, socket.herror, socket.timeout ), msg:

                self.disconnect()
                continue

            break

        if self.s is None:

            debug_msg( 0, 'FATAL ERROR: Could not open socket or unable to connect to datasource!' )
            self.update_now = False
            #sys.exit( 1 )

        else:
            #self.s.send( '\n' )

            my_fp   = self.s.makefile( 'r' )
            my_data = my_fp.readlines()
            my_data = string.join( my_data, '' )

            self.data = my_data

            self.LAST_UPDATE = time.time()

        self.slot.release()

        self.update_now = False

    def disconnect( self ):
        """Close socket"""

        if self.s:
            #self.s.shutdown( 2 )
            self.s.close()
            self.s = None

    def __del__( self ):
        """Kill the socket before we leave"""

        self.disconnect()

    def reGetData( self ):
        """Reconnect"""

        while self.update_now:

            # Must be another update in progress:
            # Wait until the update is complete
            #
            time.sleep( 1 )

        if self.s:
            self.disconnect()

        self.retrieveData()

    def getData( self ):

        """Return the XML data"""

        # If more than MIN_UPDATE_INT seconds passed since last data update
        # update the XML first before returning it
        #

        cur_time = time.time()

        if ( cur_time - self.LAST_UPDATE ) > self.MIN_UPDATE_INT:

            self.reGetData()

        while self.update_now:

            # Must be another update in progress:
            # Wait until the update is complete
            #
            time.sleep( 1 )

        return self.data

    def makeFileDescriptor( self ):
        """Make file descriptor that points to our socket connection"""

        self.reconnect()

        if self.s:
            self.fd = self.s.makefile( 'r' )

    def getFileObject( self ):
        """Connect, and return a file object"""

        self.makeFileDescriptor()

        if self.fd:
            return self.fd
class GangliaXMLProcessor( XMLProcessor ):
    """Main class for processing XML and acting with it"""

    def __init__( self, XMLSource, DataStore ):
        """Setup initial XML connection and handlers"""

        self.config = GangliaConfigParser( GMETAD_CONF )

        #self.myXMLGatherer = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
        #self.myXMLSource   = self.myXMLGatherer.getFileObject()
        self.myXMLSource  = XMLSource
        self.ds           = DataStore
        self.myXMLHandler = GangliaXMLHandler( self.config, self.ds )
        self.myXMLError   = XMLErrorHandler()

    def run( self ):
        """Main XML processing; start a xml and storethread"""

        xml_thread   = threading.Thread( None, self.processXML, 'xmlthread' )
        store_thread = threading.Thread( None, self.storeMetrics, 'storethread' )

        while( 1 ):

            if not xml_thread.isAlive():
                # Gather XML at the same interval as gmetad

                # threaded call to: self.processXML()
                #
                try:
                    xml_thread = threading.Thread( None, self.processXML, 'xml_thread' )
                    xml_thread.start()
                except thread.error, msg:
                    debug_msg( 0, 'ERROR: Unable to start xml_thread!: '+str(msg))
                    #return 1

            if not store_thread.isAlive():
                # Store metrics every .. sec

                # threaded call to: self.storeMetrics()
                #
                try:
                    store_thread = threading.Thread( None, self.storeMetrics, 'store_thread' )
                    store_thread.start()
                except thread.error, msg:
                    debug_msg( 0, 'ERROR: Unable to start store_thread!: '+str(msg))
                    #return 1

            # Just sleep a sec here, to prevent daemon from going mad. We're all threads here anyway
            time.sleep( 1 )

    def storeMetrics( self ):
        """Store metrics retained in memory to disk"""

        global DEBUG_LEVEL

        # Store metrics somewhere between every 360 and 640 seconds
        #
        if DEBUG_LEVEL > 2:
            #STORE_INTERVAL = 60
            STORE_INTERVAL = random.randint( 360, 640 )
        else:
            STORE_INTERVAL = random.randint( 360, 640 )

        try:
            store_metric_thread = threading.Thread( None, self.storeThread, 'store_metric_thread' )
            store_metric_thread.start()
        except thread.error, msg:
            debug_msg( 0, 'ERROR: Unable to start ganglia_store_thread()!: '+str(msg) )
            return 1

        debug_msg( 1, 'ganglia_store_thread(): started.' )

        debug_msg( 1, 'ganglia_store_thread(): Sleeping.. (%ss)' %STORE_INTERVAL )
        time.sleep( STORE_INTERVAL )
        debug_msg( 1, 'ganglia_store_thread(): Done sleeping.' )

        if store_metric_thread.isAlive():

            debug_msg( 1, 'ganglia_store_thread(): storemetricthread() still running, waiting to finish..' )
            store_metric_thread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish
            debug_msg( 1, 'ganglia_store_thread(): Done waiting.' )

        debug_msg( 1, 'ganglia_store_thread(): finished.' )

        return 0

    def storeThread( self ):
        """Actual metric storing thread"""

        debug_msg( 1, 'ganglia_store_metric_thread(): started.' )
        debug_msg( 1, 'ganglia_store_metric_thread(): Storing data..' )
        ret = self.myXMLHandler.storeMetrics()
        if ret > 0:
            debug_msg( 0, 'ganglia_store_metric_thread(): UNKNOWN ERROR %s while storing Metrics!' %str(ret) )
        debug_msg( 1, 'ganglia_store_metric_thread(): Done storing.' )
        debug_msg( 1, 'ganglia_store_metric_thread(): finished.' )

        return 0
    def processXML( self ):
        """Process XML"""

        try:
            parsethread = threading.Thread( None, self.parseThread, 'parsethread' )
            parsethread.start()
        except thread.error, msg:
            debug_msg( 0, 'ERROR: Unable to start ganglia_xml_thread()!: ' + str(msg) )
            return 1

        debug_msg( 1, 'ganglia_xml_thread(): started.' )

        debug_msg( 1, 'ganglia_xml_thread(): Sleeping.. (%ss)' %self.config.getLowestInterval() )
        time.sleep( float( self.config.getLowestInterval() ) )
        debug_msg( 1, 'ganglia_xml_thread(): Done sleeping.' )

        if parsethread.isAlive():

            debug_msg( 1, 'ganglia_xml_thread(): parsethread() still running, waiting (%ss) to finish..' %PARSE_TIMEOUT )
            parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish
            debug_msg( 1, 'ganglia_xml_thread(): Done waiting.' )

        debug_msg( 1, 'ganglia_xml_thread(): finished.' )

        return 0

    def parseThread( self ):
        """Actual parsing thread"""

        debug_msg( 1, 'ganglia_parse_thread(): started.' )
        debug_msg( 1, 'ganglia_parse_thread(): Retrieving XML data..' )

        my_data = self.myXMLSource.getData()

        debug_msg( 1, 'ganglia_parse_thread(): Done retrieving.' )

        if my_data:
            debug_msg( 1, 'ganglia_parse_thread(): Parsing XML..' )
            xml.sax.parseString( my_data, self.myXMLHandler, self.myXMLError )
            debug_msg( 1, 'ganglia_parse_thread(): Done parsing.' )

        debug_msg( 1, 'ganglia_parse_thread(): finished.' )

        return 0

class GangliaConfigParser:

    sources = [ ]

    def __init__( self, config ):
        """Parse some stuff from our gmetad's config, such as polling interval"""

        self.config = config
        self.parseValues()

    def parseValues( self ):
        """Parse certain values from gmetad.conf"""

        readcfg = open( self.config, 'r' )

        for line in readcfg.readlines():

            if line.count( '"' ) > 1:

                if line.find( 'data_source' ) != -1 and line[0] != '#':

                    source         = { }
                    source['name'] = line.split( '"' )[1]
                    source_words   = line.split( '"' )[2].split( ' ' )

                    for word in source_words:

                        valid_interval = 1

                        for letter in word:

                            if letter not in string.digits:

                                valid_interval = 0

                        if valid_interval and len(word) > 0:

                            source['interval'] = word
                            debug_msg( 9, 'polling interval for %s = %s' %(source['name'], source['interval'] ) )

                    # No interval found, use Ganglia's default
                    if not source.has_key( 'interval' ):
                        source['interval'] = 15
                        debug_msg( 9, 'polling interval for %s defaulted to 15' %(source['name']) )

                    self.sources.append( source )

    def getInterval( self, source_name ):
        """Return interval for source_name"""

        for source in self.sources:

            if source['name'] == source_name:

                return source['interval']

        return None
    def getLowestInterval( self ):
        """Return the lowest interval of all clusters"""

        lowest_interval = 0

        for source in self.sources:

            if not lowest_interval or source['interval'] <= lowest_interval:

                lowest_interval = source['interval']

        # Return 15 when nothing is found, so that the daemon won't go insane with 0 sec delays
        if lowest_interval:
            return lowest_interval
        else:
            return 15
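GangliaConfigParser only cares about the data_source lines in gmetad.conf. For a line like the following (cluster name and address illustrative):

    data_source "mycluster" 10 localhost:8649

parseValues() records name 'mycluster' and interval '10' (the only all-digit word after the quoted name; 'localhost:8649' is rejected because of the non-digit characters), and falls back to Ganglia's default of 15 when no interval is given.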
class RRDHandler:
    """Class for handling RRD activity"""

    myMetrics   = { }
    lastStored  = { }
    timeserials = { }
    slot        = None

    def __init__( self, config, cluster ):
        """Setup initial variables"""

        global MODRRDTOOL

        self.block   = 0
        self.cluster = cluster
        self.config  = config
        self.slot    = threading.Lock()

        if MODRRDTOOL:

            self.rrdm = RRDMutator()
        else:
            self.rrdm = RRDMutator( RRDTOOL )

        global DEBUG_LEVEL

        if DEBUG_LEVEL <= 2:
            self.gatherLastUpdates()

    def gatherLastUpdates( self ):
        """Populate the lastStored list, containing timestamps of all last updates"""

        cluster_dir = '%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster )

        hosts = [ ]

        if os.path.exists( cluster_dir ):

            dirlist = os.listdir( cluster_dir )

            for dir in dirlist:

                hosts.append( dir )

        for host in hosts:

            host_dir = cluster_dir + '/' + host
            dirlist  = os.listdir( host_dir )

            for dir in dirlist:

                if not self.timeserials.has_key( host ):

                    self.timeserials[ host ] = [ ]

                self.timeserials[ host ].append( dir )

            last_serial = self.getLastRrdTimeSerial( host )

            if last_serial:

                metric_dir = cluster_dir + '/' + host + '/' + last_serial

                if os.path.exists( metric_dir ):

                    dirlist = os.listdir( metric_dir )

                    for file in dirlist:

                        metricname = file.split( '.rrd' )[0]

                        if not self.lastStored.has_key( host ):

                            self.lastStored[ host ] = { }

                        self.lastStored[ host ][ metricname ] = self.rrdm.grabLastUpdate( metric_dir + '/' + file )

    def getClusterName( self ):
        """Return clustername"""

        return self.cluster

    def memMetric( self, host, metric ):
        """Store metric from host in memory"""

        # <ATOMIC>
        #
        self.slot.acquire()

        if self.myMetrics.has_key( host ):

            if self.myMetrics[ host ].has_key( metric['name'] ):

                for mymetric in self.myMetrics[ host ][ metric['name'] ]:

                    if mymetric['time'] == metric['time']:

                        # Already have this metric, abort
                        self.slot.release()
                        return 1
            else:
                self.myMetrics[ host ][ metric['name'] ] = [ ]
        else:
            self.myMetrics[ host ]                   = { }
            self.myMetrics[ host ][ metric['name'] ] = [ ]

        # Push new metric onto stack
        # atomic code; only 1 thread at a time may access the stack

        self.myMetrics[ host ][ metric['name'] ].append( metric )

        self.slot.release()
        #
        # </ATOMIC>

    def makeUpdateList( self, host, metriclist ):
        """
        Make a list of update values for rrdupdate
        but only those that we didn't store before
        """

        update_list = [ ]
        metric      = None

        while len( metriclist ) > 0:

            metric = metriclist.pop( 0 )

            if self.checkStoreMetric( host, metric ):

                u_val = str( metric['time'] ) + ':' + str( metric['val'] )
                #update_list.append( str('%s:%s') %( metric['time'], metric['val'] ) )
                update_list.append( u_val )

        return update_list
    def checkStoreMetric( self, host, metric ):
        """Check if supplied metric is newer than the last one stored"""

        if self.lastStored.has_key( host ):

            if self.lastStored[ host ].has_key( metric['name'] ):

                if metric['time'] <= self.lastStored[ host ][ metric['name'] ]:

                    # This is old
                    return 0

        return 1

    def memLastUpdate( self, host, metricname, metriclist ):
        """
        Memorize the time of the latest metric from metriclist
        but only if it wasn't already memorized
        """

        if not self.lastStored.has_key( host ):
            self.lastStored[ host ] = { }

        last_update_time = 0

        for metric in metriclist:

            if metric['name'] == metricname:

                if metric['time'] > last_update_time:

                    last_update_time = metric['time']

        if self.lastStored[ host ].has_key( metricname ):

            if last_update_time <= self.lastStored[ host ][ metricname ]:
                return 1

        self.lastStored[ host ][ metricname ] = last_update_time

    def storeMetrics( self ):
        """
        Store all metrics from memory to disk
        and do it to the RRD's in appropriate timeperiod directory
        """

        debug_msg( 5, "Entering storeMetrics()")

        count_values  = 0
        count_metrics = 0
        count_bits    = 0

        for hostname, mymetrics in self.myMetrics.items():

            for metricname, mymetric in mymetrics.items():

                count_metrics += 1

                for dmetric in mymetric:

                    count_values += 1

                    count_bits += len( dmetric['time'] )
                    count_bits += len( dmetric['val'] )

        count_bytes = count_bits / 8

        debug_msg( 5, "size of cluster '" + self.cluster + "': " +
            str( len( self.myMetrics.keys() ) ) + " hosts " +
            str( count_metrics ) + " metrics " + str( count_values ) + " values " +
            str( count_bits ) + " bits " + str( count_bytes ) + " bytes " )

        for hostname, mymetrics in self.myMetrics.items():

            for metricname, mymetric in mymetrics.items():

                metrics_to_store = [ ]

                # Pop metrics from stack for storing until none is left
                # atomic code: only 1 thread at a time may access myMetrics

                # <ATOMIC>
                #
                self.slot.acquire()

                while len( self.myMetrics[ hostname ][ metricname ] ) > 0:

                    if len( self.myMetrics[ hostname ][ metricname ] ) > 0:

                        try:
                            metrics_to_store.append( self.myMetrics[ hostname ][ metricname ].pop( 0 ) )
                        except IndexError, msg:

                            # Somehow sometimes myMetrics[ hostname ][ metricname ]
                            # is still len 0 when the statement is executed.
                            # Just ignore indexerror's..
                            pass

                self.slot.release()
                #
                # </ATOMIC>

                # Create a mapping table, each metric to the period where it should be stored
                #
                metric_serial_table = self.determineSerials( hostname, metricname, metrics_to_store )

                update_rets = [ ]

                for period, pmetric in metric_serial_table.items():

                    create_ret = self.createCheck( hostname, metricname, period )

                    update_ret = self.update( hostname, metricname, period, pmetric )

                    if update_ret == 0:

                        debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) )
                    else:
                        debug_msg( 9, 'metric update failed' )

                    update_rets.append( create_ret )
                    update_rets.append( update_ret )

                # Lets ignore errors here for now, we need to make sure last update time
                # is correct!
                #
                #if not (1) in update_rets:

                self.memLastUpdate( hostname, metricname, metrics_to_store )

        debug_msg( 5, "Leaving storeMetrics()")
    def makeTimeSerial( self ):
        """Generate a time serial. Seconds since epoch"""

        # Seconds since epoch
        mytime = int( time.time() )

        return mytime

    def makeRrdPath( self, host, metricname, timeserial ):
        """Make a RRD location/path and filename"""

        rrd_dir  = '%s/%s/%s/%s' %( check_dir(ARCHIVE_PATH), self.cluster, host, timeserial )
        rrd_file = '%s/%s.rrd' %( rrd_dir, metricname )

        return rrd_dir, rrd_file

    def getLastRrdTimeSerial( self, host ):
        """Find the last timeserial (directory) for this host"""

        newest_timeserial = 0

        for dir in self.timeserials[ host ]:

            valid_dir = 1

            for letter in dir:
                if letter not in string.digits:
                    valid_dir = 0

            if valid_dir:
                timeserial = dir
                if timeserial > newest_timeserial:
                    newest_timeserial = timeserial

        if newest_timeserial:
            return newest_timeserial
        else:
            return 0

    def determinePeriod( self, host, check_serial ):
        """Determine to which period (directory) this time(serial) belongs"""

        period_serial = 0

        if self.timeserials.has_key( host ):

            for serial in self.timeserials[ host ]:

                if check_serial >= serial and period_serial < serial:

                    period_serial = serial

        return period_serial

    def determineSerials( self, host, metricname, metriclist ):
        """
        Determine the correct serial and corresponding rrd to store
        for a list of metrics
        """

        metric_serial_table = { }

        for metric in metriclist:

            if metric['name'] == metricname:

                period = self.determinePeriod( host, metric['time'] )

                archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60)

                if (int( metric['time'] ) - int( period ) ) > archive_secs:

                    # This one should get its own new period
                    period = metric['time']

                    if not self.timeserials.has_key( host ):
                        self.timeserials[ host ] = [ ]

                    self.timeserials[ host ].append( period )

                if not metric_serial_table.has_key( period ):

                    metric_serial_table[ period ] = [ ]

                metric_serial_table[ period ].append( metric )

        return metric_serial_table

    def createCheck( self, host, metricname, timeserial ):
        """Check if an rrd already exists for this metric, create if not"""

        debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )

        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )

        if not os.path.exists( rrd_dir ):

            try:
                os.makedirs( rrd_dir )

            except os.OSError, msg:

                if msg.find( 'File exists' ) != -1:

                    # Ignore exists errors
                    pass

                else:

                    print msg
                    return

            debug_msg( 9, 'created dir %s' %( str(rrd_dir) ) )

        if not os.path.exists( rrd_file ):

            interval  = self.config.getInterval( self.cluster )
            heartbeat = 8 * int( interval )

            params = [ ]

            params.append( '--step' )
            params.append( str( interval ) )

            params.append( '--start' )
            params.append( str( int( timeserial ) - 1 ) )

            params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat )
            params.append( 'RRA:AVERAGE:0.5:1:%s' %(ARCHIVE_HOURS_PER_RRD * 240) )

            self.rrdm.create( str(rrd_file), params )

            debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) )
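To make createCheck() concrete, assume a polling interval of 15 seconds and ARCHIVE_HOURS_PER_RRD = 12 (both values illustrative). Then heartbeat = 8 * 15 = 120, and the RRA holds 12 * 240 = 2880 samples, i.e. 2880 * 15 s = 12 hours, so each RRD file spans exactly one archive period. The resulting call is equivalent to:

    #   rrdtool create <rrd_file> --step 15 --start <timeserial - 1> \
    #           DS:sum:GAUGE:120:U:U RRA:AVERAGE:0.5:1:2880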
    def update( self, host, metricname, timeserial, metriclist ):
        """
        Update rrd file for host with metricname
        in directory timeserial with metriclist
        """

        debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) )

        rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial )

        update_list = self.makeUpdateList( host, metriclist )

        if len( update_list ) > 0:
            ret = self.rrdm.update( str(rrd_file), update_list )

            if ret:
                return 1

            debug_msg( 9, 'updated rrd %s with %s' %( str(rrd_file), string.join( update_list ) ) )

        return 0

def daemon():
    """daemonized threading"""

    # Fork the first child
    #
    pid = os.fork()

    if pid > 0:

        sys.exit(0)  # end parent

    # creates a session and sets the process group ID
    #
    os.setsid()

    # Fork the second child
    #
    pid = os.fork()

    if pid > 0:

        sys.exit(0)  # end parent

    write_pidfile()

    # Go to the root directory and set the umask
    #
    os.chdir('/')
    os.umask(0)

    sys.stdin.close()
    sys.stdout.close()
    sys.stderr.close()

    os.open('/dev/null', os.O_RDWR)
    os.dup2(0, 1)
    os.dup2(0, 2)

    run()

def run():
    """Threading start"""

    config    = GangliaConfigParser( GMETAD_CONF )
    s_timeout = int( config.getLowestInterval() - 1 )

    socket.setdefaulttimeout( s_timeout )

    myXMLSource = XMLGatherer( ARCHIVE_XMLSOURCE.split( ':' )[0], ARCHIVE_XMLSOURCE.split( ':' )[1] )
    myDataStore = DataSQLStore( JOB_SQL_DBASE.split( '/' )[0], JOB_SQL_DBASE.split( '/' )[1] )

    myTorqueProcessor  = TorqueXMLProcessor( myXMLSource, myDataStore )
    myGangliaProcessor = GangliaXMLProcessor( myXMLSource, myDataStore )

    try:
        torque_xml_thread  = threading.Thread( None, myTorqueProcessor.run, 'torque_proc_thread' )
        ganglia_xml_thread = threading.Thread( None, myGangliaProcessor.run, 'ganglia_proc_thread' )

        torque_xml_thread.start()
        ganglia_xml_thread.start()

    except thread.error, msg:
        debug_msg( 0, 'FATAL ERROR: Unable to start main threads!: '+ str(msg) )
        syslog.closelog()
        sys.exit(1)

    debug_msg( 0, 'main threading started.' )
def main():
    """Program startup"""

    global DAEMONIZE, USE_SYSLOG

    if not processArgs( sys.argv[1:] ):
        sys.exit( 1 )

    if( DAEMONIZE and USE_SYSLOG ):
        syslog.openlog( 'jobarchived', syslog.LOG_NOWAIT, SYSLOG_FACILITY )

    if DAEMONIZE:
        daemon()
    else:
        run()

# …
…

def check_dir( directory ):
    """Check if directory is a proper directory. I.e.: Does _not_ end with a '/'"""

    if directory[-1] == '/':
        directory = directory[:-1]

    return directory

def reqtime2epoch( rtime ):

    (hours, minutes, seconds ) = rtime.split( ':' )

    etime = int(seconds)
    etime = etime + ( int(minutes) * 60 )
    etime = etime + ( int(hours) * 60 * 60 )

    return etime

def debug_msg( level, msg ):
    """Only print msg if correct levels"""

    if (not DAEMONIZE and DEBUG_LEVEL >= level):
        sys.stderr.write( printTime() + ' - ' + msg + '\n' )

    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
        syslog.syslog( msg )

def printTime( ):
    """Print current time in human readable format"""

    return time.strftime("%a %d %b %Y %H:%M:%S")

def write_pidfile():

    # Write pidfile if PIDFILE is set
    if PIDFILE:

        pid = os.getpid()

        pidfile = open(PIDFILE, 'w')

        pidfile.write( str( pid ) )
        pidfile.close()

# Ooohh, someone started me! Let's go..
#
if __name__ == '__main__':
    main()
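Typical invocations, per the usage() text above (paths illustrative):

    # foreground, with debugging (DAEMONIZE = 0 in the config):
    jobarchived -c /etc/jobarchived.conf

    # daemonized, writing a pid file:
    jobarchived -c /etc/jobarchived.conf -p /var/run/jobarchived.pid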