source: branches/1.1/jobmond/jobmond.py @ 944

Last change on this file since 944 was 944, checked in by ramonb, 10 years ago

jobmond/jobmond.py:

  • cgi escaping was not working: still errors occured
  • now just replace special characters with underscores
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 65.0 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2013  Ramon Bastiaans
6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
22# SVN $Id: jobmond.py 944 2014-01-16 14:28:14Z ramonb $
23#
24
25# vi :set ts=4
26
27import sys, getopt, ConfigParser, time, os, socket, string, re
28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
29from xml.sax.handler import feature_namespaces
30from collections import deque
31from glob import glob
32
33VERSION='__VERSION__'
34
35def usage( ver ):
36
37    print 'jobmond %s' %VERSION
38
39    if ver:
40        return 0
41
42    print
43    print 'Purpose:'
44    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
45    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
46    print
47    print 'Usage:    jobmond [OPTIONS]'
48    print
49    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
50    print '  -p, --pidfile=FILE    Use pid file to store the process id'
51    print '  -h, --help        Print help and exit'
52    print '  -v, --version      Print version and exit'
53    print
54
55def processArgs( args ):
56
57    SHORT_L      = 'p:hvc:'
58    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
59
60    global PIDFILE, JOBMOND_CONF
61    PIDFILE      = None
62
63    JOBMOND_CONF = '/etc/jobmond.conf'
64
65    try:
66
67        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
68
69    except getopt.GetoptError, detail:
70
71        print detail
72        usage( False )
73        sys.exit( 1 )
74
75    for opt, value in opts:
76
77        if opt in [ '--config', '-c' ]:
78       
79            JOBMOND_CONF = value
80
81        if opt in [ '--pidfile', '-p' ]:
82
83            PIDFILE      = value
84       
85        if opt in [ '--help', '-h' ]:
86 
87            usage( False )
88            sys.exit( 0 )
89
90        if opt in [ '--version', '-v' ]:
91
92            usage( True )
93            sys.exit( 0 )
94
95    return loadConfig( JOBMOND_CONF )
96
97html_escape_table = {
98    "&": "_",
99    '"': "_",
100    "'": "_",
101    ">": "_",
102    "<": "_",
103}
104
105# If this is too slow:
106# - re.sub(r'[\&|\>\<\'\"]','_', variable )
107# - re.sub(r'[\&\>\<\'\"]','_', variable )
108# - and even faster: compile regex
109
110def html_escape(text):
111    """Produce entities within text."""
112    return string.join((html_escape_table.get(c,c) for c in text), '')
113
114class GangliaConfigParser:
115
116    def __init__( self, filename ):
117
118        self.conf_lijst   = [ ]
119        self.conf_dict    = { }
120        self.filename     = filename
121        self.file_pointer = file( filename, 'r' )
122        self.lexx         = shlex.shlex( self.file_pointer )
123        self.lexx.whitespace_split = True
124
125        self.parse()
126
127    def __del__( self ):
128
129        """
130        Cleanup: close file descriptor
131        """
132
133        self.file_pointer.close()
134        del self.lexx
135        del self.conf_lijst
136
137    def removeQuotes( self, value ):
138
139        clean_value = value
140        clean_value = clean_value.replace( "'", "" )
141        clean_value = clean_value.replace( '"', '' )
142        clean_value = clean_value.strip()
143
144        return clean_value
145
146    def removeBraces( self, value ):
147
148        clean_value = value
149        clean_value = clean_value.replace( "(", "" )
150        clean_value = clean_value.replace( ')', '' )
151        clean_value = clean_value.strip()
152
153        return clean_value
154
155    def parse( self ):
156
157        """
158        Parse self.filename using shlex scanning.
159        - Removes /* comments */
160        - Traverses (recursively) through all include () statements
161        - Stores complete valid config tokens in self.conf_list
162
163        i.e.:
164            ['globals',
165             '{',
166             'daemonize',
167             '=',
168             'yes',
169             'setuid',
170             '=',
171             'yes',
172             'user',
173             '=',
174             'ganglia',
175             'debug_level',
176             '=',
177             '0',
178             <etc> ]
179        """
180
181        t = 'bogus'
182        c = False
183        i = False
184
185        while t != self.lexx.eof:
186            #print 'get token'
187            t = self.lexx.get_token()
188
189            if len( t ) >= 2:
190
191                if len( t ) >= 4:
192
193                    if t[:2] == '/*' and t[-2:] == '*/':
194
195                        #print 'comment line'
196                        #print 'skipping: %s' %t
197                        continue
198
199                if t == '/*' or t[:2] == '/*':
200                    c = True
201                    #print 'comment start'
202                    #print 'skipping: %s' %t
203                    continue
204
205                if t == '*/' or t[-2:] == '*/':
206                    c = False
207                    #print 'skipping: %s' %t
208                    #print 'comment end'
209                    continue
210
211            if c:
212                #print 'skipping: %s' %t
213                continue
214
215            if t == 'include':
216                i = True
217                #print 'include start'
218                #print 'skipping: %s' %t
219                continue
220
221            if i:
222
223                #print 'include start: %s' %t
224
225                t2 = self.removeQuotes( t )
226                t2 = self.removeBraces( t )
227
228                for in_file in glob( self.removeQuotes(t2) ):
229
230                    #print 'including file: %s' %in_file
231                    parse_infile = GangliaConfigParser( in_file )
232
233                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
234
235                    del parse_infile
236
237                i = False
238                #print 'include end'
239                #print 'skipping: %s' %t
240                continue
241
242            #print 'keep: %s' %t
243            self.conf_lijst.append( self.removeQuotes(t) )
244
245    def getConfLijst( self ):
246
247        return self.conf_lijst
248
249    def confListToDict( self, parent_list=None ):
250
251        """
252        Recursively traverses a conf_list and creates dictionary from it
253        """
254
255        new_dict = { }
256        count    = 0
257        skip     = 0
258
259        if not parent_list:
260            parent_list = self.conf_lijst
261
262        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
263
264        for n, c in enumerate( parent_list ):
265
266            count = count + 1
267
268            #print 'CL: n %d c %s' %(n, c)
269
270            if skip > 0:
271
272                #print '- skipped'
273                skip = skip - 1
274                continue
275
276            if (n+1) <= (len( parent_list )-1):
277
278                if parent_list[(n+1)] == '{':
279
280                    if not new_dict.has_key( c ):
281                        new_dict[ c ] = [ ]
282
283                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
284                    new_dict[ c ].append( temp_new_dict )
285
286                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
287
288                    if not new_dict.has_key( c ):
289                        new_dict[ c ] = [ ]
290
291                    new_dict[ c ].append( parent_list[ (n+2) ] )
292
293                    skip = 2
294
295                if parent_list[n] == '}':
296
297                    #print 'leaving confListToDict(): new dict = %s' %new_dict
298                    return (new_dict, count)
299
300    def makeConfDict( self ):
301
302        """
303        Walks through self.conf_list and creates a dictionary based upon config values
304
305        i.e.:
306            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
307                                                         'ip': ['"127.0.0.1"'],
308                                                         'mask': ['32']}]}],
309                                    'port': ['8649']}],
310            'udp_recv_channel': [{'port': ['8649']}],
311            'udp_send_channel': [{'host': ['145.101.32.3'],
312                                  'port': ['8649']},
313                                 {'host': ['145.101.32.207'],
314                                  'port': ['8649']}]}
315        """
316
317        new_dict = { }
318        skip     = 0
319
320        #print 'entering makeConfDict()'
321
322        for n, c in enumerate( self.conf_lijst ):
323
324            #print 'M: n %d c %s' %(n, c)
325
326            if skip > 0:
327
328                #print '- skipped'
329                skip = skip - 1
330                continue
331
332            if (n+1) <= (len( self.conf_lijst )-1):
333
334                if self.conf_lijst[(n+1)] == '{':
335
336                    if not new_dict.has_key( c ):
337                        new_dict[ c ] = [ ]
338
339                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
340                    new_dict[ c ].append( temp_new_dict )
341
342                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
343
344                    if not new_dict.has_key( c ):
345                        new_dict[ c ] = [ ]
346
347                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
348
349                    skip = 2
350
351        self.conf_dict = new_dict
352        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
353
354    def checkConfDict( self ):
355
356        if len( self.conf_lijst ) == 0:
357
358            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
359
360        if len( self.conf_dict ) == 0:
361
362            self.makeConfDict()
363
364    def getConfDict( self ):
365
366        self.checkConfDict()
367        return self.conf_dict
368
369    def getUdpSendChannels( self ):
370
371        self.checkConfDict()
372
373        udp_send_channels = [ ] # IP:PORT
374
375        if not self.conf_dict.has_key( 'udp_send_channel' ):
376            return None
377
378        for u in self.conf_dict[ 'udp_send_channel' ]:
379
380            if u.has_key( 'mcast_join' ):
381
382                ip = u['mcast_join'][0]
383
384            elif u.has_key( 'host' ):
385
386                ip = u['host'][0]
387
388            port = u['port'][0]
389
390            udp_send_channels.append( ( ip, port ) )
391
392        if len( udp_send_channels ) == 0:
393            return None
394
395        return udp_send_channels
396
397    def getSectionLastOption( self, section, option ):
398
399        """
400        Get last option set in a config section that could be set multiple times in multiple (include) files.
401
402        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
403        """
404
405        self.checkConfDict()
406        value = None
407
408        if not self.conf_dict.has_key( section ):
409
410            return None
411
412        # Could be set multiple times in multiple (include) files: get last one set
413        for c in self.conf_dict[ section ]:
414
415                if c.has_key( option ):
416
417                    value = c[ option ][0]
418
419        return value
420
421    def getClusterName( self ):
422
423        return self.getSectionLastOption( 'cluster', 'name' )
424
425    def getVal( self, section, option ):
426
427        return self.getSectionLastOption( section, option )
428
429    def getInt( self, section, valname ):
430
431        value    = self.getVal( section, valname )
432
433        if not value:
434            return None
435
436        return int( value )
437
438    def getStr( self, section, valname ):
439
440        value    = self.getVal( section, valname )
441
442        if not value:
443            return None
444
445        return str( value )
446
447def findGmetric():
448
449    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
450
451        guess    = '%s/%s' %( dir, 'gmetric' )
452
453        if os.path.exists( guess ):
454
455            return guess
456
457    return False
458
459def loadConfig( filename ):
460
461    def getlist( cfg_string ):
462
463        my_list = [ ]
464
465        for item_txt in cfg_string.split( ',' ):
466
467            sep_char = None
468
469            item_txt = item_txt.strip()
470
471            for s_char in [ "'", '"' ]:
472
473                if item_txt.find( s_char ) != -1:
474
475                    if item_txt.count( s_char ) != 2:
476
477                        print 'Missing quote: %s' %item_txt
478                        sys.exit( 1 )
479
480                    else:
481
482                        sep_char = s_char
483                        break
484
485            if sep_char:
486
487                item_txt = item_txt.split( sep_char )[1]
488
489            my_list.append( item_txt )
490
491        return my_list
492
493    if not os.path.isfile( JOBMOND_CONF ):
494
495        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
496        sys.exit( 1 )
497
498    try:
499        f = open( JOBMOND_CONF, 'r' )
500    except IOError, detail:
501        print "Cannot read config file: '%s'" %JOBMOND_CONF
502        sys.exit( 1 )
503    else:
504        f.close()
505
506    cfg        = ConfigParser.ConfigParser()
507
508    cfg.read( filename )
509
510    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
511    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
512    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
513    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
514    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
515
516    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
517
518    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
519
520    SYSLOG_LEVEL    = -1
521    SYSLOG_FACILITY = None
522
523    try:
524        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
525
526    except ConfigParser.NoOptionError:
527
528        USE_SYSLOG  = True
529
530        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
531
532    if USE_SYSLOG:
533
534        try:
535            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
536
537        except ConfigParser.NoOptionError:
538
539            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
540            SYSLOG_LEVEL = 0
541
542        try:
543
544            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
545
546        except ConfigParser.NoOptionError:
547
548            SYSLOG_FACILITY = syslog.LOG_DAEMON
549
550            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
551
552    try:
553
554        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
555
556    except ConfigParser.NoOptionError:
557
558        # Not required for all API's: only pbs api allows remote connections
559        BATCH_SERVER = None
560
561    try:
562   
563        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
564
565    except ConfigParser.NoOptionError:
566
567        # Backwards compatibility for old configs
568        #
569
570        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
571        api_guess           = 'pbs'
572   
573    try:
574
575        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
576
577    except ConfigParser.NoOptionError:
578
579        # Not specified: assume /etc/ganglia/gmond.conf
580        #
581        GMOND_CONF          = '/etc/ganglia/gmond.conf'
582
583    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
584    GMETRIC_TARGET          = None
585
586    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
587
588    if not GMOND_UDP_SEND_CHANNELS:
589
590        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
591
592        # Couldn't figure it out: let's see if it's in our jobmond.conf
593        #
594        try:
595
596            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
597
598        # Guess not: now just give up
599       
600        except ConfigParser.NoOptionError:
601
602            GMETRIC_TARGET    = None
603
604            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
605
606            gmetric_bin    = findGmetric()
607
608            if gmetric_bin:
609
610                GMETRIC_BINARY     = gmetric_bin
611            else:
612                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
613
614                try:
615
616                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
617
618                except ConfigParser.NoOptionError:
619
620                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
621                    sys.exit( 1 )
622
623    #TODO: is this really still needed or should be automatic
624    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
625
626    try:
627        BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
628
629    except ConfigParser.NoOptionError:
630
631        BATCH_HOST_TRANSLATE = [ ]
632        pass
633
634    try:
635
636        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
637
638    except ConfigParser.NoOptionError, detail:
639
640        print "FATAL ERROR: BATCH_API not set"
641        sys.exit( 1 )
642
643    try:
644
645        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
646
647    except ConfigParser.NoOptionError, detail:
648
649        QUEUE        = None
650
651    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
652
653    return True
654
655def fqdn_parts (fqdn):
656
657    """Return pair of host and domain for fully-qualified domain name arg."""
658
659    parts = fqdn.split (".")
660
661    return (parts[0], string.join(parts[1:], "."))
662
663class DataProcessor:
664
665    """Class for processing of data"""
666
667    binary = None
668
669    def __init__( self, binary=None ):
670
671        """Remember alternate binary location if supplied"""
672
673        global GMETRIC_BINARY, GMOND_CONF
674
675        if binary:
676            self.binary = binary
677
678        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
679            self.binary = GMETRIC_BINARY
680
681        # Timeout for XML
682        #
683        # From ganglia's documentation:
684        #
685        # 'A metric will be deleted DMAX seconds after it is received, and
686        # DMAX=0 means eternal life.'
687
688        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
689
690        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
691
692            incompatible = self.checkGmetricVersion()
693
694            if incompatible:
695
696                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
697                sys.exit( 1 )
698
699    def checkGmetricVersion( self ):
700
701        """
702        Check version of gmetric is at least 3.3.8
703        for the syntax we use
704        """
705
706        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
707
708        incompatible    = 0
709
710        gfp        = os.popen( self.binary + ' --version' )
711        lines      = gfp.readlines()
712
713        gfp.close()
714
715        for line in lines:
716
717            line = line.split( ' ' )
718
719            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
720           
721                gmetric_version    = line[1].split( '\n' )[0]
722
723                version_major    = int( gmetric_version.split( '.' )[0] )
724                version_minor    = int( gmetric_version.split( '.' )[1] )
725                version_patch    = int( gmetric_version.split( '.' )[2] )
726
727                incompatible    = 0
728
729                if version_major < 3:
730
731                    incompatible = 1
732               
733                elif version_major == 3:
734
735                    if version_minor < 3:
736
737                        incompatible = 1
738
739                    elif version_patch < 8:
740
741                        incompatible = 1
742
743        return incompatible
744
745    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
746
747        """Call gmetric binary and multicast"""
748
749        cmd = self.binary
750
751        if GMOND_UDP_SEND_CHANNELS:
752
753            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
754
755                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
756
757                debug_msg( 10, printTime() + ' ' + metric_debug)
758
759                gm = Gmetric( c_ip, c_port )
760
761                gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
762
763        elif GMETRIC_TARGET:
764
765            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
766            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
767
768            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
769
770            debug_msg( 10, printTime() + ' ' + metric_debug)
771
772            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
773
774            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
775
776        else:
777            try:
778                cmd = cmd + ' -c' + GMOND_CONF
779
780            except NameError:
781
782                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
783
784            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
785
786            if len( units ) > 0:
787
788                cmd = cmd + ' -u"' + units + '"'
789
790            debug_msg( 10, printTime() + ' ' + cmd )
791
792            os.system( cmd )
793
794class DataGatherer:
795
796    """Skeleton class for batch system DataGatherer"""
797
798    def printJobs( self, jobs ):
799
800        """Print a jobinfo overview"""
801
802        for name, attrs in self.jobs.items():
803
804            print 'job %s' %(name)
805
806            for name, val in attrs.items():
807
808                print '\t%s = %s' %( name, val )
809
810    def printJob( self, jobs, job_id ):
811
812        """Print job with job_id from jobs"""
813
814        print 'job %s' %(job_id)
815
816        for name, val in jobs[ job_id ].items():
817
818            print '\t%s = %s' %( name, val )
819
820    def getAttr( self, attrs, name ):
821
822        """Return certain attribute from dictionary, if exists"""
823
824        if attrs.has_key( name ):
825
826            return attrs[ name ]
827        else:
828            return ''
829
830    def jobDataChanged( self, jobs, job_id, attrs ):
831
832        """Check if job with attrs and job_id in jobs has changed"""
833
834        if jobs.has_key( job_id ):
835
836            oldData = jobs[ job_id ]   
837        else:
838            return 1
839
840        for name, val in attrs.items():
841
842            if oldData.has_key( name ):
843
844                if oldData[ name ] != attrs[ name ]:
845
846                    return 1
847
848            else:
849                return 1
850
851        return 0
852
853    def submitJobData( self ):
854
855        """Submit job info list"""
856
857        global BATCH_API
858
859        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
860
861        running_jobs = 0
862        queued_jobs  = 0
863
864        # Count how many running/queued jobs we found
865        #
866        for jobid, jobattrs in self.jobs.items():
867
868            if jobattrs[ 'status' ] == 'Q':
869
870                queued_jobs += 1
871
872            elif jobattrs[ 'status' ] == 'R':
873
874                running_jobs += 1
875
876        # Report running/queued jobs as seperate metric for a nice RRD graph
877        #
878        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
879        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
880
881        # Report down/offline nodes in batch (PBS only ATM)
882        #
883        if BATCH_API in [ 'pbs', 'slurm' ]:
884
885            domain        = fqdn_parts( socket.getfqdn() )[1]
886
887            downed_nodes  = list()
888            offline_nodes = list()
889       
890            l        = ['state']
891
892            nodelist = self.getNodeData()
893
894            for name, node in nodelist.items():
895
896                if ( node[ 'state' ].find( "down" ) != -1 ):
897
898                    downed_nodes.append( name )
899
900                if ( node[ 'state' ].find( "offline" ) != -1 ):
901
902                    offline_nodes.append( name )
903
904            downnodeslist    = do_nodelist( downed_nodes )
905            offlinenodeslist = do_nodelist( offline_nodes )
906
907            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
908            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
909            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
910            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
911
912        # Now let's spread the knowledge
913        #
914        for jobid, jobattrs in self.jobs.items():
915
916            # Make gmetric values for each job: respect max gmetric value length
917            #
918            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
919            metric_increment    = 0
920
921            # If we have more job info than max gmetric value length allows, split it up
922            # amongst multiple metrics
923            #
924            for val in gmetric_val:
925
926                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
927                self.dp.multicastGmetric( metric_name, val )
928
929                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
930                #
931                metric_increment    = metric_increment + 1
932
933    def compileGmetricVal( self, jobid, jobattrs ):
934
935        """Create a val string for gmetric of jobinfo"""
936
937        gval_lists    = [ ]
938        val_list    = { }
939
940        for val_name, val_value in jobattrs.items():
941
942            # These are our own metric names, i.e.: status, start_timestamp, etc
943            #
944            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
945
946            # These are their corresponding values
947            #
948            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
949
950            if val_name == 'nodes' and jobattrs['status'] == 'R':
951
952                node_str = None
953
954                for node in val_value:
955
956                    if node_str:
957
958                        node_str = node_str + ';' + node
959                    else:
960                        node_str = node
961
962                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
963                    #
964                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
965
966                        # It's too big, we need to make a new gmetric for the additional info
967                        #
968                        val_list[ val_name ]    = node_str
969
970                        gval_lists.append( val_list )
971
972                        val_list        = { }
973                        node_str        = None
974
975                val_list[ val_name ]    = node_str
976
977                gval_lists.append( val_list )
978
979                val_list        = { }
980
981            elif val_value != '':
982
983                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
984                #
985                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
986
987                    # It's too big, we need to make a new gmetric for the additional info
988                    #
989                    gval_lists.append( val_list )
990
991                    val_list        = { }
992
993                val_list[ val_name ]    = val_value
994
995        if len( val_list ) > 0:
996
997            gval_lists.append( val_list )
998
999        str_list    = [ ]
1000
1001        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
1002        #
1003        for val_list in gval_lists:
1004
1005            my_val_str    = None
1006
1007            for val_name, val_value in val_list.items():
1008
1009                if type(val_value) == list:
1010
1011                    val_value    = val_value.join( ',' )
1012
1013                if my_val_str:
1014
1015                    try:
1016                        # fixme: It's getting
1017                        # ('nodes', None) items
1018                        my_val_str = my_val_str + ' ' + val_name + '=' + html_escape(val_value)
1019                    except:
1020                        pass
1021
1022                else:
1023                    my_val_str = val_name + '=' + html_escape(val_value)
1024
1025            str_list.append( my_val_str )
1026
1027        return str_list
1028
1029    def daemon( self ):
1030
1031        """Run as daemon forever"""
1032
1033        # Fork the first child
1034        #
1035        pid = os.fork()
1036        if pid > 0:
1037            sys.exit(0)  # end parent
1038
1039        # creates a session and sets the process group ID
1040        #
1041        os.setsid()
1042
1043        # Fork the second child
1044        #
1045        pid = os.fork()
1046        if pid > 0:
1047            sys.exit(0)  # end parent
1048
1049        write_pidfile()
1050
1051        # Go to the root directory and set the umask
1052        #
1053        os.chdir('/')
1054        os.umask(0)
1055
1056        sys.stdin.close()
1057        sys.stdout.close()
1058        sys.stderr.close()
1059
1060        os.open('/dev/null', os.O_RDWR)
1061        os.dup2(0, 1)
1062        os.dup2(0, 2)
1063
1064        self.run()
1065
1066    def run( self ):
1067
1068        """Main thread"""
1069
1070        while ( 1 ):
1071       
1072            self.getJobData()
1073            self.submitJobData()
1074            time.sleep( BATCH_POLL_INTERVAL )   
1075
1076# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1077# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1078# with 6.2.  See also the fixmes.
1079
1080class NoJobs (Exception):
1081    """Exception raised by empty job list in qstat output."""
1082    pass
1083
1084class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
1085    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
1086
1087    def __init__(self):
1088        self.value = ""
1089        self.joblist = []
1090        self.job = {}
1091        self.queue = ""
1092        self.in_joblist = False
1093        self.lrequest = False
1094        self.eltq = deque()
1095        xml.sax.handler.ContentHandler.__init__(self)
1096
1097    # The structure of the output is as follows (for SGE 6.0).  It's
1098    # similar for 6.1, but radically different for 6.2, and is
1099    # undocumented generally.  Unfortunately it's voluminous, and probably
1100    # doesn't scale to large clusters/queues.
1101
1102    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1103    #   <djob_info>
1104    #     <qmaster_response>  <!-- job -->
1105    #       ...
1106    #       <JB_ja_template> 
1107    #     <ulong_sublist>
1108    #     ...         <!-- start_time, state ... -->
1109    #     </ulong_sublist>
1110    #       </JB_ja_template> 
1111    #       <JB_ja_tasks>
1112    #     <ulong_sublist>
1113    #       ...       <!-- task info
1114    #     </ulong_sublist>
1115    #     ...
1116    #       </JB_ja_tasks>
1117    #       ...
1118    #     </qmaster_response>
1119    #   </djob_info>
1120    #   <messages>
1121    #   ...
1122
1123    # NB.  We might treat each task as a separate job, like
1124    # straight qstat output, but the web interface expects jobs to
1125    # be identified by integers, not, say, <job number>.<task>.
1126
1127    # So, I lied.  If the job list is empty, we get invalid XML
1128    # like this, which we need to defend against:
1129
1130    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1131    #   <>
1132    #     <ST_name>*</ST_name>
1133    #   </>
1134    # </unknown_jobs>
1135
1136    def startElement(self, name, attrs):
1137        self.value = ""
1138        if name == "djob_info":    # job list
1139            self.in_joblist = True
1140        # The job container is "qmaster_response" in SGE 6.0
1141        # and 6.1, but "element" in 6.2.  This is only the very
1142        # start of what's necessary for 6.2, though (sigh).
1143        elif (name == "qmaster_response" or name == "element") \
1144                and self.eltq[-1] == "djob_info": # job
1145            self.job = {"job_state": "U", "slots": 0,
1146                    "nodes": [], "queued_timestamp": "",
1147                    "queued_timestamp": "", "queue": "",
1148                    "ppn": "0", "RN_max": 0,
1149                    # fixme in endElement
1150                    "requested_memory": 0, "requested_time": 0
1151                    }
1152            self.joblist.append(self.job)
1153        elif name == "qstat_l_requests": # resource request
1154            self.lrequest = True
1155        elif name == "unknown_jobs":
1156            raise NoJobs
1157        self.eltq.append (name)
1158
1159    def characters(self, ch):
1160        self.value += ch
1161
1162    def endElement(self, name): 
1163        """Snarf job elements contents into job dictionary.
1164           Translate keys if appropriate."""
1165
1166        name_trans = {
1167          "JB_job_number": "number",
1168          "JB_job_name": "name", "JB_owner": "owner",
1169          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1170          "JB_submission_time": "queued_timestamp"
1171          }
1172        value = self.value
1173        self.eltq.pop ()
1174
1175        if name == "djob_info":
1176            self.in_joblist = False
1177            self.job = {}
1178        elif name == "JAT_master_queue":
1179            self.job["queue"] = value.split("@")[0]
1180        elif name == "JG_qhostname":
1181            if not (value in self.job["nodes"]):
1182                self.job["nodes"].append(value)
1183        elif name == "JG_slots": # slots in use
1184            self.job["slots"] += int(value)
1185        elif name == "RN_max": # requested slots (tasks or parallel)
1186            self.job["RN_max"] = max (self.job["RN_max"],
1187                          int(value))
1188        elif name == "JAT_state": # job state (bitwise or)
1189            value = int (value)
1190            # Status values from sge_jobL.h
1191            #define JIDLE           0x00000000
1192            #define JHELD           0x00000010
1193            #define JMIGRATING          0x00000020
1194            #define JQUEUED         0x00000040
1195            #define JRUNNING        0x00000080
1196            #define JSUSPENDED          0x00000100
1197            #define JTRANSFERING        0x00000200
1198            #define JDELETED        0x00000400
1199            #define JWAITING        0x00000800
1200            #define JEXITING        0x00001000
1201            #define JWRITTEN        0x00002000
1202            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1203            #define JFINISHED           0x00010000
1204            if value & 0x80:
1205                self.job["status"] = "R"
1206            elif value & 0x40:
1207                self.job["status"] = "Q"
1208            else:
1209                self.job["status"] = "O" # `other'
1210        elif name == "CE_name" and self.lrequest and self.value in \
1211                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1212            # We're in a container for an interesting resource
1213            # request; record which type.
1214            self.lrequest = self.value
1215        elif name == "CE_doubleval" and self.lrequest:
1216            # if we're in a container for an interesting
1217            # resource request, use the maxmimum of the hard
1218            # and soft requests to record the requested CPU
1219            # or core.  Fixme:  I'm not sure if this logic is
1220            # right.
1221            if self.lrequest in ("h_core", "s_core"):
1222                self.job["requested_memory"] = \
1223                    max (float (value),
1224                     self.job["requested_memory"])
1225            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1226            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1227                self.job["requested_time"] = \
1228                    max (float (value),
1229                     self.job["requested_time"])
1230        elif name == "qstat_l_requests":
1231            self.lrequest = False
1232        elif self.job and self.in_joblist:
1233            if name in name_trans:
1234                name = name_trans[name]
1235                self.job[name] = value
1236
1237# Abstracted from PBS original.
1238# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
1239#
1240def do_nodelist( nodes ):
1241
1242    """Translate node list as appropriate."""
1243
1244    nodeslist        = [ ]
1245    my_domain        = fqdn_parts( socket.getfqdn() )[1]
1246
1247    for node in nodes:
1248
1249        host        = node.split( '/' )[0] # not relevant for SGE
1250        h, host_domain    = fqdn_parts(host)
1251
1252        if host_domain == my_domain:
1253
1254            host    = h
1255
1256        if nodeslist.count( host ) == 0:
1257
1258            for translate_pattern in BATCH_HOST_TRANSLATE:
1259
1260                if translate_pattern.find( '/' ) != -1:
1261
1262                    translate_orig    = \
1263                        translate_pattern.split( '/' )[1]
1264                    translate_new    = \
1265                        translate_pattern.split( '/' )[2]
1266                    host = re.sub( translate_orig,
1267                               translate_new, host )
1268            if not host in nodeslist:
1269                nodeslist.append( host )
1270    return nodeslist
1271
1272class SLURMDataGatherer( DataGatherer ):
1273
1274    global pyslurm
1275
1276    """This is the DataGatherer for SLURM"""
1277
1278    def __init__( self ):
1279
1280        """Setup appropriate variables"""
1281
1282        self.jobs       = { }
1283        self.timeoffset = 0
1284        self.dp         = DataProcessor()
1285
1286    def getNodeData( self ):
1287
1288        slurm_type  = pyslurm.node()
1289
1290        slurm_nodes = slurm_type.get()
1291
1292        nodedict    = { }
1293
1294        for node, attrs in slurm_nodes.items():
1295
1296            ( num_state, name_state ) = attrs['node_state'] 
1297
1298            if name_state == 'DOWN':
1299
1300                nodedict[ node ] = { 'state' : 'down' }
1301
1302            elif name_state == 'DRAIN':
1303
1304                nodedict[ node ] = { 'state' : 'offline' }
1305
1306        return nodedict
1307
1308    def getJobData( self ):
1309
1310        """Gather all data on current jobs"""
1311
1312        joblist            = {}
1313
1314        self.cur_time  = time.time()
1315
1316        slurm_type = pyslurm.job()
1317        joblist    = slurm_type.get()
1318
1319        jobs_processed    = [ ]
1320
1321        for name, attrs in joblist.items():
1322            display_queue = 1
1323            job_id        = name
1324
1325            name          = self.getAttr( attrs, 'name' )
1326            queue         = self.getAttr( attrs, 'partition' )
1327
1328            if QUEUE:
1329                for q in QUEUE:
1330                    if q == queue:
1331                        display_queue = 1
1332                        break
1333                    else:
1334                        display_queue = 0
1335                        continue
1336            if display_queue == 0:
1337                continue
1338
1339            owner_uid        = attrs[ 'user_id' ]
1340            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1341
1342            requested_time   = self.getAttr( attrs, 'time_limit' )
1343            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
1344
1345            if min_memory == 0:
1346
1347                requested_memory = ''
1348
1349            else:
1350                requested_memory = min_memory
1351
1352            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
1353
1354            if min_cpus == 0:
1355
1356                ppn = ''
1357
1358            else:
1359                ppn = min_cpus
1360
1361            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1362
1363            status = 'Q'
1364
1365            if status_long == 'RUNNING':
1366
1367                status = 'R'
1368
1369            elif status_long == 'COMPLETED':
1370
1371                continue
1372
1373            jobs_processed.append( job_id )
1374
1375            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1376
1377            start_timestamp = ''
1378            nodeslist       = ''
1379
1380            if status == 'R':
1381
1382                start_timestamp = self.getAttr( attrs, 'start_time' )
1383                nodes           = attrs[ 'nodes' ]
1384
1385                if not nodes:
1386
1387                    # This should not happen
1388
1389                    # Something wrong: running but 'nodes' returned empty by pyslurm
1390                    # Possible pyslurm bug: abort/quit/warning
1391
1392                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1393
1394                    print err_msg
1395                    debug_msg( 0, err_msg )
1396                    sys.exit(1)
1397
1398                my_nodelist = [ ]
1399
1400                slurm_hostlist  = pyslurm.hostlist()
1401                slurm_hostlist.create( nodes )
1402                slurm_hostlist.uniq()
1403
1404                while slurm_hostlist.count() > 0:
1405
1406                    my_nodelist.append( slurm_hostlist.pop() )
1407
1408                slurm_hostlist.destroy()
1409
1410                del slurm_hostlist
1411
1412                nodeslist       = do_nodelist( my_nodelist )
1413
1414                if DETECT_TIME_DIFFS:
1415
1416                    # If a job start if later than our current date,
1417                    # that must mean the Torque server's time is later
1418                    # than our local time.
1419               
1420                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1421
1422                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1423
1424            elif status == 'Q':
1425
1426                nodeslist       = str( attrs[ 'num_nodes' ] )
1427
1428            else:
1429                start_timestamp = ''
1430                nodeslist       = ''
1431
1432            myAttrs                = { }
1433
1434            myAttrs[ 'name' ]             = str( name )
1435            myAttrs[ 'queue' ]            = str( queue )
1436            myAttrs[ 'owner' ]            = str( owner )
1437            myAttrs[ 'requested_time' ]   = str( requested_time )
1438            myAttrs[ 'requested_memory' ] = str( requested_memory )
1439            myAttrs[ 'ppn' ]              = str( ppn )
1440            myAttrs[ 'status' ]           = str( status )
1441            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1442            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1443            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1444            myAttrs[ 'nodes' ]            = nodeslist
1445            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1446            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1447
1448            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1449
1450                self.jobs[ job_id ] = myAttrs
1451
1452        for id, attrs in self.jobs.items():
1453
1454            if id not in jobs_processed:
1455
1456                # This one isn't there anymore; toedeledoki!
1457                #
1458                del self.jobs[ id ]
1459
1460class SgeDataGatherer(DataGatherer):
1461
1462    jobs = {}
1463
1464    def __init__( self ):
1465        self.jobs = {}
1466        self.timeoffset = 0
1467        self.dp = DataProcessor()
1468
1469    def getJobData( self ):
1470        """Gather all data on current jobs in SGE"""
1471
1472        import popen2
1473
1474        self.cur_time = 0
1475        queues = ""
1476        if QUEUE:    # only for specific queues
1477            # Fixme:  assumes queue names don't contain single
1478            # quote or comma.  Don't know what the SGE rules are.
1479            queues = " -q '" + string.join (QUEUE, ",") + "'"
1480        # Note the comment in SgeQstatXMLParser about scaling with
1481        # this method of getting data.  I haven't found better one.
1482        # Output with args `-xml -ext -f -r' is easier to parse
1483        # in some ways, harder in others, but it doesn't provide
1484        # the submission time (at least SGE 6.0).  The pipeline
1485        # into sed corrects bogus XML observed with a configuration
1486        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1487        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
1488sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
1489                           + queues, True)
1490        qstatparser = SgeQstatXMLParser()
1491        parse_err = 0
1492        try:
1493            xml.sax.parse(piping.fromchild, qstatparser)
1494        except NoJobs:
1495            pass
1496        except:
1497            parse_err = 1
1498        if piping.wait():
1499            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
1500            return None
1501        elif parse_err:
1502            debug_msg(10, "Bad XML output from qstat"())
1503            exit (1)
1504        for f in piping.fromchild, piping.tochild, piping.childerr:
1505            f.close()
1506        self.cur_time = time.time()
1507        jobs_processed = []
1508        for job in qstatparser.joblist:
1509            job_id = job["number"]
1510            if job["status"] in [ 'Q', 'R' ]:
1511                jobs_processed.append(job_id)
1512            if job["status"] == "R":
1513                job["nodes"] = do_nodelist (job["nodes"])
1514                # Fixme: why is job["nodes"] sometimes null?
1515                try:
1516                    # Fixme: Is this sensible?  The
1517                    # PBS-type PPN isn't something you use
1518                    # with SGE.
1519                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
1520                except:
1521                    job["ppn"] = 0
1522                if DETECT_TIME_DIFFS:
1523                    # If a job start is later than our
1524                    # current date, that must mean
1525                    # the SGE server's time is later
1526                    # than our local time.
1527                    start_timestamp = int (job["start_timestamp"])
1528                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
1529
1530                        self.timeoffset    = start_timestamp - int(self.cur_time)
1531            else:
1532                # fixme: Note sure what this should be:
1533                job["ppn"] = job["RN_max"]
1534                job["nodes"] = "1"
1535
1536            myAttrs = {}
1537            for attr in ["name", "queue", "owner",
1538                     "requested_time", "status",
1539                     "requested_memory", "ppn",
1540                     "start_timestamp", "queued_timestamp"]:
1541                myAttrs[attr] = str(job[attr])
1542            myAttrs["nodes"] = job["nodes"]
1543            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
1544            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1545            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
1546
1547            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
1548                self.jobs[job_id] = myAttrs
1549        for id, attrs in self.jobs.items():
1550            if id not in jobs_processed:
1551                del self.jobs[id]
1552
1553# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1554# Requres LSFObject http://sourceforge.net/projects/lsfobject
1555#
1556class LsfDataGatherer(DataGatherer):
1557
1558    """This is the DataGatherer for LSf"""
1559
1560    global lsfObject
1561
1562    def __init__( self ):
1563
1564        self.jobs = { }
1565        self.timeoffset = 0
1566        self.dp = DataProcessor()
1567        self.initLsfQuery()
1568
1569    def _countDuplicatesInList( self, dupedList ):
1570
1571        countDupes    = { }
1572
1573        for item in dupedList:
1574
1575            if not countDupes.has_key( item ):
1576
1577                countDupes[ item ]    = 1
1578            else:
1579                countDupes[ item ]    = countDupes[ item ] + 1
1580
1581        dupeCountList    = [ ]
1582
1583        for item, count in countDupes.items():
1584
1585            dupeCountList.append( ( item, count ) )
1586
1587        return dupeCountList
1588#
1589#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1590#print _countDuplicatesInList(lst)
1591#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1592########################
1593
1594    def initLsfQuery( self ):
1595        self.pq = None
1596        self.pq = lsfObject.jobInfoEntObject()
1597
1598    def getJobData( self, known_jobs="" ):
1599        """Gather all data on current jobs in LSF"""
1600        if len( known_jobs ) > 0:
1601            jobs = known_jobs
1602        else:
1603            jobs = { }
1604        joblist = {}
1605        joblist = self.pq.getJobInfo()
1606        nodelist = ''
1607
1608        self.cur_time = time.time()
1609
1610        jobs_processed = [ ]
1611
1612        for name, attrs in joblist.items():
1613            job_id = str(name)
1614            jobs_processed.append( job_id )
1615            name = self.getAttr( attrs, 'jobName' )
1616            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1617            owner = self.getAttr( attrs, 'user' )
1618
1619### THIS IS THE rLimit List index values
1620#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1621#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1622#define LSF_RLIMIT_DATA     2        /* data size */
1623#define LSF_RLIMIT_STACK    3        /* stack size */
1624#define LSF_RLIMIT_CORE     4        /* core file size */
1625#define LSF_RLIMIT_RSS      5        /* resident set size */
1626#define LSF_RLIMIT_NOFILE   6        /* open files */
1627#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1628#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
1629#define LSF_RLIMIT_SWAP     8
1630#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1631#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1632#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1633#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
1634
1635            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1636            if requested_time == -1: 
1637                requested_time = ""
1638            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1639            if requested_memory == -1: 
1640                requested_memory = ""
1641# This tries to get proc per node. We don't support this right now
1642            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1643            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1644            if requested_cpus == None or requested_cpus == "":
1645                requested_cpus = 1
1646
1647            if QUEUE:
1648                for q in QUEUE:
1649                    if q == queue:
1650                        display_queue = 1
1651                        break
1652                    else:
1653                        display_queue = 0
1654                        continue
1655            if display_queue == 0:
1656                continue
1657
1658            runState = self.getAttr( attrs, 'status' )
1659            if runState == 4:
1660                status = 'R'
1661            else:
1662                status = 'Q'
1663            queued_timestamp = self.getAttr( attrs, 'submitTime' )
1664
1665            if status == 'R':
1666                start_timestamp = self.getAttr( attrs, 'startTime' )
1667                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1668                nodelist = nodesCpu.keys()
1669
1670                if DETECT_TIME_DIFFS:
1671
1672                    # If a job start if later than our current date,
1673                    # that must mean the Torque server's time is later
1674                    # than our local time.
1675
1676                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
1677
1678                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1679
1680            elif status == 'Q':
1681                start_timestamp = ''
1682                count_mynodes = 0
1683                numeric_node = 1
1684                nodelist = ''
1685
1686            myAttrs = { }
1687            if name == "":
1688                myAttrs['name'] = "none"
1689            else:
1690                myAttrs['name'] = name
1691
1692            myAttrs[ 'owner' ]        = owner
1693            myAttrs[ 'requested_time' ]    = str(requested_time)
1694            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1695            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1696            myAttrs[ 'ppn' ]        = str( ppn )
1697            myAttrs[ 'status' ]        = status
1698            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1699            myAttrs[ 'queue' ]        = str(queue)
1700            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1701            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1702            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1703            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1704            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
1705
1706            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1707                jobs[ job_id ] = myAttrs
1708
1709                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
1710
1711        for id, attrs in jobs.items():
1712            if id not in jobs_processed:
1713                # This one isn't there anymore
1714                #
1715                del jobs[ id ]
1716        self.jobs=jobs
1717
1718
1719class PbsDataGatherer( DataGatherer ):
1720
1721    """This is the DataGatherer for PBS and Torque"""
1722
1723    global PBSQuery, PBSError
1724
1725    def __init__( self ):
1726
1727        """Setup appropriate variables"""
1728
1729        self.jobs       = { }
1730        self.timeoffset = 0
1731        self.dp         = DataProcessor()
1732
1733        self.initPbsQuery()
1734
1735    def initPbsQuery( self ):
1736
1737        self.pq = None
1738
1739        try:
1740
1741            if( BATCH_SERVER ):
1742
1743                self.pq = PBSQuery( BATCH_SERVER )
1744            else:
1745                self.pq = PBSQuery()
1746
1747        except PBSError, details:
1748            print 'Cannot connect to pbs server'
1749            print details
1750            sys.exit( 1 )
1751
1752        try:
1753            # TODO: actually use new data structure
1754            self.pq.old_data_structure()
1755
1756        except AttributeError:
1757
1758            # pbs_query is older
1759            #
1760            pass
1761
1762    def getNodeData( self ):
1763
1764        nodedict = { }
1765
1766        try:
1767            nodedict = self.pq.getnodes()
1768
1769        except PBSError, detail:
1770
1771            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1772
1773        return nodedict
1774
1775    def getJobData( self ):
1776
1777        """Gather all data on current jobs in Torque"""
1778
1779        joblist            = {}
1780        self.cur_time      = 0
1781
1782        try:
1783            joblist        = self.pq.getjobs()
1784            self.cur_time  = time.time()
1785
1786        except PBSError, detail:
1787
1788            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1789            return None
1790
1791        jobs_processed    = [ ]
1792
1793        for name, attrs in joblist.items():
1794            display_queue = 1
1795            job_id        = name.split( '.' )[0]
1796
1797            name          = self.getAttr( attrs, 'Job_Name' )
1798            queue         = self.getAttr( attrs, 'queue' )
1799
1800            if QUEUE:
1801                for q in QUEUE:
1802                    if q == queue:
1803                        display_queue = 1
1804                        break
1805                    else:
1806                        display_queue = 0
1807                        continue
1808            if display_queue == 0:
1809                continue
1810
1811
1812            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
1813            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1814            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
1815
1816            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
1817
1818            ppn = ''
1819
1820            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
1821
1822                mynoderequest_fields = mynoderequest.split( ':' )
1823
1824                for mynoderequest_field in mynoderequest_fields:
1825
1826                    if mynoderequest_field.find( 'ppn' ) != -1:
1827
1828                        ppn = mynoderequest_field.split( 'ppn=' )[1]
1829
1830            status = self.getAttr( attrs, 'job_state' )
1831
1832            if status in [ 'Q', 'R' ]:
1833
1834                jobs_processed.append( job_id )
1835
1836            queued_timestamp = self.getAttr( attrs, 'ctime' )
1837
1838            if status == 'R':
1839
1840                start_timestamp = self.getAttr( attrs, 'mtime' )
1841                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
1842
1843                nodeslist       = do_nodelist( nodes )
1844
1845                if DETECT_TIME_DIFFS:
1846
1847                    # If a job start if later than our current date,
1848                    # that must mean the Torque server's time is later
1849                    # than our local time.
1850               
1851                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1852
1853                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1854
1855            elif status == 'Q':
1856
1857                # 'mynodequest' can be a string in the following syntax according to the
1858                # Torque Administator's manual:
1859                #
1860                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1861                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1862                # etc
1863                #
1864
1865                #
1866                # For now we only count the amount of nodes request and ignore properties
1867                #
1868
1869                start_timestamp = ''
1870                count_mynodes   = 0
1871
1872                for node in mynoderequest.split( '+' ):
1873
1874                    # Just grab the {node_count|hostname} part and ignore properties
1875                    #
1876                    nodepart     = node.split( ':' )[0]
1877
1878                    # Let's assume a node_count value
1879                    #
1880                    numeric_node = 1
1881
1882                    # Chop the value up into characters
1883                    #
1884                    for letter in nodepart:
1885
1886                        # If this char is not a digit (0-9), this must be a hostname
1887                        #
1888                        if letter not in string.digits:
1889
1890                            numeric_node = 0
1891
1892                    # If this is a hostname, just count this as one (1) node
1893                    #
1894                    if not numeric_node:
1895
1896                        count_mynodes = count_mynodes + 1
1897                    else:
1898
1899                        # If this a number, it must be the node_count
1900                        # and increase our count with it's value
1901                        #
1902                        try:
1903                            count_mynodes = count_mynodes + int( nodepart )
1904
1905                        except ValueError, detail:
1906
1907                            # When we arrive here I must be bugged or very confused
1908                            # THIS SHOULD NOT HAPPEN!
1909                            #
1910                            debug_msg( 10, str( detail ) )
1911                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1912                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1913                            debug_msg( 10, 'job = ' + str( name ) )
1914                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1915                       
1916                nodeslist       = str( count_mynodes )
1917            else:
1918                start_timestamp = ''
1919                nodeslist       = ''
1920
1921            myAttrs                = { }
1922
1923            myAttrs[ 'name' ]             = str( name )
1924            myAttrs[ 'queue' ]            = str( queue )
1925            myAttrs[ 'owner' ]            = str( owner )
1926            myAttrs[ 'requested_time' ]   = str( requested_time )
1927            myAttrs[ 'requested_memory' ] = str( requested_memory )
1928            myAttrs[ 'ppn' ]              = str( ppn )
1929            myAttrs[ 'status' ]           = str( status )
1930            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1931            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1932            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1933            myAttrs[ 'nodes' ]            = nodeslist
1934            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1935            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1936
1937            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1938
1939                self.jobs[ job_id ] = myAttrs
1940
1941        for id, attrs in self.jobs.items():
1942
1943            if id not in jobs_processed:
1944
1945                # This one isn't there anymore; toedeledoki!
1946                #
1947                del self.jobs[ id ]
1948
1949GMETRIC_DEFAULT_TYPE    = 'string'
1950GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1951GMETRIC_DEFAULT_PORT    = '8649'
1952GMETRIC_DEFAULT_UNITS   = ''
1953
1954class Gmetric:
1955
1956    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1957
1958    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1959    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1960    protocol        = ( 'udp', 'multicast' )
1961
1962    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1963               
1964        global GMETRIC_DEFAULT_TYPE
1965
1966        self.prot       = self.checkHostProtocol( host )
1967        self.data_msg   = xdrlib.Packer()
1968        self.meta_msg   = xdrlib.Packer()
1969        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1970
1971        if self.prot not in self.protocol:
1972
1973            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1974
1975        if self.prot == 'multicast':
1976
1977            # Set multicast options
1978            #
1979            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1980
1981        self.hostport   = ( host, int( port ) )
1982        self.slopestr   = 'both'
1983        self.tmax       = 60
1984
1985    def checkHostProtocol( self, ip ):
1986
1987        """Detect if a ip adress is a multicast address"""
1988
1989        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1990        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
1991
1992        ip_fields               = ip.split( '.' )
1993
1994        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1995
1996            return 'multicast'
1997        else:
1998            return 'udp'
1999
2000    def send( self, name, value, dmax, typestr = '', units = '' ):
2001
2002        if len( units ) == 0:
2003            units       = GMETRIC_DEFAULT_UNITS
2004
2005        if len( typestr ) == 0:
2006            typestr     = GMETRIC_DEFAULT_TYPE
2007
2008        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
2009
2010        meta_rt = self.socket.sendto( meta_msg, self.hostport )
2011        data_rt = self.socket.sendto( data_msg, self.hostport )
2012
2013        return ( meta_rt, data_rt )
2014
2015    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
2016
2017        hostname = "unset"
2018
2019        if slopestr not in self.slope:
2020
2021            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
2022
2023        if typestr not in self.type:
2024
2025            raise ValueError( "Type must be one of: " + str( self.type ) )
2026
2027        if len( name ) == 0:
2028
2029            raise ValueError( "Name must be non-empty" )
2030
2031        self.meta_msg.reset()
2032        self.meta_msg.pack_int( 128 )
2033
2034        if not spoof:
2035            self.meta_msg.pack_string( hostname )
2036        else:
2037            self.meta_msg.pack_string( spoof )
2038
2039        self.meta_msg.pack_string( name )
2040
2041        if not spoof:
2042            self.meta_msg.pack_int( 0 )
2043        else:
2044            self.meta_msg.pack_int( 1 )
2045           
2046        self.meta_msg.pack_string( typestr )
2047        self.meta_msg.pack_string( name )
2048        self.meta_msg.pack_string( unitstr )
2049        self.meta_msg.pack_int( self.slope[ slopestr ] )
2050        self.meta_msg.pack_uint( int( tmax ) )
2051        self.meta_msg.pack_uint( int( dmax ) )
2052
2053        if not group:
2054            self.meta_msg.pack_int( 0 )
2055        else:
2056            self.meta_msg.pack_int( 1 )
2057            self.meta_msg.pack_string( "GROUP" )
2058            self.meta_msg.pack_string( group )
2059
2060        self.data_msg.reset()
2061        self.data_msg.pack_int( 128+5 )
2062
2063        if not spoof:
2064            self.data_msg.pack_string( hostname )
2065        else:
2066            self.data_msg.pack_string( spoof )
2067
2068        self.data_msg.pack_string( name )
2069
2070        if not spoof:
2071            self.data_msg.pack_int( 0 )
2072        else:
2073            self.data_msg.pack_int( 1 )
2074
2075        self.data_msg.pack_string( "%s" )
2076        self.data_msg.pack_string( str( value ) )
2077
2078        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2079
2080def printTime( ):
2081
2082    """Print current time/date in human readable format for log/debug"""
2083
2084    return time.strftime("%a, %d %b %Y %H:%M:%S")
2085
2086def debug_msg( level, msg ):
2087
2088    """Print msg if at or above current debug level"""
2089
2090    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
2091
2092    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2093        sys.stderr.write( msg + '\n' )
2094
2095    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2096        syslog.syslog( msg )
2097
2098def write_pidfile():
2099
2100    # Write pidfile if PIDFILE is set
2101    #
2102    if PIDFILE:
2103
2104        pid    = os.getpid()
2105
2106        pidfile    = open( PIDFILE, 'w' )
2107
2108        pidfile.write( str( pid ) )
2109        pidfile.close()
2110
2111def main():
2112
2113    """Application start"""
2114
2115    global PBSQuery, PBSError, lsfObject, pyslurm
2116    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
2117
2118    if not processArgs( sys.argv[1:] ):
2119
2120        sys.exit( 1 )
2121
2122    # Load appropriate DataGatherer depending on which BATCH_API is set
2123    # and any required modules for the Gatherer
2124    #
2125    if BATCH_API == 'pbs':
2126
2127        try:
2128            from PBSQuery import PBSQuery, PBSError
2129
2130        except ImportError, details:
2131
2132            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
2133            print details
2134            sys.exit( 1 )
2135
2136        gather = PbsDataGatherer()
2137
2138    elif BATCH_API == 'sge':
2139
2140        if BATCH_SERVER != 'localhost':
2141
2142            # Print and log, but continue execution
2143            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2144            print err_msg
2145            debug_msg( 0, err_msg )
2146
2147        # Tested with SGE 6.0u11.
2148        #
2149        gather = SgeDataGatherer()
2150
2151    elif BATCH_API == 'lsf':
2152
2153        if BATCH_SERVER != 'localhost':
2154
2155            # Print and log, but continue execution
2156            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2157            print err_msg
2158            debug_msg( 0, err_msg )
2159
2160        try:
2161            from lsfObject import lsfObject
2162        except:
2163            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2164            sys.exit( 1 )
2165
2166        gather = LsfDataGatherer()
2167
2168    elif BATCH_API == 'slurm':
2169
2170        if BATCH_SERVER != 'localhost':
2171
2172            # Print and log, but continue execution
2173            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2174            print err_msg
2175            debug_msg( 0, err_msg )
2176
2177        try:
2178            import pyslurm
2179        except:
2180            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
2181            sys.exit( 1 )
2182
2183        gather = SLURMDataGatherer()
2184
2185    else:
2186        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
2187
2188        sys.exit( 1 )
2189
2190    if( DAEMONIZE and USE_SYSLOG ):
2191
2192        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2193
2194    if DAEMONIZE:
2195
2196        gather.daemon()
2197    else:
2198        gather.run()
2199
2200# wh00t? someone started me! :)
2201#
2202if __name__ == '__main__':
2203    main()
Note: See TracBrowser for help on using the repository browser.