- Timestamp:
- 04/07/05 11:53:52 (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/togad.py
r46 r47 55 55 UNSUPPORTED_ARCHIVE_TYPES = [ 'string' ] 56 56 57 # Maximum time (in seconds) a parsethread may run 58 # 59 PARSE_TIMEOUT = 60 60 61 # Maximum time (in seconds) a storethread may run 62 # 63 STORE_TIMEOUT = 360 64 57 65 """ 58 66 This is TOrque-GAnglia's data Daemon … … 382 390 383 391 debug_msg( 7, self.printTime() + ' - storethread(): storemetricthread() still running, waiting to finish..' ) 384 storethread.join( 180) # Maximum time is for storing thread to finish392 storethread.join( STORE_TIMEOUT ) # Maximum time is for storing thread to finish 385 393 debug_msg( 7, self.printTime() + ' - storethread(): Done waiting.' ) 386 394 … … 414 422 415 423 debug_msg( 7, self.printTime() + ' - xmlthread(): parsethread() still running, waiting to finish..' ) 416 parsethread.join( 180) # Maximum time for XML thread to finish424 parsethread.join( PARSE_TIMEOUT ) # Maximum time for XML thread to finish 417 425 debug_msg( 7, self.printTime() + ' - xmlthread(): Done waiting.' ) 418 426 … … 507 515 myMetrics = { } 508 516 lastStored = { } 517 timeserials = { } 509 518 slot = None 510 519 … … 540 549 for host in hosts: 541 550 551 host_dir = cluster_dir + '/' + host 552 dirlist = os.listdir( host_dir ) 553 554 for dir in dirlist: 555 556 if not self.timeserials.has_key( host ): 557 558 self.timeserials[ host ] = [ ] 559 560 self.timeserials[ host ].append( dir ) 561 542 562 last_serial = self.getLastRrdTimeSerial( host ) 543 563 if last_serial: … … 589 609 # </atomic> 590 610 591 def makeUpdateList( self, host, metric name):611 def makeUpdateList( self, host, metriclist ): 592 612 593 613 update_list = [ ] 594 614 metric = None 595 615 596 while len( self.myMetrics[ host ][ metricname ]) > 0:616 while len( metriclist ) > 0: 597 617 598 618 # Kabouter pop 599 619 # 600 620 # <atomic> 601 self.slot.acquire()621 #self.slot.acquire() 602 622 603 623 # len might have changed since loop start 604 624 # 605 if len( self.myMetrics[ host ][ metricname ]) > 0:606 metric = self.myMetrics[ host ][ metricname ].pop( 0 )607 608 self.slot.release()625 if len( metriclist ) > 0: 626 metric = metriclist.pop( 0 ) 627 628 #self.slot.release() 609 629 # </atomic> 610 630 611 631 if metric: 612 if self.checkStoreMetric( host, metric name, metric):632 if self.checkStoreMetric( host, metric ): 613 633 update_list.append( '%s:%s' %( metric['time'], metric['val'] ) ) 614 #else:615 #print 'allready wrote metric %s with timestamp %s' %( metric['name'], metric['time'] )616 634 617 635 return update_list … … 621 639 if self.lastStored.has_key( host ): 622 640 623 if self.lastStored[ host ].has_key( metric name):624 625 if metric['time'] <= self.lastStored[ host ][ metric name]:641 if self.lastStored[ host ].has_key( metric['name'] ): 642 643 if metric['time'] <= self.lastStored[ host ][ metric['name'] ]: 626 644 627 645 # Allready wrote a value with this timestamp, skip tnx … … 631 649 self.lastStored[ host ] = { } 632 650 633 self.lastStored[ host ][ metric name] = metric['time']651 self.lastStored[ host ][ metric['name'] ] = metric['time'] 634 652 635 653 return 1 … … 641 659 for metricname, mymetric in mymetrics.items(): 642 660 643 mytime = self.makeTimeSerial() 644 correct_serial = self.checkNewRrdPeriod( hostname, mytime ) 645 self.createCheck( hostname, metricname, correct_serial ) 646 update_ret = self.update( hostname, metricname, correct_serial ) 647 648 if update_ret == 0: 649 650 debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) ) 651 else: 652 debug_msg( 9, 'metric update failed' ) 653 654 #sys.exit(1) 661 #mytime = self.makeTimeSerial() 662 #serial = mymetric['time'] 663 #correct_serial = self.checkNewRrdPeriod( hostname, mytime ) 664 665 self.slot.acquire() 666 667 # Create a mapping table, each metric to the period where it should be stored 668 # 669 metric_serial_table = self.determineSerials( hostname, metricname, mymetric ) 670 self.myMetrics[ hostname ][ metricname ] = [ ] 671 672 self.slot.release() 673 674 for period, pmetric in metric_serial_table.items(): 675 676 self.createCheck( hostname, metricname, period ) 677 678 update_ret = self.update( hostname, metricname, period, pmetric ) 679 680 if update_ret == 0: 681 682 debug_msg( 9, 'stored metric %s for %s' %( hostname, metricname ) ) 683 else: 684 debug_msg( 9, 'metric update failed' ) 685 686 sys.exit(1) 655 687 656 688 def makeTimeSerial( self ): … … 686 718 """ 687 719 688 rrd_dir, rrd_file = self.makeRrdPath( host )689 690 720 newest_timeserial = 0 691 721 692 if os.path.exists( rrd_dir ): 693 694 dirlist = os.listdir( rrd_dir ) 695 696 for dir in dirlist: 697 698 valid_dir = 1 699 700 for letter in dir: 701 if letter not in string.digits: 702 valid_dir = 0 703 704 if valid_dir: 705 timeserial = dir 706 if timeserial > newest_timeserial: 707 newest_timeserial = timeserial 722 for dir in self.timeserials[ host ]: 723 724 valid_dir = 1 725 726 for letter in dir: 727 if letter not in string.digits: 728 valid_dir = 0 729 730 if valid_dir: 731 timeserial = dir 732 if timeserial > newest_timeserial: 733 newest_timeserial = timeserial 708 734 709 735 if newest_timeserial: … … 712 738 return 0 713 739 740 def determinePeriod( self, host, check_serial ): 741 742 period_serial = 0 743 744 for serial in self.timeserials[ host ]: 745 746 if check_serial >= serial and period_serial < serial: 747 748 period_serial = serial 749 750 return period_serial 751 752 def determineSerials( self, host, metricname, metriclist ): 753 """ 754 Determine the correct serial and corresponding rrd to store 755 for a list of metrics 756 """ 757 758 metric_serial_table = { } 759 760 for metric in metriclist: 761 762 if metric['name'] == metricname: 763 764 period = self.determinePeriod( host, metric['time'] ) 765 766 archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60) 767 768 if (metric['time'] - period) > archive_secs: 769 770 # This one should get it's own new period 771 period = metric['time'] 772 773 if not metric_serial_table.has_key( period ): 774 775 metric_serial_table = [ ] 776 777 metric_serial_table[ period ].append( metric ) 778 779 print metric_serial_table 780 781 return metric_serial_table 782 714 783 def checkNewRrdPeriod( self, host, current_timeserial ): 715 784 """ … … 729 798 archive_secs = ARCHIVE_HOURS_PER_RRD * (60 * 60) 730 799 731 if (current_timeserial - last_timeserial) > =archive_secs:800 if (current_timeserial - last_timeserial) > archive_secs: 732 801 serial = current_timeserial 733 802 else: … … 753 822 754 823 debug_msg( 9, 'rrdcreate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) ) 755 824 756 825 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial ) 757 826 … … 763 832 764 833 interval = self.config.getInterval( self.cluster ) 765 heartbeat = 8 * int( interval)834 heartbeat = 8 * int( interval ) 766 835 767 836 params = [ ] … … 771 840 772 841 params.append( '--start' ) 773 params.append( str( int( self.getFirstTime( host, metricname )) - 1 ) )842 params.append( str( int( timeserial ) - 1 ) ) 774 843 775 844 params.append( 'DS:sum:GAUGE:%d:U:U' %heartbeat ) … … 780 849 debug_msg( 9, 'created rrd %s' %( str(rrd_file) ) ) 781 850 782 def update( self, host, metricname, timeserial ):851 def update( self, host, metricname, timeserial, metriclist ): 783 852 784 853 debug_msg( 9, 'rrdupdate: using timeserial %s for %s/%s' %( timeserial, host, metricname ) ) … … 786 855 rrd_dir, rrd_file = self.makeRrdPath( host, metricname, timeserial ) 787 856 788 update_list = self.makeUpdateList( host, metric name)857 update_list = self.makeUpdateList( host, metriclist ) 789 858 790 859 if len( update_list ) > 0:
Note: See TracChangeset
for help on using the changeset viewer.