source: branches/1.1/jobmond/jobmond.py @ 947

Last change on this file since 947 was 947, checked in by ramonb, 10 years ago
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 65.7 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2013  Ramon Bastiaans
6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
22# SVN $Id: jobmond.py 947 2014-01-20 13:23:53Z ramonb $
23#
24
25# vi :set ts=4
26
27import sys, getopt, ConfigParser, time, os, socket, string, re
28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
29from xml.sax.handler import feature_namespaces
30from collections import deque
31from glob import glob
32
33VERSION='__VERSION__'
34
35def usage( ver ):
36
37    print 'jobmond %s' %VERSION
38
39    if ver:
40        return 0
41
42    print
43    print 'Purpose:'
44    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
45    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
46    print
47    print 'Usage:    jobmond [OPTIONS]'
48    print
49    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
50    print '  -p, --pidfile=FILE    Use pid file to store the process id'
51    print '  -h, --help        Print help and exit'
52    print '  -v, --version      Print version and exit'
53    print
54
55def processArgs( args ):
56
57    SHORT_L      = 'p:hvc:'
58    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
59
60    global PIDFILE, JOBMOND_CONF
61    PIDFILE      = None
62
63    JOBMOND_CONF = '/etc/jobmond.conf'
64
65    try:
66
67        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
68
69    except getopt.GetoptError, detail:
70
71        print detail
72        usage( False )
73        sys.exit( 1 )
74
75    for opt, value in opts:
76
77        if opt in [ '--config', '-c' ]:
78       
79            JOBMOND_CONF = value
80
81        if opt in [ '--pidfile', '-p' ]:
82
83            PIDFILE      = value
84       
85        if opt in [ '--help', '-h' ]:
86 
87            usage( False )
88            sys.exit( 0 )
89
90        if opt in [ '--version', '-v' ]:
91
92            usage( True )
93            sys.exit( 0 )
94
95    return loadConfig( JOBMOND_CONF )
96
97html_escape_table = {
98    "&": "_",
99    '"': "_",
100    "'": "_",
101    ">": "_",
102    "<": "_",
103}
104
105# If this is too slow:
106# - re.sub(r'[\&|\>\<\'\"]','_', variable )
107# - re.sub(r'[\&\>\<\'\"]','_', variable )
108# - and even faster: compile regex
109
110def html_escape(text):
111    """Produce entities within text."""
112    return string.join((html_escape_table.get(c,c) for c in text), '')
113
114class GangliaConfigParser:
115
116    def __init__( self, filename ):
117
118        self.conf_lijst   = [ ]
119        self.conf_dict    = { }
120        self.filename     = filename
121        self.file_pointer = file( filename, 'r' )
122        self.lexx         = shlex.shlex( self.file_pointer )
123        self.lexx.whitespace_split = True
124
125        self.parse()
126
127    def __del__( self ):
128
129        """
130        Cleanup: close file descriptor
131        """
132
133        self.file_pointer.close()
134        del self.lexx
135        del self.conf_lijst
136
137    def removeQuotes( self, value ):
138
139        clean_value = value
140        clean_value = clean_value.replace( "'", "" )
141        clean_value = clean_value.replace( '"', '' )
142        clean_value = clean_value.strip()
143
144        return clean_value
145
146    def removeBraces( self, value ):
147
148        clean_value = value
149        clean_value = clean_value.replace( "(", "" )
150        clean_value = clean_value.replace( ')', '' )
151        clean_value = clean_value.strip()
152
153        return clean_value
154
155    def parse( self ):
156
157        """
158        Parse self.filename using shlex scanning.
159        - Removes /* comments */
160        - Traverses (recursively) through all include () statements
161        - Stores complete valid config tokens in self.conf_list
162
163        i.e.:
164            ['globals',
165             '{',
166             'daemonize',
167             '=',
168             'yes',
169             'setuid',
170             '=',
171             'yes',
172             'user',
173             '=',
174             'ganglia',
175             'debug_level',
176             '=',
177             '0',
178             <etc> ]
179        """
180
181        t = 'bogus'
182        c = False
183        i = False
184
185        while t != self.lexx.eof:
186            #print 'get token'
187            t = self.lexx.get_token()
188
189            if len( t ) >= 2:
190
191                if len( t ) >= 4:
192
193                    if t[:2] == '/*' and t[-2:] == '*/':
194
195                        #print 'comment line'
196                        #print 'skipping: %s' %t
197                        continue
198
199                if t == '/*' or t[:2] == '/*':
200                    c = True
201                    #print 'comment start'
202                    #print 'skipping: %s' %t
203                    continue
204
205                if t == '*/' or t[-2:] == '*/':
206                    c = False
207                    #print 'skipping: %s' %t
208                    #print 'comment end'
209                    continue
210
211            if c:
212                #print 'skipping: %s' %t
213                continue
214
215            if t == 'include':
216                i = True
217                #print 'include start'
218                #print 'skipping: %s' %t
219                continue
220
221            if i:
222
223                #print 'include start: %s' %t
224
225                t2 = self.removeQuotes( t )
226                t2 = self.removeBraces( t )
227
228                for in_file in glob( self.removeQuotes(t2) ):
229
230                    #print 'including file: %s' %in_file
231                    parse_infile = GangliaConfigParser( in_file )
232
233                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
234
235                    del parse_infile
236
237                i = False
238                #print 'include end'
239                #print 'skipping: %s' %t
240                continue
241
242            #print 'keep: %s' %t
243            self.conf_lijst.append( self.removeQuotes(t) )
244
245    def getConfLijst( self ):
246
247        return self.conf_lijst
248
249    def confListToDict( self, parent_list=None ):
250
251        """
252        Recursively traverses a conf_list and creates dictionary from it
253        """
254
255        new_dict = { }
256        count    = 0
257        skip     = 0
258
259        if not parent_list:
260            parent_list = self.conf_lijst
261
262        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
263
264        for n, c in enumerate( parent_list ):
265
266            count = count + 1
267
268            #print 'CL: n %d c %s' %(n, c)
269
270            if skip > 0:
271
272                #print '- skipped'
273                skip = skip - 1
274                continue
275
276            if (n+1) <= (len( parent_list )-1):
277
278                if parent_list[(n+1)] == '{':
279
280                    if not new_dict.has_key( c ):
281                        new_dict[ c ] = [ ]
282
283                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
284                    new_dict[ c ].append( temp_new_dict )
285
286                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
287
288                    if not new_dict.has_key( c ):
289                        new_dict[ c ] = [ ]
290
291                    new_dict[ c ].append( parent_list[ (n+2) ] )
292
293                    skip = 2
294
295                if parent_list[n] == '}':
296
297                    #print 'leaving confListToDict(): new dict = %s' %new_dict
298                    return (new_dict, count)
299
300    def makeConfDict( self ):
301
302        """
303        Walks through self.conf_list and creates a dictionary based upon config values
304
305        i.e.:
306            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
307                                                         'ip': ['"127.0.0.1"'],
308                                                         'mask': ['32']}]}],
309                                    'port': ['8649']}],
310            'udp_recv_channel': [{'port': ['8649']}],
311            'udp_send_channel': [{'host': ['145.101.32.3'],
312                                  'port': ['8649']},
313                                 {'host': ['145.101.32.207'],
314                                  'port': ['8649']}]}
315        """
316
317        new_dict = { }
318        skip     = 0
319
320        #print 'entering makeConfDict()'
321
322        for n, c in enumerate( self.conf_lijst ):
323
324            #print 'M: n %d c %s' %(n, c)
325
326            if skip > 0:
327
328                #print '- skipped'
329                skip = skip - 1
330                continue
331
332            if (n+1) <= (len( self.conf_lijst )-1):
333
334                if self.conf_lijst[(n+1)] == '{':
335
336                    if not new_dict.has_key( c ):
337                        new_dict[ c ] = [ ]
338
339                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
340                    new_dict[ c ].append( temp_new_dict )
341
342                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
343
344                    if not new_dict.has_key( c ):
345                        new_dict[ c ] = [ ]
346
347                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
348
349                    skip = 2
350
351        self.conf_dict = new_dict
352        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
353
354    def checkConfDict( self ):
355
356        if len( self.conf_lijst ) == 0:
357
358            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
359
360        if len( self.conf_dict ) == 0:
361
362            self.makeConfDict()
363
364    def getConfDict( self ):
365
366        self.checkConfDict()
367        return self.conf_dict
368
369    def getUdpSendChannels( self ):
370
371        self.checkConfDict()
372
373        udp_send_channels = [ ] # IP:PORT
374
375        if not self.conf_dict.has_key( 'udp_send_channel' ):
376            return None
377
378        for u in self.conf_dict[ 'udp_send_channel' ]:
379
380            if u.has_key( 'mcast_join' ):
381
382                ip = u['mcast_join'][0]
383
384            elif u.has_key( 'host' ):
385
386                ip = u['host'][0]
387
388            port = u['port'][0]
389
390            udp_send_channels.append( ( ip, port ) )
391
392        if len( udp_send_channels ) == 0:
393            return None
394
395        return udp_send_channels
396
397    def getSectionLastOption( self, section, option ):
398
399        """
400        Get last option set in a config section that could be set multiple times in multiple (include) files.
401
402        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
403        """
404
405        self.checkConfDict()
406        value = None
407
408        if not self.conf_dict.has_key( section ):
409
410            return None
411
412        # Could be set multiple times in multiple (include) files: get last one set
413        for c in self.conf_dict[ section ]:
414
415                if c.has_key( option ):
416
417                    value = c[ option ][0]
418
419        return value
420
421    def getClusterName( self ):
422
423        return self.getSectionLastOption( 'cluster', 'name' )
424
425    def getVal( self, section, option ):
426
427        return self.getSectionLastOption( section, option )
428
429    def getInt( self, section, valname ):
430
431        value    = self.getVal( section, valname )
432
433        if not value:
434            return None
435
436        return int( value )
437
438    def getStr( self, section, valname ):
439
440        value    = self.getVal( section, valname )
441
442        if not value:
443            return None
444
445        return str( value )
446
447def findGmetric():
448
449    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
450
451        guess    = '%s/%s' %( dir, 'gmetric' )
452
453        if os.path.exists( guess ):
454
455            return guess
456
457    return False
458
459def loadConfig( filename ):
460
461    def getlist( cfg_string ):
462
463        my_list = [ ]
464
465        for item_txt in cfg_string.split( ',' ):
466
467            sep_char = None
468
469            item_txt = item_txt.strip()
470
471            for s_char in [ "'", '"' ]:
472
473                if item_txt.find( s_char ) != -1:
474
475                    if item_txt.count( s_char ) != 2:
476
477                        print 'Missing quote: %s' %item_txt
478                        sys.exit( 1 )
479
480                    else:
481
482                        sep_char = s_char
483                        break
484
485            if sep_char:
486
487                item_txt = item_txt.split( sep_char )[1]
488
489            my_list.append( item_txt )
490
491        return my_list
492
493    if not os.path.isfile( JOBMOND_CONF ):
494
495        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
496        sys.exit( 1 )
497
498    try:
499        f = open( JOBMOND_CONF, 'r' )
500    except IOError, detail:
501        print "Cannot read config file: '%s'" %JOBMOND_CONF
502        sys.exit( 1 )
503    else:
504        f.close()
505
506    cfg        = ConfigParser.ConfigParser()
507
508    cfg.read( filename )
509
510    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
511    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
512    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
513    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
514    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
515
516    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
517
518    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
519
520    SYSLOG_LEVEL    = -1
521    SYSLOG_FACILITY = None
522
523    try:
524        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
525
526    except ConfigParser.NoOptionError:
527
528        USE_SYSLOG  = True
529
530        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
531
532    if USE_SYSLOG:
533
534        try:
535            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
536
537        except ConfigParser.NoOptionError:
538
539            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
540            SYSLOG_LEVEL = 0
541
542        try:
543
544            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
545
546        except ConfigParser.NoOptionError:
547
548            SYSLOG_FACILITY = syslog.LOG_DAEMON
549
550            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
551
552    try:
553
554        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
555
556    except ConfigParser.NoOptionError:
557
558        # Not required for all API's: only pbs api allows remote connections
559        BATCH_SERVER = None
560
561    try:
562   
563        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
564
565    except ConfigParser.NoOptionError:
566
567        # Backwards compatibility for old configs
568        #
569
570        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
571        api_guess           = 'pbs'
572   
573    try:
574
575        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
576
577    except ConfigParser.NoOptionError:
578
579        # Not specified: assume /etc/ganglia/gmond.conf
580        #
581        GMOND_CONF          = '/etc/ganglia/gmond.conf'
582
583    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
584    GMETRIC_TARGET          = None
585
586    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
587
588    if not GMOND_UDP_SEND_CHANNELS:
589
590        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
591
592        # Couldn't figure it out: let's see if it's in our jobmond.conf
593        #
594        try:
595
596            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
597
598        # Guess not: now just give up
599       
600        except ConfigParser.NoOptionError:
601
602            GMETRIC_TARGET    = None
603
604            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
605
606            gmetric_bin    = findGmetric()
607
608            if gmetric_bin:
609
610                GMETRIC_BINARY     = gmetric_bin
611            else:
612                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
613
614                try:
615
616                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
617
618                except ConfigParser.NoOptionError:
619
620                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
621                    sys.exit( 1 )
622
623    #TODO: is this really still needed or should be automatic
624    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
625
626    try:
627        BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
628
629    except ConfigParser.NoOptionError:
630
631        BATCH_HOST_TRANSLATE = [ ]
632        pass
633
634    try:
635
636        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
637
638    except ConfigParser.NoOptionError, detail:
639
640        print "FATAL ERROR: BATCH_API not set"
641        sys.exit( 1 )
642
643    try:
644
645        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
646
647    except ConfigParser.NoOptionError, detail:
648
649        QUEUE        = None
650
651    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
652
653    return True
654
655def fqdn_parts (fqdn):
656
657    """Return pair of host and domain for fully-qualified domain name arg."""
658
659    parts = fqdn.split (".")
660
661    return (parts[0], string.join(parts[1:], "."))
662
663class DataProcessor:
664
665    """Class for processing of data"""
666
667    binary = None
668
669    def __init__( self, binary=None ):
670
671        """Remember alternate binary location if supplied"""
672
673        global GMETRIC_BINARY, GMOND_CONF
674
675        if binary:
676            self.binary = binary
677
678        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
679            self.binary = GMETRIC_BINARY
680
681        # Timeout for XML
682        #
683        # From ganglia's documentation:
684        #
685        # 'A metric will be deleted DMAX seconds after it is received, and
686        # DMAX=0 means eternal life.'
687
688        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
689
690        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
691
692            incompatible = self.checkGmetricVersion()
693
694            if incompatible:
695
696                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
697                sys.exit( 1 )
698
699        self.gmetric_send_instances = { }
700
701
702    def __del__( self ):
703
704        for send_instance in self.gmetric_send_instances:
705
706            del send_instance
707
708    def checkGmetricVersion( self ):
709
710        """
711        Check version of gmetric is at least 3.3.8
712        for the syntax we use
713        """
714
715        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
716
717        incompatible    = 0
718
719        gfp        = os.popen( self.binary + ' --version' )
720        lines      = gfp.readlines()
721
722        gfp.close()
723
724        for line in lines:
725
726            line = line.split( ' ' )
727
728            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
729           
730                gmetric_version    = line[1].split( '\n' )[0]
731
732                version_major    = int( gmetric_version.split( '.' )[0] )
733                version_minor    = int( gmetric_version.split( '.' )[1] )
734                version_patch    = int( gmetric_version.split( '.' )[2] )
735
736                incompatible    = 0
737
738                if version_major < 3:
739
740                    incompatible = 1
741               
742                elif version_major == 3:
743
744                    if version_minor < 3:
745
746                        incompatible = 1
747
748                    elif version_patch < 8:
749
750                        incompatible = 1
751
752        return incompatible
753
754    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
755
756        """Call gmetric binary and multicast"""
757
758        cmd = self.binary
759
760        if GMOND_UDP_SEND_CHANNELS:
761
762            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
763
764                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
765
766                debug_msg( 10, printTime() + ' ' + metric_debug)
767
768                if not self.gmetric_send_instances.has_key( (c_ip, c_port) ):
769
770                    self.gmetric_send_instances[ (c_ip, c_port) ] = Gmetric( c_ip, c_port )
771
772                self.gmetric_send_instances[ (c_ip, c_port) ].send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
773
774        elif GMETRIC_TARGET:
775
776            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
777            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
778
779            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
780
781            debug_msg( 10, printTime() + ' ' + metric_debug)
782
783            if not self.gmetric_send_instances.has_key( (GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT) ):
784
785                self.gmetric_send_instances[ (GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT) ] = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
786
787            self.gmetric_send_instances[ (GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT) ].send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
788
789        else:
790            try:
791                cmd = cmd + ' -c' + GMOND_CONF
792
793            except NameError:
794
795                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
796
797            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
798
799            if len( units ) > 0:
800
801                cmd = cmd + ' -u"' + units + '"'
802
803            debug_msg( 10, printTime() + ' ' + cmd )
804
805            os.system( cmd )
806
807class DataGatherer:
808
809    """Skeleton class for batch system DataGatherer"""
810
811    def printJobs( self, jobs ):
812
813        """Print a jobinfo overview"""
814
815        for name, attrs in self.jobs.items():
816
817            print 'job %s' %(name)
818
819            for name, val in attrs.items():
820
821                print '\t%s = %s' %( name, val )
822
823    def printJob( self, jobs, job_id ):
824
825        """Print job with job_id from jobs"""
826
827        print 'job %s' %(job_id)
828
829        for name, val in jobs[ job_id ].items():
830
831            print '\t%s = %s' %( name, val )
832
833    def getAttr( self, attrs, name ):
834
835        """Return certain attribute from dictionary, if exists"""
836
837        if attrs.has_key( name ):
838
839            return attrs[ name ]
840        else:
841            return ''
842
843    def jobDataChanged( self, jobs, job_id, attrs ):
844
845        """Check if job with attrs and job_id in jobs has changed"""
846
847        if jobs.has_key( job_id ):
848
849            oldData = jobs[ job_id ]   
850        else:
851            return 1
852
853        for name, val in attrs.items():
854
855            if oldData.has_key( name ):
856
857                if oldData[ name ] != attrs[ name ]:
858
859                    return 1
860
861            else:
862                return 1
863
864        return 0
865
866    def submitJobData( self ):
867
868        """Submit job info list"""
869
870        global BATCH_API
871
872        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
873
874        running_jobs = 0
875        queued_jobs  = 0
876
877        # Count how many running/queued jobs we found
878        #
879        for jobid, jobattrs in self.jobs.items():
880
881            if jobattrs[ 'status' ] == 'Q':
882
883                queued_jobs += 1
884
885            elif jobattrs[ 'status' ] == 'R':
886
887                running_jobs += 1
888
889        # Report running/queued jobs as seperate metric for a nice RRD graph
890        #
891        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
892        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
893
894        # Report down/offline nodes in batch (PBS only ATM)
895        #
896        if BATCH_API in [ 'pbs', 'slurm' ]:
897
898            domain        = fqdn_parts( socket.getfqdn() )[1]
899
900            downed_nodes  = list()
901            offline_nodes = list()
902       
903            l        = ['state']
904
905            nodelist = self.getNodeData()
906
907            for name, node in nodelist.items():
908
909                if ( node[ 'state' ].find( "down" ) != -1 ):
910
911                    downed_nodes.append( name )
912
913                if ( node[ 'state' ].find( "offline" ) != -1 ):
914
915                    offline_nodes.append( name )
916
917            downnodeslist    = do_nodelist( downed_nodes )
918            offlinenodeslist = do_nodelist( offline_nodes )
919
920            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
921            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
922            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
923            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
924
925        # Now let's spread the knowledge
926        #
927        for jobid, jobattrs in self.jobs.items():
928
929            # Make gmetric values for each job: respect max gmetric value length
930            #
931            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
932            metric_increment    = 0
933
934            # If we have more job info than max gmetric value length allows, split it up
935            # amongst multiple metrics
936            #
937            for val in gmetric_val:
938
939                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
940                self.dp.multicastGmetric( metric_name, val )
941
942                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
943                #
944                metric_increment    = metric_increment + 1
945
946    def compileGmetricVal( self, jobid, jobattrs ):
947
948        """Create a val string for gmetric of jobinfo"""
949
950        gval_lists    = [ ]
951        val_list    = { }
952
953        for val_name, val_value in jobattrs.items():
954
955            # These are our own metric names, i.e.: status, start_timestamp, etc
956            #
957            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
958
959            # These are their corresponding values
960            #
961            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
962
963            if val_name == 'nodes' and jobattrs['status'] == 'R':
964
965                node_str = None
966
967                for node in val_value:
968
969                    if node_str:
970
971                        node_str = node_str + ';' + node
972                    else:
973                        node_str = node
974
975                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
976                    #
977                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
978
979                        # It's too big, we need to make a new gmetric for the additional info
980                        #
981                        val_list[ val_name ]    = node_str
982
983                        gval_lists.append( val_list )
984
985                        val_list        = { }
986                        node_str        = None
987
988                val_list[ val_name ]    = node_str
989
990                gval_lists.append( val_list )
991
992                val_list        = { }
993
994            elif val_value != '':
995
996                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
997                #
998                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
999
1000                    # It's too big, we need to make a new gmetric for the additional info
1001                    #
1002                    gval_lists.append( val_list )
1003
1004                    val_list        = { }
1005
1006                val_list[ val_name ]    = val_value
1007
1008        if len( val_list ) > 0:
1009
1010            gval_lists.append( val_list )
1011
1012        str_list    = [ ]
1013
1014        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
1015        #
1016        for val_list in gval_lists:
1017
1018            my_val_str    = None
1019
1020            for val_name, val_value in val_list.items():
1021
1022                if type(val_value) == list:
1023
1024                    val_value    = val_value.join( ',' )
1025
1026                if my_val_str:
1027
1028                    try:
1029                        # fixme: It's getting
1030                        # ('nodes', None) items
1031                        my_val_str = my_val_str + ' ' + val_name + '=' + html_escape(val_value)
1032                    except:
1033                        pass
1034
1035                else:
1036                    my_val_str = val_name + '=' + html_escape(val_value)
1037
1038            str_list.append( my_val_str )
1039
1040        return str_list
1041
1042    def daemon( self ):
1043
1044        """Run as daemon forever"""
1045
1046        # Fork the first child
1047        #
1048        pid = os.fork()
1049        if pid > 0:
1050            sys.exit(0)  # end parent
1051
1052        # creates a session and sets the process group ID
1053        #
1054        os.setsid()
1055
1056        # Fork the second child
1057        #
1058        pid = os.fork()
1059        if pid > 0:
1060            sys.exit(0)  # end parent
1061
1062        write_pidfile()
1063
1064        # Go to the root directory and set the umask
1065        #
1066        os.chdir('/')
1067        os.umask(0)
1068
1069        sys.stdin.close()
1070        sys.stdout.close()
1071        sys.stderr.close()
1072
1073        os.open('/dev/null', os.O_RDWR)
1074        os.dup2(0, 1)
1075        os.dup2(0, 2)
1076
1077        self.run()
1078
1079    def run( self ):
1080
1081        """Main thread"""
1082
1083        while ( 1 ):
1084       
1085            self.getJobData()
1086            self.submitJobData()
1087            time.sleep( BATCH_POLL_INTERVAL )   
1088
1089# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1090# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1091# with 6.2.  See also the fixmes.
1092
1093class NoJobs (Exception):
1094    """Exception raised by empty job list in qstat output."""
1095    pass
1096
1097class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
1098    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
1099
1100    def __init__(self):
1101        self.value = ""
1102        self.joblist = []
1103        self.job = {}
1104        self.queue = ""
1105        self.in_joblist = False
1106        self.lrequest = False
1107        self.eltq = deque()
1108        xml.sax.handler.ContentHandler.__init__(self)
1109
1110    # The structure of the output is as follows (for SGE 6.0).  It's
1111    # similar for 6.1, but radically different for 6.2, and is
1112    # undocumented generally.  Unfortunately it's voluminous, and probably
1113    # doesn't scale to large clusters/queues.
1114
1115    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1116    #   <djob_info>
1117    #     <qmaster_response>  <!-- job -->
1118    #       ...
1119    #       <JB_ja_template> 
1120    #     <ulong_sublist>
1121    #     ...         <!-- start_time, state ... -->
1122    #     </ulong_sublist>
1123    #       </JB_ja_template> 
1124    #       <JB_ja_tasks>
1125    #     <ulong_sublist>
1126    #       ...       <!-- task info
1127    #     </ulong_sublist>
1128    #     ...
1129    #       </JB_ja_tasks>
1130    #       ...
1131    #     </qmaster_response>
1132    #   </djob_info>
1133    #   <messages>
1134    #   ...
1135
1136    # NB.  We might treat each task as a separate job, like
1137    # straight qstat output, but the web interface expects jobs to
1138    # be identified by integers, not, say, <job number>.<task>.
1139
1140    # So, I lied.  If the job list is empty, we get invalid XML
1141    # like this, which we need to defend against:
1142
1143    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1144    #   <>
1145    #     <ST_name>*</ST_name>
1146    #   </>
1147    # </unknown_jobs>
1148
1149    def startElement(self, name, attrs):
1150        self.value = ""
1151        if name == "djob_info":    # job list
1152            self.in_joblist = True
1153        # The job container is "qmaster_response" in SGE 6.0
1154        # and 6.1, but "element" in 6.2.  This is only the very
1155        # start of what's necessary for 6.2, though (sigh).
1156        elif (name == "qmaster_response" or name == "element") \
1157                and self.eltq[-1] == "djob_info": # job
1158            self.job = {"job_state": "U", "slots": 0,
1159                    "nodes": [], "queued_timestamp": "",
1160                    "queued_timestamp": "", "queue": "",
1161                    "ppn": "0", "RN_max": 0,
1162                    # fixme in endElement
1163                    "requested_memory": 0, "requested_time": 0
1164                    }
1165            self.joblist.append(self.job)
1166        elif name == "qstat_l_requests": # resource request
1167            self.lrequest = True
1168        elif name == "unknown_jobs":
1169            raise NoJobs
1170        self.eltq.append (name)
1171
1172    def characters(self, ch):
1173        self.value += ch
1174
1175    def endElement(self, name): 
1176        """Snarf job elements contents into job dictionary.
1177           Translate keys if appropriate."""
1178
1179        name_trans = {
1180          "JB_job_number": "number",
1181          "JB_job_name": "name", "JB_owner": "owner",
1182          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1183          "JB_submission_time": "queued_timestamp"
1184          }
1185        value = self.value
1186        self.eltq.pop ()
1187
1188        if name == "djob_info":
1189            self.in_joblist = False
1190            self.job = {}
1191        elif name == "JAT_master_queue":
1192            self.job["queue"] = value.split("@")[0]
1193        elif name == "JG_qhostname":
1194            if not (value in self.job["nodes"]):
1195                self.job["nodes"].append(value)
1196        elif name == "JG_slots": # slots in use
1197            self.job["slots"] += int(value)
1198        elif name == "RN_max": # requested slots (tasks or parallel)
1199            self.job["RN_max"] = max (self.job["RN_max"],
1200                          int(value))
1201        elif name == "JAT_state": # job state (bitwise or)
1202            value = int (value)
1203            # Status values from sge_jobL.h
1204            #define JIDLE           0x00000000
1205            #define JHELD           0x00000010
1206            #define JMIGRATING          0x00000020
1207            #define JQUEUED         0x00000040
1208            #define JRUNNING        0x00000080
1209            #define JSUSPENDED          0x00000100
1210            #define JTRANSFERING        0x00000200
1211            #define JDELETED        0x00000400
1212            #define JWAITING        0x00000800
1213            #define JEXITING        0x00001000
1214            #define JWRITTEN        0x00002000
1215            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1216            #define JFINISHED           0x00010000
1217            if value & 0x80:
1218                self.job["status"] = "R"
1219            elif value & 0x40:
1220                self.job["status"] = "Q"
1221            else:
1222                self.job["status"] = "O" # `other'
1223        elif name == "CE_name" and self.lrequest and self.value in \
1224                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1225            # We're in a container for an interesting resource
1226            # request; record which type.
1227            self.lrequest = self.value
1228        elif name == "CE_doubleval" and self.lrequest:
1229            # if we're in a container for an interesting
1230            # resource request, use the maxmimum of the hard
1231            # and soft requests to record the requested CPU
1232            # or core.  Fixme:  I'm not sure if this logic is
1233            # right.
1234            if self.lrequest in ("h_core", "s_core"):
1235                self.job["requested_memory"] = \
1236                    max (float (value),
1237                     self.job["requested_memory"])
1238            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1239            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1240                self.job["requested_time"] = \
1241                    max (float (value),
1242                     self.job["requested_time"])
1243        elif name == "qstat_l_requests":
1244            self.lrequest = False
1245        elif self.job and self.in_joblist:
1246            if name in name_trans:
1247                name = name_trans[name]
1248                self.job[name] = value
1249
1250# Abstracted from PBS original.
1251# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
1252#
1253def do_nodelist( nodes ):
1254
1255    """Translate node list as appropriate."""
1256
1257    nodeslist        = [ ]
1258    my_domain        = fqdn_parts( socket.getfqdn() )[1]
1259
1260    for node in nodes:
1261
1262        host        = node.split( '/' )[0] # not relevant for SGE
1263        h, host_domain    = fqdn_parts(host)
1264
1265        if host_domain == my_domain:
1266
1267            host    = h
1268
1269        if nodeslist.count( host ) == 0:
1270
1271            for translate_pattern in BATCH_HOST_TRANSLATE:
1272
1273                if translate_pattern.find( '/' ) != -1:
1274
1275                    translate_orig    = \
1276                        translate_pattern.split( '/' )[1]
1277                    translate_new    = \
1278                        translate_pattern.split( '/' )[2]
1279                    host = re.sub( translate_orig,
1280                               translate_new, host )
1281            if not host in nodeslist:
1282                nodeslist.append( host )
1283    return nodeslist
1284
1285class SLURMDataGatherer( DataGatherer ):
1286
1287    global pyslurm
1288
1289    """This is the DataGatherer for SLURM"""
1290
1291    def __init__( self ):
1292
1293        """Setup appropriate variables"""
1294
1295        self.jobs       = { }
1296        self.timeoffset = 0
1297        self.dp         = DataProcessor()
1298
1299    def getNodeData( self ):
1300
1301        slurm_type  = pyslurm.node()
1302
1303        slurm_nodes = slurm_type.get()
1304
1305        nodedict    = { }
1306
1307        for node, attrs in slurm_nodes.items():
1308
1309            ( num_state, name_state ) = attrs['node_state'] 
1310
1311            if name_state == 'DOWN':
1312
1313                nodedict[ node ] = { 'state' : 'down' }
1314
1315            elif name_state == 'DRAIN':
1316
1317                nodedict[ node ] = { 'state' : 'offline' }
1318
1319        return nodedict
1320
1321    def getJobData( self ):
1322
1323        """Gather all data on current jobs"""
1324
1325        joblist            = {}
1326
1327        self.cur_time  = time.time()
1328
1329        slurm_type = pyslurm.job()
1330        joblist    = slurm_type.get()
1331
1332        jobs_processed    = [ ]
1333
1334        for name, attrs in joblist.items():
1335            display_queue = 1
1336            job_id        = name
1337
1338            name          = self.getAttr( attrs, 'name' )
1339            queue         = self.getAttr( attrs, 'partition' )
1340
1341            if QUEUE:
1342                for q in QUEUE:
1343                    if q == queue:
1344                        display_queue = 1
1345                        break
1346                    else:
1347                        display_queue = 0
1348                        continue
1349            if display_queue == 0:
1350                continue
1351
1352            owner_uid        = attrs[ 'user_id' ]
1353            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1354
1355            requested_time   = self.getAttr( attrs, 'time_limit' )
1356            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
1357
1358            if min_memory == 0:
1359
1360                requested_memory = ''
1361
1362            else:
1363                requested_memory = min_memory
1364
1365            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
1366
1367            if min_cpus == 0:
1368
1369                ppn = ''
1370
1371            else:
1372                ppn = min_cpus
1373
1374            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1375
1376            status = 'Q'
1377
1378            if status_long == 'RUNNING':
1379
1380                status = 'R'
1381
1382            elif status_long == 'COMPLETED':
1383
1384                continue
1385
1386            jobs_processed.append( job_id )
1387
1388            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1389
1390            start_timestamp = ''
1391            nodeslist       = ''
1392
1393            if status == 'R':
1394
1395                start_timestamp = self.getAttr( attrs, 'start_time' )
1396                nodes           = attrs[ 'nodes' ]
1397
1398                if not nodes:
1399
1400                    # This should not happen
1401
1402                    # Something wrong: running but 'nodes' returned empty by pyslurm
1403                    # Possible pyslurm bug: abort/quit/warning
1404
1405                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1406
1407                    print err_msg
1408                    debug_msg( 0, err_msg )
1409                    sys.exit(1)
1410
1411                my_nodelist = [ ]
1412
1413                slurm_hostlist  = pyslurm.hostlist()
1414                slurm_hostlist.create( nodes )
1415                slurm_hostlist.uniq()
1416
1417                while slurm_hostlist.count() > 0:
1418
1419                    my_nodelist.append( slurm_hostlist.pop() )
1420
1421                slurm_hostlist.destroy()
1422
1423                del slurm_hostlist
1424
1425                nodeslist       = do_nodelist( my_nodelist )
1426
1427                if DETECT_TIME_DIFFS:
1428
1429                    # If a job start if later than our current date,
1430                    # that must mean the Torque server's time is later
1431                    # than our local time.
1432               
1433                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1434
1435                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1436
1437            elif status == 'Q':
1438
1439                nodeslist       = str( attrs[ 'num_nodes' ] )
1440
1441            else:
1442                start_timestamp = ''
1443                nodeslist       = ''
1444
1445            myAttrs                = { }
1446
1447            myAttrs[ 'name' ]             = str( name )
1448            myAttrs[ 'queue' ]            = str( queue )
1449            myAttrs[ 'owner' ]            = str( owner )
1450            myAttrs[ 'requested_time' ]   = str( requested_time )
1451            myAttrs[ 'requested_memory' ] = str( requested_memory )
1452            myAttrs[ 'ppn' ]              = str( ppn )
1453            myAttrs[ 'status' ]           = str( status )
1454            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1455            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1456            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1457            myAttrs[ 'nodes' ]            = nodeslist
1458            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1459            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1460
1461            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1462
1463                self.jobs[ job_id ] = myAttrs
1464
1465        for id, attrs in self.jobs.items():
1466
1467            if id not in jobs_processed:
1468
1469                # This one isn't there anymore; toedeledoki!
1470                #
1471                del self.jobs[ id ]
1472
1473class SgeDataGatherer(DataGatherer):
1474
1475    jobs = {}
1476
1477    def __init__( self ):
1478        self.jobs = {}
1479        self.timeoffset = 0
1480        self.dp = DataProcessor()
1481
1482    def getJobData( self ):
1483        """Gather all data on current jobs in SGE"""
1484
1485        import popen2
1486
1487        self.cur_time = 0
1488        queues = ""
1489        if QUEUE:    # only for specific queues
1490            # Fixme:  assumes queue names don't contain single
1491            # quote or comma.  Don't know what the SGE rules are.
1492            queues = " -q '" + string.join (QUEUE, ",") + "'"
1493        # Note the comment in SgeQstatXMLParser about scaling with
1494        # this method of getting data.  I haven't found better one.
1495        # Output with args `-xml -ext -f -r' is easier to parse
1496        # in some ways, harder in others, but it doesn't provide
1497        # the submission time (at least SGE 6.0).  The pipeline
1498        # into sed corrects bogus XML observed with a configuration
1499        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1500        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
1501sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
1502                           + queues, True)
1503        qstatparser = SgeQstatXMLParser()
1504        parse_err = 0
1505        try:
1506            xml.sax.parse(piping.fromchild, qstatparser)
1507        except NoJobs:
1508            pass
1509        except:
1510            parse_err = 1
1511        if piping.wait():
1512            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
1513            return None
1514        elif parse_err:
1515            debug_msg(10, "Bad XML output from qstat"())
1516            exit (1)
1517        for f in piping.fromchild, piping.tochild, piping.childerr:
1518            f.close()
1519        self.cur_time = time.time()
1520        jobs_processed = []
1521        for job in qstatparser.joblist:
1522            job_id = job["number"]
1523            if job["status"] in [ 'Q', 'R' ]:
1524                jobs_processed.append(job_id)
1525            if job["status"] == "R":
1526                job["nodes"] = do_nodelist (job["nodes"])
1527                # Fixme: why is job["nodes"] sometimes null?
1528                try:
1529                    # Fixme: Is this sensible?  The
1530                    # PBS-type PPN isn't something you use
1531                    # with SGE.
1532                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
1533                except:
1534                    job["ppn"] = 0
1535                if DETECT_TIME_DIFFS:
1536                    # If a job start is later than our
1537                    # current date, that must mean
1538                    # the SGE server's time is later
1539                    # than our local time.
1540                    start_timestamp = int (job["start_timestamp"])
1541                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
1542
1543                        self.timeoffset    = start_timestamp - int(self.cur_time)
1544            else:
1545                # fixme: Note sure what this should be:
1546                job["ppn"] = job["RN_max"]
1547                job["nodes"] = "1"
1548
1549            myAttrs = {}
1550            for attr in ["name", "queue", "owner",
1551                     "requested_time", "status",
1552                     "requested_memory", "ppn",
1553                     "start_timestamp", "queued_timestamp"]:
1554                myAttrs[attr] = str(job[attr])
1555            myAttrs["nodes"] = job["nodes"]
1556            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
1557            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1558            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
1559
1560            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
1561                self.jobs[job_id] = myAttrs
1562        for id, attrs in self.jobs.items():
1563            if id not in jobs_processed:
1564                del self.jobs[id]
1565
1566# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1567# Requres LSFObject http://sourceforge.net/projects/lsfobject
1568#
1569class LsfDataGatherer(DataGatherer):
1570
1571    """This is the DataGatherer for LSf"""
1572
1573    global lsfObject
1574
1575    def __init__( self ):
1576
1577        self.jobs = { }
1578        self.timeoffset = 0
1579        self.dp = DataProcessor()
1580        self.initLsfQuery()
1581
1582    def _countDuplicatesInList( self, dupedList ):
1583
1584        countDupes    = { }
1585
1586        for item in dupedList:
1587
1588            if not countDupes.has_key( item ):
1589
1590                countDupes[ item ]    = 1
1591            else:
1592                countDupes[ item ]    = countDupes[ item ] + 1
1593
1594        dupeCountList    = [ ]
1595
1596        for item, count in countDupes.items():
1597
1598            dupeCountList.append( ( item, count ) )
1599
1600        return dupeCountList
1601#
1602#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1603#print _countDuplicatesInList(lst)
1604#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1605########################
1606
1607    def initLsfQuery( self ):
1608        self.pq = None
1609        self.pq = lsfObject.jobInfoEntObject()
1610
1611    def getJobData( self, known_jobs="" ):
1612        """Gather all data on current jobs in LSF"""
1613        if len( known_jobs ) > 0:
1614            jobs = known_jobs
1615        else:
1616            jobs = { }
1617        joblist = {}
1618        joblist = self.pq.getJobInfo()
1619        nodelist = ''
1620
1621        self.cur_time = time.time()
1622
1623        jobs_processed = [ ]
1624
1625        for name, attrs in joblist.items():
1626            job_id = str(name)
1627            jobs_processed.append( job_id )
1628            name = self.getAttr( attrs, 'jobName' )
1629            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1630            owner = self.getAttr( attrs, 'user' )
1631
1632### THIS IS THE rLimit List index values
1633#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1634#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1635#define LSF_RLIMIT_DATA     2        /* data size */
1636#define LSF_RLIMIT_STACK    3        /* stack size */
1637#define LSF_RLIMIT_CORE     4        /* core file size */
1638#define LSF_RLIMIT_RSS      5        /* resident set size */
1639#define LSF_RLIMIT_NOFILE   6        /* open files */
1640#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1641#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
1642#define LSF_RLIMIT_SWAP     8
1643#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1644#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1645#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1646#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
1647
1648            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1649            if requested_time == -1: 
1650                requested_time = ""
1651            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1652            if requested_memory == -1: 
1653                requested_memory = ""
1654# This tries to get proc per node. We don't support this right now
1655            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1656            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1657            if requested_cpus == None or requested_cpus == "":
1658                requested_cpus = 1
1659
1660            if QUEUE:
1661                for q in QUEUE:
1662                    if q == queue:
1663                        display_queue = 1
1664                        break
1665                    else:
1666                        display_queue = 0
1667                        continue
1668            if display_queue == 0:
1669                continue
1670
1671            runState = self.getAttr( attrs, 'status' )
1672            if runState == 4:
1673                status = 'R'
1674            else:
1675                status = 'Q'
1676            queued_timestamp = self.getAttr( attrs, 'submitTime' )
1677
1678            if status == 'R':
1679                start_timestamp = self.getAttr( attrs, 'startTime' )
1680                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1681                nodelist = nodesCpu.keys()
1682
1683                if DETECT_TIME_DIFFS:
1684
1685                    # If a job start if later than our current date,
1686                    # that must mean the Torque server's time is later
1687                    # than our local time.
1688
1689                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
1690
1691                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1692
1693            elif status == 'Q':
1694                start_timestamp = ''
1695                count_mynodes = 0
1696                numeric_node = 1
1697                nodelist = ''
1698
1699            myAttrs = { }
1700            if name == "":
1701                myAttrs['name'] = "none"
1702            else:
1703                myAttrs['name'] = name
1704
1705            myAttrs[ 'owner' ]        = owner
1706            myAttrs[ 'requested_time' ]    = str(requested_time)
1707            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1708            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1709            myAttrs[ 'ppn' ]        = str( ppn )
1710            myAttrs[ 'status' ]        = status
1711            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1712            myAttrs[ 'queue' ]        = str(queue)
1713            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1714            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1715            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1716            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1717            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
1718
1719            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1720                jobs[ job_id ] = myAttrs
1721
1722                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
1723
1724        for id, attrs in jobs.items():
1725            if id not in jobs_processed:
1726                # This one isn't there anymore
1727                #
1728                del jobs[ id ]
1729        self.jobs=jobs
1730
1731
1732class PbsDataGatherer( DataGatherer ):
1733
1734    """This is the DataGatherer for PBS and Torque"""
1735
1736    global PBSQuery, PBSError
1737
1738    def __init__( self ):
1739
1740        """Setup appropriate variables"""
1741
1742        self.jobs       = { }
1743        self.timeoffset = 0
1744        self.dp         = DataProcessor()
1745
1746        self.initPbsQuery()
1747
1748    def initPbsQuery( self ):
1749
1750        self.pq = None
1751
1752        try:
1753
1754            if( BATCH_SERVER ):
1755
1756                self.pq = PBSQuery( BATCH_SERVER )
1757            else:
1758                self.pq = PBSQuery()
1759
1760        except PBSError, details:
1761            print 'Cannot connect to pbs server'
1762            print details
1763            sys.exit( 1 )
1764
1765        try:
1766            # TODO: actually use new data structure
1767            self.pq.old_data_structure()
1768
1769        except AttributeError:
1770
1771            # pbs_query is older
1772            #
1773            pass
1774
1775    def getNodeData( self ):
1776
1777        nodedict = { }
1778
1779        try:
1780            nodedict = self.pq.getnodes()
1781
1782        except PBSError, detail:
1783
1784            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1785
1786        return nodedict
1787
1788    def getJobData( self ):
1789
1790        """Gather all data on current jobs in Torque"""
1791
1792        joblist            = {}
1793        self.cur_time      = 0
1794
1795        try:
1796            joblist        = self.pq.getjobs()
1797            self.cur_time  = time.time()
1798
1799        except (PBSError, TypeError), detail:
1800
1801            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1802            return None
1803
1804        jobs_processed    = [ ]
1805
1806        for name, attrs in joblist.items():
1807            display_queue = 1
1808            job_id        = name.split( '.' )[0]
1809
1810            name          = self.getAttr( attrs, 'Job_Name' )
1811            queue         = self.getAttr( attrs, 'queue' )
1812
1813            if QUEUE:
1814                for q in QUEUE:
1815                    if q == queue:
1816                        display_queue = 1
1817                        break
1818                    else:
1819                        display_queue = 0
1820                        continue
1821            if display_queue == 0:
1822                continue
1823
1824
1825            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
1826            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1827            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
1828
1829            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
1830
1831            ppn = ''
1832
1833            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
1834
1835                mynoderequest_fields = mynoderequest.split( ':' )
1836
1837                for mynoderequest_field in mynoderequest_fields:
1838
1839                    if mynoderequest_field.find( 'ppn' ) != -1:
1840
1841                        ppn = mynoderequest_field.split( 'ppn=' )[1]
1842
1843            status = self.getAttr( attrs, 'job_state' )
1844
1845            if status in [ 'Q', 'R' ]:
1846
1847                jobs_processed.append( job_id )
1848
1849            queued_timestamp = self.getAttr( attrs, 'ctime' )
1850
1851            if status == 'R':
1852
1853                start_timestamp = self.getAttr( attrs, 'mtime' )
1854                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
1855
1856                nodeslist       = do_nodelist( nodes )
1857
1858                if DETECT_TIME_DIFFS:
1859
1860                    # If a job start if later than our current date,
1861                    # that must mean the Torque server's time is later
1862                    # than our local time.
1863               
1864                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1865
1866                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1867
1868            elif status == 'Q':
1869
1870                # 'mynodequest' can be a string in the following syntax according to the
1871                # Torque Administator's manual:
1872                #
1873                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1874                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1875                # etc
1876                #
1877
1878                #
1879                # For now we only count the amount of nodes request and ignore properties
1880                #
1881
1882                start_timestamp = ''
1883                count_mynodes   = 0
1884
1885                for node in mynoderequest.split( '+' ):
1886
1887                    # Just grab the {node_count|hostname} part and ignore properties
1888                    #
1889                    nodepart     = node.split( ':' )[0]
1890
1891                    # Let's assume a node_count value
1892                    #
1893                    numeric_node = 1
1894
1895                    # Chop the value up into characters
1896                    #
1897                    for letter in nodepart:
1898
1899                        # If this char is not a digit (0-9), this must be a hostname
1900                        #
1901                        if letter not in string.digits:
1902
1903                            numeric_node = 0
1904
1905                    # If this is a hostname, just count this as one (1) node
1906                    #
1907                    if not numeric_node:
1908
1909                        count_mynodes = count_mynodes + 1
1910                    else:
1911
1912                        # If this a number, it must be the node_count
1913                        # and increase our count with it's value
1914                        #
1915                        try:
1916                            count_mynodes = count_mynodes + int( nodepart )
1917
1918                        except ValueError, detail:
1919
1920                            # When we arrive here I must be bugged or very confused
1921                            # THIS SHOULD NOT HAPPEN!
1922                            #
1923                            debug_msg( 10, str( detail ) )
1924                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1925                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1926                            debug_msg( 10, 'job = ' + str( name ) )
1927                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1928                       
1929                nodeslist       = str( count_mynodes )
1930            else:
1931                start_timestamp = ''
1932                nodeslist       = ''
1933
1934            myAttrs                = { }
1935
1936            myAttrs[ 'name' ]             = str( name )
1937            myAttrs[ 'queue' ]            = str( queue )
1938            myAttrs[ 'owner' ]            = str( owner )
1939            myAttrs[ 'requested_time' ]   = str( requested_time )
1940            myAttrs[ 'requested_memory' ] = str( requested_memory )
1941            myAttrs[ 'ppn' ]              = str( ppn )
1942            myAttrs[ 'status' ]           = str( status )
1943            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1944            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1945            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1946            myAttrs[ 'nodes' ]            = nodeslist
1947            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1948            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1949
1950            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1951
1952                self.jobs[ job_id ] = myAttrs
1953
1954        for id, attrs in self.jobs.items():
1955
1956            if id not in jobs_processed:
1957
1958                # This one isn't there anymore; toedeledoki!
1959                #
1960                del self.jobs[ id ]
1961
1962GMETRIC_DEFAULT_TYPE    = 'string'
1963GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1964GMETRIC_DEFAULT_PORT    = '8649'
1965GMETRIC_DEFAULT_UNITS   = ''
1966
1967class Gmetric:
1968
1969    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1970
1971    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1972    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1973    protocol        = ( 'udp', 'multicast' )
1974
1975    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1976               
1977        global GMETRIC_DEFAULT_TYPE
1978
1979        self.prot       = self.checkHostProtocol( host )
1980        self.data_msg   = xdrlib.Packer()
1981        self.meta_msg   = xdrlib.Packer()
1982        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1983
1984        if self.prot not in self.protocol:
1985
1986            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1987
1988        if self.prot == 'multicast':
1989
1990            # Set multicast options
1991            #
1992            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1993
1994        self.hostport   = ( host, int( port ) )
1995        self.slopestr   = 'both'
1996        self.tmax       = 60
1997
1998    def __del__( self ):
1999
2000        self.socket.close()
2001
2002    def checkHostProtocol( self, ip ):
2003
2004        """Detect if a ip adress is a multicast address"""
2005
2006        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
2007        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
2008
2009        ip_fields               = ip.split( '.' )
2010
2011        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
2012
2013            return 'multicast'
2014        else:
2015            return 'udp'
2016
2017    def send( self, name, value, dmax, typestr = '', units = '' ):
2018
2019        if len( units ) == 0:
2020            units       = GMETRIC_DEFAULT_UNITS
2021
2022        if len( typestr ) == 0:
2023            typestr     = GMETRIC_DEFAULT_TYPE
2024
2025        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
2026
2027        meta_rt = self.socket.sendto( meta_msg, self.hostport )
2028        data_rt = self.socket.sendto( data_msg, self.hostport )
2029
2030        return ( meta_rt, data_rt )
2031
2032    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
2033
2034        hostname = "unset"
2035
2036        if slopestr not in self.slope:
2037
2038            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
2039
2040        if typestr not in self.type:
2041
2042            raise ValueError( "Type must be one of: " + str( self.type ) )
2043
2044        if len( name ) == 0:
2045
2046            raise ValueError( "Name must be non-empty" )
2047
2048        self.meta_msg.reset()
2049        self.meta_msg.pack_int( 128 )
2050
2051        if not spoof:
2052            self.meta_msg.pack_string( hostname )
2053        else:
2054            self.meta_msg.pack_string( spoof )
2055
2056        self.meta_msg.pack_string( name )
2057
2058        if not spoof:
2059            self.meta_msg.pack_int( 0 )
2060        else:
2061            self.meta_msg.pack_int( 1 )
2062           
2063        self.meta_msg.pack_string( typestr )
2064        self.meta_msg.pack_string( name )
2065        self.meta_msg.pack_string( unitstr )
2066        self.meta_msg.pack_int( self.slope[ slopestr ] )
2067        self.meta_msg.pack_uint( int( tmax ) )
2068        self.meta_msg.pack_uint( int( dmax ) )
2069
2070        if not group:
2071            self.meta_msg.pack_int( 0 )
2072        else:
2073            self.meta_msg.pack_int( 1 )
2074            self.meta_msg.pack_string( "GROUP" )
2075            self.meta_msg.pack_string( group )
2076
2077        self.data_msg.reset()
2078        self.data_msg.pack_int( 128+5 )
2079
2080        if not spoof:
2081            self.data_msg.pack_string( hostname )
2082        else:
2083            self.data_msg.pack_string( spoof )
2084
2085        self.data_msg.pack_string( name )
2086
2087        if not spoof:
2088            self.data_msg.pack_int( 0 )
2089        else:
2090            self.data_msg.pack_int( 1 )
2091
2092        self.data_msg.pack_string( "%s" )
2093        self.data_msg.pack_string( str( value ) )
2094
2095        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2096
2097def printTime( ):
2098
2099    """Print current time/date in human readable format for log/debug"""
2100
2101    return time.strftime("%a, %d %b %Y %H:%M:%S")
2102
2103def debug_msg( level, msg ):
2104
2105    """Print msg if at or above current debug level"""
2106
2107    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
2108
2109    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2110        sys.stderr.write( msg + '\n' )
2111
2112    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2113        syslog.syslog( msg )
2114
2115def write_pidfile():
2116
2117    # Write pidfile if PIDFILE is set
2118    #
2119    if PIDFILE:
2120
2121        pid    = os.getpid()
2122
2123        pidfile    = open( PIDFILE, 'w' )
2124
2125        pidfile.write( str( pid ) )
2126        pidfile.close()
2127
2128def main():
2129
2130    """Application start"""
2131
2132    global PBSQuery, PBSError, lsfObject, pyslurm
2133    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
2134
2135    if not processArgs( sys.argv[1:] ):
2136
2137        sys.exit( 1 )
2138
2139    # Load appropriate DataGatherer depending on which BATCH_API is set
2140    # and any required modules for the Gatherer
2141    #
2142    if BATCH_API == 'pbs':
2143
2144        try:
2145            from PBSQuery import PBSQuery, PBSError
2146
2147        except ImportError, details:
2148
2149            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
2150            print details
2151            sys.exit( 1 )
2152
2153        gather = PbsDataGatherer()
2154
2155    elif BATCH_API == 'sge':
2156
2157        if BATCH_SERVER != 'localhost':
2158
2159            # Print and log, but continue execution
2160            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2161            print err_msg
2162            debug_msg( 0, err_msg )
2163
2164        # Tested with SGE 6.0u11.
2165        #
2166        gather = SgeDataGatherer()
2167
2168    elif BATCH_API == 'lsf':
2169
2170        if BATCH_SERVER != 'localhost':
2171
2172            # Print and log, but continue execution
2173            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2174            print err_msg
2175            debug_msg( 0, err_msg )
2176
2177        try:
2178            from lsfObject import lsfObject
2179        except:
2180            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2181            sys.exit( 1 )
2182
2183        gather = LsfDataGatherer()
2184
2185    elif BATCH_API == 'slurm':
2186
2187        if BATCH_SERVER != 'localhost':
2188
2189            # Print and log, but continue execution
2190            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2191            print err_msg
2192            debug_msg( 0, err_msg )
2193
2194        try:
2195            import pyslurm
2196        except:
2197            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
2198            sys.exit( 1 )
2199
2200        gather = SLURMDataGatherer()
2201
2202    else:
2203        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
2204
2205        sys.exit( 1 )
2206
2207    if( DAEMONIZE and USE_SYSLOG ):
2208
2209        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2210
2211    if DAEMONIZE:
2212
2213        gather.daemon()
2214    else:
2215        gather.run()
2216
2217# wh00t? someone started me! :)
2218#
2219if __name__ == '__main__':
2220    main()
Note: See TracBrowser for help on using the repository browser.