source: branches/1.1/jobmond/jobmond.py @ 942

Last change on this file since 942 was 942, checked in by dennis, 10 years ago

Make sure we encode the string before creating the XML

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 64.7 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2013  Ramon Bastiaans
6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
22# SVN $Id: jobmond.py 942 2013-12-18 11:24:49Z dennis $
23#
24
25# vi :set ts=4
26
27import sys, getopt, ConfigParser, time, os, socket, string, re
28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
29from xml.sax.handler import feature_namespaces
30from collections import deque
31from glob import glob
32import cgi
33
34VERSION='__VERSION__'
35
36def usage( ver ):
37
38    print 'jobmond %s' %VERSION
39
40    if ver:
41        return 0
42
43    print
44    print 'Purpose:'
45    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
46    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
47    print
48    print 'Usage:    jobmond [OPTIONS]'
49    print
50    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
51    print '  -p, --pidfile=FILE    Use pid file to store the process id'
52    print '  -h, --help        Print help and exit'
53    print '  -v, --version      Print version and exit'
54    print
55
56def processArgs( args ):
57
58    SHORT_L      = 'p:hvc:'
59    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
60
61    global PIDFILE, JOBMOND_CONF
62    PIDFILE      = None
63
64    JOBMOND_CONF = '/etc/jobmond.conf'
65
66    try:
67
68        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
69
70    except getopt.GetoptError, detail:
71
72        print detail
73        usage( False )
74        sys.exit( 1 )
75
76    for opt, value in opts:
77
78        if opt in [ '--config', '-c' ]:
79       
80            JOBMOND_CONF = value
81
82        if opt in [ '--pidfile', '-p' ]:
83
84            PIDFILE      = value
85       
86        if opt in [ '--help', '-h' ]:
87 
88            usage( False )
89            sys.exit( 0 )
90
91        if opt in [ '--version', '-v' ]:
92
93            usage( True )
94            sys.exit( 0 )
95
96    return loadConfig( JOBMOND_CONF )
97
98class GangliaConfigParser:
99
100    def __init__( self, filename ):
101
102        self.conf_lijst   = [ ]
103        self.conf_dict    = { }
104        self.filename     = filename
105        self.file_pointer = file( filename, 'r' )
106        self.lexx         = shlex.shlex( self.file_pointer )
107        self.lexx.whitespace_split = True
108
109        self.parse()
110
111    def __del__( self ):
112
113        """
114        Cleanup: close file descriptor
115        """
116
117        self.file_pointer.close()
118        del self.lexx
119        del self.conf_lijst
120
121    def removeQuotes( self, value ):
122
123        clean_value = value
124        clean_value = clean_value.replace( "'", "" )
125        clean_value = clean_value.replace( '"', '' )
126        clean_value = clean_value.strip()
127
128        return clean_value
129
130    def removeBraces( self, value ):
131
132        clean_value = value
133        clean_value = clean_value.replace( "(", "" )
134        clean_value = clean_value.replace( ')', '' )
135        clean_value = clean_value.strip()
136
137        return clean_value
138
139    def parse( self ):
140
141        """
142        Parse self.filename using shlex scanning.
143        - Removes /* comments */
144        - Traverses (recursively) through all include () statements
145        - Stores complete valid config tokens in self.conf_list
146
147        i.e.:
148            ['globals',
149             '{',
150             'daemonize',
151             '=',
152             'yes',
153             'setuid',
154             '=',
155             'yes',
156             'user',
157             '=',
158             'ganglia',
159             'debug_level',
160             '=',
161             '0',
162             <etc> ]
163        """
164
165        t = 'bogus'
166        c = False
167        i = False
168
169        while t != self.lexx.eof:
170            #print 'get token'
171            t = self.lexx.get_token()
172
173            if len( t ) >= 2:
174
175                if len( t ) >= 4:
176
177                    if t[:2] == '/*' and t[-2:] == '*/':
178
179                        #print 'comment line'
180                        #print 'skipping: %s' %t
181                        continue
182
183                if t == '/*' or t[:2] == '/*':
184                    c = True
185                    #print 'comment start'
186                    #print 'skipping: %s' %t
187                    continue
188
189                if t == '*/' or t[-2:] == '*/':
190                    c = False
191                    #print 'skipping: %s' %t
192                    #print 'comment end'
193                    continue
194
195            if c:
196                #print 'skipping: %s' %t
197                continue
198
199            if t == 'include':
200                i = True
201                #print 'include start'
202                #print 'skipping: %s' %t
203                continue
204
205            if i:
206
207                #print 'include start: %s' %t
208
209                t2 = self.removeQuotes( t )
210                t2 = self.removeBraces( t )
211
212                for in_file in glob( self.removeQuotes(t2) ):
213
214                    #print 'including file: %s' %in_file
215                    parse_infile = GangliaConfigParser( in_file )
216
217                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
218
219                    del parse_infile
220
221                i = False
222                #print 'include end'
223                #print 'skipping: %s' %t
224                continue
225
226            #print 'keep: %s' %t
227            self.conf_lijst.append( self.removeQuotes(t) )
228
229    def getConfLijst( self ):
230
231        return self.conf_lijst
232
233    def confListToDict( self, parent_list=None ):
234
235        """
236        Recursively traverses a conf_list and creates dictionary from it
237        """
238
239        new_dict = { }
240        count    = 0
241        skip     = 0
242
243        if not parent_list:
244            parent_list = self.conf_lijst
245
246        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
247
248        for n, c in enumerate( parent_list ):
249
250            count = count + 1
251
252            #print 'CL: n %d c %s' %(n, c)
253
254            if skip > 0:
255
256                #print '- skipped'
257                skip = skip - 1
258                continue
259
260            if (n+1) <= (len( parent_list )-1):
261
262                if parent_list[(n+1)] == '{':
263
264                    if not new_dict.has_key( c ):
265                        new_dict[ c ] = [ ]
266
267                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
268                    new_dict[ c ].append( temp_new_dict )
269
270                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
271
272                    if not new_dict.has_key( c ):
273                        new_dict[ c ] = [ ]
274
275                    new_dict[ c ].append( parent_list[ (n+2) ] )
276
277                    skip = 2
278
279                if parent_list[n] == '}':
280
281                    #print 'leaving confListToDict(): new dict = %s' %new_dict
282                    return (new_dict, count)
283
284    def makeConfDict( self ):
285
286        """
287        Walks through self.conf_list and creates a dictionary based upon config values
288
289        i.e.:
290            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
291                                                         'ip': ['"127.0.0.1"'],
292                                                         'mask': ['32']}]}],
293                                    'port': ['8649']}],
294            'udp_recv_channel': [{'port': ['8649']}],
295            'udp_send_channel': [{'host': ['145.101.32.3'],
296                                  'port': ['8649']},
297                                 {'host': ['145.101.32.207'],
298                                  'port': ['8649']}]}
299        """
300
301        new_dict = { }
302        skip     = 0
303
304        #print 'entering makeConfDict()'
305
306        for n, c in enumerate( self.conf_lijst ):
307
308            #print 'M: n %d c %s' %(n, c)
309
310            if skip > 0:
311
312                #print '- skipped'
313                skip = skip - 1
314                continue
315
316            if (n+1) <= (len( self.conf_lijst )-1):
317
318                if self.conf_lijst[(n+1)] == '{':
319
320                    if not new_dict.has_key( c ):
321                        new_dict[ c ] = [ ]
322
323                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
324                    new_dict[ c ].append( temp_new_dict )
325
326                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
327
328                    if not new_dict.has_key( c ):
329                        new_dict[ c ] = [ ]
330
331                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
332
333                    skip = 2
334
335        self.conf_dict = new_dict
336        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
337
338    def checkConfDict( self ):
339
340        if len( self.conf_lijst ) == 0:
341
342            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
343
344        if len( self.conf_dict ) == 0:
345
346            self.makeConfDict()
347
348    def getConfDict( self ):
349
350        self.checkConfDict()
351        return self.conf_dict
352
353    def getUdpSendChannels( self ):
354
355        self.checkConfDict()
356
357        udp_send_channels = [ ] # IP:PORT
358
359        if not self.conf_dict.has_key( 'udp_send_channel' ):
360            return None
361
362        for u in self.conf_dict[ 'udp_send_channel' ]:
363
364            if u.has_key( 'mcast_join' ):
365
366                ip = u['mcast_join'][0]
367
368            elif u.has_key( 'host' ):
369
370                ip = u['host'][0]
371
372            port = u['port'][0]
373
374            udp_send_channels.append( ( ip, port ) )
375
376        if len( udp_send_channels ) == 0:
377            return None
378
379        return udp_send_channels
380
381    def getSectionLastOption( self, section, option ):
382
383        """
384        Get last option set in a config section that could be set multiple times in multiple (include) files.
385
386        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
387        """
388
389        self.checkConfDict()
390        value = None
391
392        if not self.conf_dict.has_key( section ):
393
394            return None
395
396        # Could be set multiple times in multiple (include) files: get last one set
397        for c in self.conf_dict[ section ]:
398
399                if c.has_key( option ):
400
401                    value = c[ option ][0]
402
403        return value
404
405    def getClusterName( self ):
406
407        return self.getSectionLastOption( 'cluster', 'name' )
408
409    def getVal( self, section, option ):
410
411        return self.getSectionLastOption( section, option )
412
413    def getInt( self, section, valname ):
414
415        value    = self.getVal( section, valname )
416
417        if not value:
418            return None
419
420        return int( value )
421
422    def getStr( self, section, valname ):
423
424        value    = self.getVal( section, valname )
425
426        if not value:
427            return None
428
429        return str( value )
430
431def findGmetric():
432
433    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
434
435        guess    = '%s/%s' %( dir, 'gmetric' )
436
437        if os.path.exists( guess ):
438
439            return guess
440
441    return False
442
443def loadConfig( filename ):
444
445    def getlist( cfg_string ):
446
447        my_list = [ ]
448
449        for item_txt in cfg_string.split( ',' ):
450
451            sep_char = None
452
453            item_txt = item_txt.strip()
454
455            for s_char in [ "'", '"' ]:
456
457                if item_txt.find( s_char ) != -1:
458
459                    if item_txt.count( s_char ) != 2:
460
461                        print 'Missing quote: %s' %item_txt
462                        sys.exit( 1 )
463
464                    else:
465
466                        sep_char = s_char
467                        break
468
469            if sep_char:
470
471                item_txt = item_txt.split( sep_char )[1]
472
473            my_list.append( item_txt )
474
475        return my_list
476
477    if not os.path.isfile( JOBMOND_CONF ):
478
479        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
480        sys.exit( 1 )
481
482    try:
483        f = open( JOBMOND_CONF, 'r' )
484    except IOError, detail:
485        print "Cannot read config file: '%s'" %JOBMOND_CONF
486        sys.exit( 1 )
487    else:
488        f.close()
489
490    cfg        = ConfigParser.ConfigParser()
491
492    cfg.read( filename )
493
494    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
495    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
496    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
497    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
498    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
499
500    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
501
502    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
503
504    SYSLOG_LEVEL    = -1
505    SYSLOG_FACILITY = None
506
507    try:
508        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
509
510    except ConfigParser.NoOptionError:
511
512        USE_SYSLOG  = True
513
514        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
515
516    if USE_SYSLOG:
517
518        try:
519            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
520
521        except ConfigParser.NoOptionError:
522
523            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
524            SYSLOG_LEVEL = 0
525
526        try:
527
528            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
529
530        except ConfigParser.NoOptionError:
531
532            SYSLOG_FACILITY = syslog.LOG_DAEMON
533
534            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
535
536    try:
537
538        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
539
540    except ConfigParser.NoOptionError:
541
542        # Not required for all API's: only pbs api allows remote connections
543        BATCH_SERVER = None
544
545    try:
546   
547        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
548
549    except ConfigParser.NoOptionError:
550
551        # Backwards compatibility for old configs
552        #
553
554        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
555        api_guess           = 'pbs'
556   
557    try:
558
559        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
560
561    except ConfigParser.NoOptionError:
562
563        # Not specified: assume /etc/ganglia/gmond.conf
564        #
565        GMOND_CONF          = '/etc/ganglia/gmond.conf'
566
567    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
568    GMETRIC_TARGET          = None
569
570    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
571
572    if not GMOND_UDP_SEND_CHANNELS:
573
574        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
575
576        # Couldn't figure it out: let's see if it's in our jobmond.conf
577        #
578        try:
579
580            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
581
582        # Guess not: now just give up
583       
584        except ConfigParser.NoOptionError:
585
586            GMETRIC_TARGET    = None
587
588            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
589
590            gmetric_bin    = findGmetric()
591
592            if gmetric_bin:
593
594                GMETRIC_BINARY     = gmetric_bin
595            else:
596                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
597
598                try:
599
600                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
601
602                except ConfigParser.NoOptionError:
603
604                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
605                    sys.exit( 1 )
606
607    #TODO: is this really still needed or should be automatic
608    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
609
610    try:
611        BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
612
613    except ConfigParser.NoOptionError:
614
615        BATCH_HOST_TRANSLATE = [ ]
616        pass
617
618    try:
619
620        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
621
622    except ConfigParser.NoOptionError, detail:
623
624        print "FATAL ERROR: BATCH_API not set"
625        sys.exit( 1 )
626
627    try:
628
629        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
630
631    except ConfigParser.NoOptionError, detail:
632
633        QUEUE        = None
634
635    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
636
637    return True
638
639def fqdn_parts (fqdn):
640
641    """Return pair of host and domain for fully-qualified domain name arg."""
642
643    parts = fqdn.split (".")
644
645    return (parts[0], string.join(parts[1:], "."))
646
647class DataProcessor:
648
649    """Class for processing of data"""
650
651    binary = None
652
653    def __init__( self, binary=None ):
654
655        """Remember alternate binary location if supplied"""
656
657        global GMETRIC_BINARY, GMOND_CONF
658
659        if binary:
660            self.binary = binary
661
662        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
663            self.binary = GMETRIC_BINARY
664
665        # Timeout for XML
666        #
667        # From ganglia's documentation:
668        #
669        # 'A metric will be deleted DMAX seconds after it is received, and
670        # DMAX=0 means eternal life.'
671
672        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
673
674        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
675
676            incompatible = self.checkGmetricVersion()
677
678            if incompatible:
679
680                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
681                sys.exit( 1 )
682
683    def checkGmetricVersion( self ):
684
685        """
686        Check version of gmetric is at least 3.3.8
687        for the syntax we use
688        """
689
690        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
691
692        incompatible    = 0
693
694        gfp        = os.popen( self.binary + ' --version' )
695        lines      = gfp.readlines()
696
697        gfp.close()
698
699        for line in lines:
700
701            line = line.split( ' ' )
702
703            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
704           
705                gmetric_version    = line[1].split( '\n' )[0]
706
707                version_major    = int( gmetric_version.split( '.' )[0] )
708                version_minor    = int( gmetric_version.split( '.' )[1] )
709                version_patch    = int( gmetric_version.split( '.' )[2] )
710
711                incompatible    = 0
712
713                if version_major < 3:
714
715                    incompatible = 1
716               
717                elif version_major == 3:
718
719                    if version_minor < 3:
720
721                        incompatible = 1
722
723                    elif version_patch < 8:
724
725                        incompatible = 1
726
727        return incompatible
728
729    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
730
731        """Call gmetric binary and multicast"""
732
733        cmd = self.binary
734
735        if GMOND_UDP_SEND_CHANNELS:
736
737            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
738
739                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
740
741                debug_msg( 10, printTime() + ' ' + metric_debug)
742
743                gm = Gmetric( c_ip, c_port )
744
745                gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
746
747        elif GMETRIC_TARGET:
748
749            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
750            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
751
752            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
753
754            debug_msg( 10, printTime() + ' ' + metric_debug)
755
756            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
757
758            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
759
760        else:
761            try:
762                cmd = cmd + ' -c' + GMOND_CONF
763
764            except NameError:
765
766                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
767
768            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
769
770            if len( units ) > 0:
771
772                cmd = cmd + ' -u"' + units + '"'
773
774            debug_msg( 10, printTime() + ' ' + cmd )
775
776            os.system( cmd )
777
778class DataGatherer:
779
780    """Skeleton class for batch system DataGatherer"""
781
782    def printJobs( self, jobs ):
783
784        """Print a jobinfo overview"""
785
786        for name, attrs in self.jobs.items():
787
788            print 'job %s' %(name)
789
790            for name, val in attrs.items():
791
792                print '\t%s = %s' %( name, val )
793
794    def printJob( self, jobs, job_id ):
795
796        """Print job with job_id from jobs"""
797
798        print 'job %s' %(job_id)
799
800        for name, val in jobs[ job_id ].items():
801
802            print '\t%s = %s' %( name, val )
803
804    def getAttr( self, attrs, name ):
805
806        """Return certain attribute from dictionary, if exists"""
807
808        if attrs.has_key( name ):
809
810            return attrs[ name ]
811        else:
812            return ''
813
814    def jobDataChanged( self, jobs, job_id, attrs ):
815
816        """Check if job with attrs and job_id in jobs has changed"""
817
818        if jobs.has_key( job_id ):
819
820            oldData = jobs[ job_id ]   
821        else:
822            return 1
823
824        for name, val in attrs.items():
825
826            if oldData.has_key( name ):
827
828                if oldData[ name ] != attrs[ name ]:
829
830                    return 1
831
832            else:
833                return 1
834
835        return 0
836
837    def submitJobData( self ):
838
839        """Submit job info list"""
840
841        global BATCH_API
842
843        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
844
845        running_jobs = 0
846        queued_jobs  = 0
847
848        # Count how many running/queued jobs we found
849        #
850        for jobid, jobattrs in self.jobs.items():
851
852            if jobattrs[ 'status' ] == 'Q':
853
854                queued_jobs += 1
855
856            elif jobattrs[ 'status' ] == 'R':
857
858                running_jobs += 1
859
860        # Report running/queued jobs as seperate metric for a nice RRD graph
861        #
862        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
863        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
864
865        # Report down/offline nodes in batch (PBS only ATM)
866        #
867        if BATCH_API in [ 'pbs', 'slurm' ]:
868
869            domain        = fqdn_parts( socket.getfqdn() )[1]
870
871            downed_nodes  = list()
872            offline_nodes = list()
873       
874            l        = ['state']
875
876            nodelist = self.getNodeData()
877
878            for name, node in nodelist.items():
879
880                if ( node[ 'state' ].find( "down" ) != -1 ):
881
882                    downed_nodes.append( name )
883
884                if ( node[ 'state' ].find( "offline" ) != -1 ):
885
886                    offline_nodes.append( name )
887
888            downnodeslist    = do_nodelist( downed_nodes )
889            offlinenodeslist = do_nodelist( offline_nodes )
890
891            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
892            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
893            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
894            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
895
896        # Now let's spread the knowledge
897        #
898        for jobid, jobattrs in self.jobs.items():
899
900            # Make gmetric values for each job: respect max gmetric value length
901            #
902            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
903            metric_increment    = 0
904
905            # If we have more job info than max gmetric value length allows, split it up
906            # amongst multiple metrics
907            #
908            for val in gmetric_val:
909
910                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
911                self.dp.multicastGmetric( metric_name, val )
912
913                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
914                #
915                metric_increment    = metric_increment + 1
916
917    def compileGmetricVal( self, jobid, jobattrs ):
918
919        """Create a val string for gmetric of jobinfo"""
920
921        gval_lists    = [ ]
922        val_list    = { }
923
924        for val_name, val_value in jobattrs.items():
925
926            # These are our own metric names, i.e.: status, start_timestamp, etc
927            #
928            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
929
930            # These are their corresponding values
931            #
932            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
933
934            if val_name == 'nodes' and jobattrs['status'] == 'R':
935
936                node_str = None
937
938                for node in val_value:
939
940                    if node_str:
941
942                        node_str = node_str + ';' + node
943                    else:
944                        node_str = node
945
946                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
947                    #
948                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
949
950                        # It's too big, we need to make a new gmetric for the additional info
951                        #
952                        val_list[ val_name ]    = node_str
953
954                        gval_lists.append( val_list )
955
956                        val_list        = { }
957                        node_str        = None
958
959                val_list[ val_name ]    = node_str
960
961                gval_lists.append( val_list )
962
963                val_list        = { }
964
965            elif val_value != '':
966
967                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
968                #
969                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
970
971                    # It's too big, we need to make a new gmetric for the additional info
972                    #
973                    gval_lists.append( val_list )
974
975                    val_list        = { }
976
977                val_list[ val_name ]    = val_value
978
979        if len( val_list ) > 0:
980
981            gval_lists.append( val_list )
982
983        str_list    = [ ]
984
985        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
986        #
987        for val_list in gval_lists:
988
989            my_val_str    = None
990
991            for val_name, val_value in val_list.items():
992
993                if type(val_value) == list:
994
995                    val_value    = val_value.join( ',' )
996
997                if my_val_str:
998
999                    try:
1000                        # fixme: It's getting
1001                        # ('nodes', None) items
1002                        my_val_str = my_val_str + ' ' + val_name + '=' + cgi.encode(val_value)
1003                    except:
1004                        pass
1005
1006                else:
1007                    my_val_str = val_name + '=' + cgi.encode(val_value)
1008
1009            str_list.append( my_val_str )
1010
1011        return str_list
1012
1013    def daemon( self ):
1014
1015        """Run as daemon forever"""
1016
1017        # Fork the first child
1018        #
1019        pid = os.fork()
1020        if pid > 0:
1021            sys.exit(0)  # end parent
1022
1023        # creates a session and sets the process group ID
1024        #
1025        os.setsid()
1026
1027        # Fork the second child
1028        #
1029        pid = os.fork()
1030        if pid > 0:
1031            sys.exit(0)  # end parent
1032
1033        write_pidfile()
1034
1035        # Go to the root directory and set the umask
1036        #
1037        os.chdir('/')
1038        os.umask(0)
1039
1040        sys.stdin.close()
1041        sys.stdout.close()
1042        sys.stderr.close()
1043
1044        os.open('/dev/null', os.O_RDWR)
1045        os.dup2(0, 1)
1046        os.dup2(0, 2)
1047
1048        self.run()
1049
1050    def run( self ):
1051
1052        """Main thread"""
1053
1054        while ( 1 ):
1055       
1056            self.getJobData()
1057            self.submitJobData()
1058            time.sleep( BATCH_POLL_INTERVAL )   
1059
1060# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1061# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1062# with 6.2.  See also the fixmes.
1063
1064class NoJobs (Exception):
1065    """Exception raised by empty job list in qstat output."""
1066    pass
1067
1068class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
1069    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
1070
1071    def __init__(self):
1072        self.value = ""
1073        self.joblist = []
1074        self.job = {}
1075        self.queue = ""
1076        self.in_joblist = False
1077        self.lrequest = False
1078        self.eltq = deque()
1079        xml.sax.handler.ContentHandler.__init__(self)
1080
1081    # The structure of the output is as follows (for SGE 6.0).  It's
1082    # similar for 6.1, but radically different for 6.2, and is
1083    # undocumented generally.  Unfortunately it's voluminous, and probably
1084    # doesn't scale to large clusters/queues.
1085
1086    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1087    #   <djob_info>
1088    #     <qmaster_response>  <!-- job -->
1089    #       ...
1090    #       <JB_ja_template> 
1091    #     <ulong_sublist>
1092    #     ...         <!-- start_time, state ... -->
1093    #     </ulong_sublist>
1094    #       </JB_ja_template> 
1095    #       <JB_ja_tasks>
1096    #     <ulong_sublist>
1097    #       ...       <!-- task info
1098    #     </ulong_sublist>
1099    #     ...
1100    #       </JB_ja_tasks>
1101    #       ...
1102    #     </qmaster_response>
1103    #   </djob_info>
1104    #   <messages>
1105    #   ...
1106
1107    # NB.  We might treat each task as a separate job, like
1108    # straight qstat output, but the web interface expects jobs to
1109    # be identified by integers, not, say, <job number>.<task>.
1110
1111    # So, I lied.  If the job list is empty, we get invalid XML
1112    # like this, which we need to defend against:
1113
1114    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1115    #   <>
1116    #     <ST_name>*</ST_name>
1117    #   </>
1118    # </unknown_jobs>
1119
1120    def startElement(self, name, attrs):
1121        self.value = ""
1122        if name == "djob_info":    # job list
1123            self.in_joblist = True
1124        # The job container is "qmaster_response" in SGE 6.0
1125        # and 6.1, but "element" in 6.2.  This is only the very
1126        # start of what's necessary for 6.2, though (sigh).
1127        elif (name == "qmaster_response" or name == "element") \
1128                and self.eltq[-1] == "djob_info": # job
1129            self.job = {"job_state": "U", "slots": 0,
1130                    "nodes": [], "queued_timestamp": "",
1131                    "queued_timestamp": "", "queue": "",
1132                    "ppn": "0", "RN_max": 0,
1133                    # fixme in endElement
1134                    "requested_memory": 0, "requested_time": 0
1135                    }
1136            self.joblist.append(self.job)
1137        elif name == "qstat_l_requests": # resource request
1138            self.lrequest = True
1139        elif name == "unknown_jobs":
1140            raise NoJobs
1141        self.eltq.append (name)
1142
1143    def characters(self, ch):
1144        self.value += ch
1145
1146    def endElement(self, name): 
1147        """Snarf job elements contents into job dictionary.
1148           Translate keys if appropriate."""
1149
1150        name_trans = {
1151          "JB_job_number": "number",
1152          "JB_job_name": "name", "JB_owner": "owner",
1153          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1154          "JB_submission_time": "queued_timestamp"
1155          }
1156        value = self.value
1157        self.eltq.pop ()
1158
1159        if name == "djob_info":
1160            self.in_joblist = False
1161            self.job = {}
1162        elif name == "JAT_master_queue":
1163            self.job["queue"] = value.split("@")[0]
1164        elif name == "JG_qhostname":
1165            if not (value in self.job["nodes"]):
1166                self.job["nodes"].append(value)
1167        elif name == "JG_slots": # slots in use
1168            self.job["slots"] += int(value)
1169        elif name == "RN_max": # requested slots (tasks or parallel)
1170            self.job["RN_max"] = max (self.job["RN_max"],
1171                          int(value))
1172        elif name == "JAT_state": # job state (bitwise or)
1173            value = int (value)
1174            # Status values from sge_jobL.h
1175            #define JIDLE           0x00000000
1176            #define JHELD           0x00000010
1177            #define JMIGRATING          0x00000020
1178            #define JQUEUED         0x00000040
1179            #define JRUNNING        0x00000080
1180            #define JSUSPENDED          0x00000100
1181            #define JTRANSFERING        0x00000200
1182            #define JDELETED        0x00000400
1183            #define JWAITING        0x00000800
1184            #define JEXITING        0x00001000
1185            #define JWRITTEN        0x00002000
1186            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1187            #define JFINISHED           0x00010000
1188            if value & 0x80:
1189                self.job["status"] = "R"
1190            elif value & 0x40:
1191                self.job["status"] = "Q"
1192            else:
1193                self.job["status"] = "O" # `other'
1194        elif name == "CE_name" and self.lrequest and self.value in \
1195                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1196            # We're in a container for an interesting resource
1197            # request; record which type.
1198            self.lrequest = self.value
1199        elif name == "CE_doubleval" and self.lrequest:
1200            # if we're in a container for an interesting
1201            # resource request, use the maxmimum of the hard
1202            # and soft requests to record the requested CPU
1203            # or core.  Fixme:  I'm not sure if this logic is
1204            # right.
1205            if self.lrequest in ("h_core", "s_core"):
1206                self.job["requested_memory"] = \
1207                    max (float (value),
1208                     self.job["requested_memory"])
1209            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1210            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1211                self.job["requested_time"] = \
1212                    max (float (value),
1213                     self.job["requested_time"])
1214        elif name == "qstat_l_requests":
1215            self.lrequest = False
1216        elif self.job and self.in_joblist:
1217            if name in name_trans:
1218                name = name_trans[name]
1219                self.job[name] = value
1220
1221# Abstracted from PBS original.
1222# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
1223#
1224def do_nodelist( nodes ):
1225
1226    """Translate node list as appropriate."""
1227
1228    nodeslist        = [ ]
1229    my_domain        = fqdn_parts( socket.getfqdn() )[1]
1230
1231    for node in nodes:
1232
1233        host        = node.split( '/' )[0] # not relevant for SGE
1234        h, host_domain    = fqdn_parts(host)
1235
1236        if host_domain == my_domain:
1237
1238            host    = h
1239
1240        if nodeslist.count( host ) == 0:
1241
1242            for translate_pattern in BATCH_HOST_TRANSLATE:
1243
1244                if translate_pattern.find( '/' ) != -1:
1245
1246                    translate_orig    = \
1247                        translate_pattern.split( '/' )[1]
1248                    translate_new    = \
1249                        translate_pattern.split( '/' )[2]
1250                    host = re.sub( translate_orig,
1251                               translate_new, host )
1252            if not host in nodeslist:
1253                nodeslist.append( host )
1254    return nodeslist
1255
1256class SLURMDataGatherer( DataGatherer ):
1257
1258    global pyslurm
1259
1260    """This is the DataGatherer for SLURM"""
1261
1262    def __init__( self ):
1263
1264        """Setup appropriate variables"""
1265
1266        self.jobs       = { }
1267        self.timeoffset = 0
1268        self.dp         = DataProcessor()
1269
1270    def getNodeData( self ):
1271
1272        slurm_type  = pyslurm.node()
1273
1274        slurm_nodes = slurm_type.get()
1275
1276        nodedict    = { }
1277
1278        for node, attrs in slurm_nodes.items():
1279
1280            ( num_state, name_state ) = attrs['node_state'] 
1281
1282            if name_state == 'DOWN':
1283
1284                nodedict[ node ] = { 'state' : 'down' }
1285
1286            elif name_state == 'DRAIN':
1287
1288                nodedict[ node ] = { 'state' : 'offline' }
1289
1290        return nodedict
1291
1292    def getJobData( self ):
1293
1294        """Gather all data on current jobs"""
1295
1296        joblist            = {}
1297
1298        self.cur_time  = time.time()
1299
1300        slurm_type = pyslurm.job()
1301        joblist    = slurm_type.get()
1302
1303        jobs_processed    = [ ]
1304
1305        for name, attrs in joblist.items():
1306            display_queue = 1
1307            job_id        = name
1308
1309            name          = self.getAttr( attrs, 'name' )
1310            queue         = self.getAttr( attrs, 'partition' )
1311
1312            if QUEUE:
1313                for q in QUEUE:
1314                    if q == queue:
1315                        display_queue = 1
1316                        break
1317                    else:
1318                        display_queue = 0
1319                        continue
1320            if display_queue == 0:
1321                continue
1322
1323            owner_uid        = attrs[ 'user_id' ]
1324            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1325
1326            requested_time   = self.getAttr( attrs, 'time_limit' )
1327            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
1328
1329            if min_memory == 0:
1330
1331                requested_memory = ''
1332
1333            else:
1334                requested_memory = min_memory
1335
1336            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
1337
1338            if min_cpus == 0:
1339
1340                ppn = ''
1341
1342            else:
1343                ppn = min_cpus
1344
1345            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1346
1347            status = 'Q'
1348
1349            if status_long == 'RUNNING':
1350
1351                status = 'R'
1352
1353            elif status_long == 'COMPLETED':
1354
1355                continue
1356
1357            jobs_processed.append( job_id )
1358
1359            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1360
1361            start_timestamp = ''
1362            nodeslist       = ''
1363
1364            if status == 'R':
1365
1366                start_timestamp = self.getAttr( attrs, 'start_time' )
1367                nodes           = attrs[ 'nodes' ]
1368
1369                if not nodes:
1370
1371                    # This should not happen
1372
1373                    # Something wrong: running but 'nodes' returned empty by pyslurm
1374                    # Possible pyslurm bug: abort/quit/warning
1375
1376                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1377
1378                    print err_msg
1379                    debug_msg( 0, err_msg )
1380                    sys.exit(1)
1381
1382                my_nodelist = [ ]
1383
1384                slurm_hostlist  = pyslurm.hostlist()
1385                slurm_hostlist.create( nodes )
1386                slurm_hostlist.uniq()
1387
1388                while slurm_hostlist.count() > 0:
1389
1390                    my_nodelist.append( slurm_hostlist.pop() )
1391
1392                slurm_hostlist.destroy()
1393
1394                del slurm_hostlist
1395
1396                nodeslist       = do_nodelist( my_nodelist )
1397
1398                if DETECT_TIME_DIFFS:
1399
1400                    # If a job start if later than our current date,
1401                    # that must mean the Torque server's time is later
1402                    # than our local time.
1403               
1404                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1405
1406                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1407
1408            elif status == 'Q':
1409
1410                nodeslist       = str( attrs[ 'num_nodes' ] )
1411
1412            else:
1413                start_timestamp = ''
1414                nodeslist       = ''
1415
1416            myAttrs                = { }
1417
1418            myAttrs[ 'name' ]             = str( name )
1419            myAttrs[ 'queue' ]            = str( queue )
1420            myAttrs[ 'owner' ]            = str( owner )
1421            myAttrs[ 'requested_time' ]   = str( requested_time )
1422            myAttrs[ 'requested_memory' ] = str( requested_memory )
1423            myAttrs[ 'ppn' ]              = str( ppn )
1424            myAttrs[ 'status' ]           = str( status )
1425            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1426            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1427            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1428            myAttrs[ 'nodes' ]            = nodeslist
1429            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1430            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1431
1432            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1433
1434                self.jobs[ job_id ] = myAttrs
1435
1436        for id, attrs in self.jobs.items():
1437
1438            if id not in jobs_processed:
1439
1440                # This one isn't there anymore; toedeledoki!
1441                #
1442                del self.jobs[ id ]
1443
1444class SgeDataGatherer(DataGatherer):
1445
1446    jobs = {}
1447
1448    def __init__( self ):
1449        self.jobs = {}
1450        self.timeoffset = 0
1451        self.dp = DataProcessor()
1452
1453    def getJobData( self ):
1454        """Gather all data on current jobs in SGE"""
1455
1456        import popen2
1457
1458        self.cur_time = 0
1459        queues = ""
1460        if QUEUE:    # only for specific queues
1461            # Fixme:  assumes queue names don't contain single
1462            # quote or comma.  Don't know what the SGE rules are.
1463            queues = " -q '" + string.join (QUEUE, ",") + "'"
1464        # Note the comment in SgeQstatXMLParser about scaling with
1465        # this method of getting data.  I haven't found better one.
1466        # Output with args `-xml -ext -f -r' is easier to parse
1467        # in some ways, harder in others, but it doesn't provide
1468        # the submission time (at least SGE 6.0).  The pipeline
1469        # into sed corrects bogus XML observed with a configuration
1470        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1471        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
1472sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
1473                           + queues, True)
1474        qstatparser = SgeQstatXMLParser()
1475        parse_err = 0
1476        try:
1477            xml.sax.parse(piping.fromchild, qstatparser)
1478        except NoJobs:
1479            pass
1480        except:
1481            parse_err = 1
1482        if piping.wait():
1483            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
1484            return None
1485        elif parse_err:
1486            debug_msg(10, "Bad XML output from qstat"())
1487            exit (1)
1488        for f in piping.fromchild, piping.tochild, piping.childerr:
1489            f.close()
1490        self.cur_time = time.time()
1491        jobs_processed = []
1492        for job in qstatparser.joblist:
1493            job_id = job["number"]
1494            if job["status"] in [ 'Q', 'R' ]:
1495                jobs_processed.append(job_id)
1496            if job["status"] == "R":
1497                job["nodes"] = do_nodelist (job["nodes"])
1498                # Fixme: why is job["nodes"] sometimes null?
1499                try:
1500                    # Fixme: Is this sensible?  The
1501                    # PBS-type PPN isn't something you use
1502                    # with SGE.
1503                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
1504                except:
1505                    job["ppn"] = 0
1506                if DETECT_TIME_DIFFS:
1507                    # If a job start is later than our
1508                    # current date, that must mean
1509                    # the SGE server's time is later
1510                    # than our local time.
1511                    start_timestamp = int (job["start_timestamp"])
1512                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
1513
1514                        self.timeoffset    = start_timestamp - int(self.cur_time)
1515            else:
1516                # fixme: Note sure what this should be:
1517                job["ppn"] = job["RN_max"]
1518                job["nodes"] = "1"
1519
1520            myAttrs = {}
1521            for attr in ["name", "queue", "owner",
1522                     "requested_time", "status",
1523                     "requested_memory", "ppn",
1524                     "start_timestamp", "queued_timestamp"]:
1525                myAttrs[attr] = str(job[attr])
1526            myAttrs["nodes"] = job["nodes"]
1527            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
1528            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1529            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
1530
1531            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
1532                self.jobs[job_id] = myAttrs
1533        for id, attrs in self.jobs.items():
1534            if id not in jobs_processed:
1535                del self.jobs[id]
1536
1537# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1538# Requres LSFObject http://sourceforge.net/projects/lsfobject
1539#
1540class LsfDataGatherer(DataGatherer):
1541
1542    """This is the DataGatherer for LSf"""
1543
1544    global lsfObject
1545
1546    def __init__( self ):
1547
1548        self.jobs = { }
1549        self.timeoffset = 0
1550        self.dp = DataProcessor()
1551        self.initLsfQuery()
1552
1553    def _countDuplicatesInList( self, dupedList ):
1554
1555        countDupes    = { }
1556
1557        for item in dupedList:
1558
1559            if not countDupes.has_key( item ):
1560
1561                countDupes[ item ]    = 1
1562            else:
1563                countDupes[ item ]    = countDupes[ item ] + 1
1564
1565        dupeCountList    = [ ]
1566
1567        for item, count in countDupes.items():
1568
1569            dupeCountList.append( ( item, count ) )
1570
1571        return dupeCountList
1572#
1573#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1574#print _countDuplicatesInList(lst)
1575#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1576########################
1577
1578    def initLsfQuery( self ):
1579        self.pq = None
1580        self.pq = lsfObject.jobInfoEntObject()
1581
1582    def getJobData( self, known_jobs="" ):
1583        """Gather all data on current jobs in LSF"""
1584        if len( known_jobs ) > 0:
1585            jobs = known_jobs
1586        else:
1587            jobs = { }
1588        joblist = {}
1589        joblist = self.pq.getJobInfo()
1590        nodelist = ''
1591
1592        self.cur_time = time.time()
1593
1594        jobs_processed = [ ]
1595
1596        for name, attrs in joblist.items():
1597            job_id = str(name)
1598            jobs_processed.append( job_id )
1599            name = self.getAttr( attrs, 'jobName' )
1600            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1601            owner = self.getAttr( attrs, 'user' )
1602
1603### THIS IS THE rLimit List index values
1604#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1605#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1606#define LSF_RLIMIT_DATA     2        /* data size */
1607#define LSF_RLIMIT_STACK    3        /* stack size */
1608#define LSF_RLIMIT_CORE     4        /* core file size */
1609#define LSF_RLIMIT_RSS      5        /* resident set size */
1610#define LSF_RLIMIT_NOFILE   6        /* open files */
1611#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1612#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
1613#define LSF_RLIMIT_SWAP     8
1614#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1615#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1616#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1617#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
1618
1619            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1620            if requested_time == -1: 
1621                requested_time = ""
1622            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1623            if requested_memory == -1: 
1624                requested_memory = ""
1625# This tries to get proc per node. We don't support this right now
1626            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1627            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1628            if requested_cpus == None or requested_cpus == "":
1629                requested_cpus = 1
1630
1631            if QUEUE:
1632                for q in QUEUE:
1633                    if q == queue:
1634                        display_queue = 1
1635                        break
1636                    else:
1637                        display_queue = 0
1638                        continue
1639            if display_queue == 0:
1640                continue
1641
1642            runState = self.getAttr( attrs, 'status' )
1643            if runState == 4:
1644                status = 'R'
1645            else:
1646                status = 'Q'
1647            queued_timestamp = self.getAttr( attrs, 'submitTime' )
1648
1649            if status == 'R':
1650                start_timestamp = self.getAttr( attrs, 'startTime' )
1651                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1652                nodelist = nodesCpu.keys()
1653
1654                if DETECT_TIME_DIFFS:
1655
1656                    # If a job start if later than our current date,
1657                    # that must mean the Torque server's time is later
1658                    # than our local time.
1659
1660                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
1661
1662                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1663
1664            elif status == 'Q':
1665                start_timestamp = ''
1666                count_mynodes = 0
1667                numeric_node = 1
1668                nodelist = ''
1669
1670            myAttrs = { }
1671            if name == "":
1672                myAttrs['name'] = "none"
1673            else:
1674                myAttrs['name'] = name
1675
1676            myAttrs[ 'owner' ]        = owner
1677            myAttrs[ 'requested_time' ]    = str(requested_time)
1678            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1679            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1680            myAttrs[ 'ppn' ]        = str( ppn )
1681            myAttrs[ 'status' ]        = status
1682            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1683            myAttrs[ 'queue' ]        = str(queue)
1684            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1685            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1686            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1687            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1688            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
1689
1690            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1691                jobs[ job_id ] = myAttrs
1692
1693                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
1694
1695        for id, attrs in jobs.items():
1696            if id not in jobs_processed:
1697                # This one isn't there anymore
1698                #
1699                del jobs[ id ]
1700        self.jobs=jobs
1701
1702
1703class PbsDataGatherer( DataGatherer ):
1704
1705    """This is the DataGatherer for PBS and Torque"""
1706
1707    global PBSQuery, PBSError
1708
1709    def __init__( self ):
1710
1711        """Setup appropriate variables"""
1712
1713        self.jobs       = { }
1714        self.timeoffset = 0
1715        self.dp         = DataProcessor()
1716
1717        self.initPbsQuery()
1718
1719    def initPbsQuery( self ):
1720
1721        self.pq = None
1722
1723        try:
1724
1725            if( BATCH_SERVER ):
1726
1727                self.pq = PBSQuery( BATCH_SERVER )
1728            else:
1729                self.pq = PBSQuery()
1730
1731        except PBSError, details:
1732            print 'Cannot connect to pbs server'
1733            print details
1734            sys.exit( 1 )
1735
1736        try:
1737            # TODO: actually use new data structure
1738            self.pq.old_data_structure()
1739
1740        except AttributeError:
1741
1742            # pbs_query is older
1743            #
1744            pass
1745
1746    def getNodeData( self ):
1747
1748        nodedict = { }
1749
1750        try:
1751            nodedict = self.pq.getnodes()
1752
1753        except PBSError, detail:
1754
1755            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1756
1757        return nodedict
1758
1759    def getJobData( self ):
1760
1761        """Gather all data on current jobs in Torque"""
1762
1763        joblist            = {}
1764        self.cur_time      = 0
1765
1766        try:
1767            joblist        = self.pq.getjobs()
1768            self.cur_time  = time.time()
1769
1770        except PBSError, detail:
1771
1772            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1773            return None
1774
1775        jobs_processed    = [ ]
1776
1777        for name, attrs in joblist.items():
1778            display_queue = 1
1779            job_id        = name.split( '.' )[0]
1780
1781            name          = self.getAttr( attrs, 'Job_Name' )
1782            queue         = self.getAttr( attrs, 'queue' )
1783
1784            if QUEUE:
1785                for q in QUEUE:
1786                    if q == queue:
1787                        display_queue = 1
1788                        break
1789                    else:
1790                        display_queue = 0
1791                        continue
1792            if display_queue == 0:
1793                continue
1794
1795
1796            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
1797            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1798            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
1799
1800            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
1801
1802            ppn = ''
1803
1804            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
1805
1806                mynoderequest_fields = mynoderequest.split( ':' )
1807
1808                for mynoderequest_field in mynoderequest_fields:
1809
1810                    if mynoderequest_field.find( 'ppn' ) != -1:
1811
1812                        ppn = mynoderequest_field.split( 'ppn=' )[1]
1813
1814            status = self.getAttr( attrs, 'job_state' )
1815
1816            if status in [ 'Q', 'R' ]:
1817
1818                jobs_processed.append( job_id )
1819
1820            queued_timestamp = self.getAttr( attrs, 'ctime' )
1821
1822            if status == 'R':
1823
1824                start_timestamp = self.getAttr( attrs, 'mtime' )
1825                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
1826
1827                nodeslist       = do_nodelist( nodes )
1828
1829                if DETECT_TIME_DIFFS:
1830
1831                    # If a job start if later than our current date,
1832                    # that must mean the Torque server's time is later
1833                    # than our local time.
1834               
1835                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1836
1837                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1838
1839            elif status == 'Q':
1840
1841                # 'mynodequest' can be a string in the following syntax according to the
1842                # Torque Administator's manual:
1843                #
1844                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1845                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1846                # etc
1847                #
1848
1849                #
1850                # For now we only count the amount of nodes request and ignore properties
1851                #
1852
1853                start_timestamp = ''
1854                count_mynodes   = 0
1855
1856                for node in mynoderequest.split( '+' ):
1857
1858                    # Just grab the {node_count|hostname} part and ignore properties
1859                    #
1860                    nodepart     = node.split( ':' )[0]
1861
1862                    # Let's assume a node_count value
1863                    #
1864                    numeric_node = 1
1865
1866                    # Chop the value up into characters
1867                    #
1868                    for letter in nodepart:
1869
1870                        # If this char is not a digit (0-9), this must be a hostname
1871                        #
1872                        if letter not in string.digits:
1873
1874                            numeric_node = 0
1875
1876                    # If this is a hostname, just count this as one (1) node
1877                    #
1878                    if not numeric_node:
1879
1880                        count_mynodes = count_mynodes + 1
1881                    else:
1882
1883                        # If this a number, it must be the node_count
1884                        # and increase our count with it's value
1885                        #
1886                        try:
1887                            count_mynodes = count_mynodes + int( nodepart )
1888
1889                        except ValueError, detail:
1890
1891                            # When we arrive here I must be bugged or very confused
1892                            # THIS SHOULD NOT HAPPEN!
1893                            #
1894                            debug_msg( 10, str( detail ) )
1895                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1896                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1897                            debug_msg( 10, 'job = ' + str( name ) )
1898                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1899                       
1900                nodeslist       = str( count_mynodes )
1901            else:
1902                start_timestamp = ''
1903                nodeslist       = ''
1904
1905            myAttrs                = { }
1906
1907            myAttrs[ 'name' ]             = str( name )
1908            myAttrs[ 'queue' ]            = str( queue )
1909            myAttrs[ 'owner' ]            = str( owner )
1910            myAttrs[ 'requested_time' ]   = str( requested_time )
1911            myAttrs[ 'requested_memory' ] = str( requested_memory )
1912            myAttrs[ 'ppn' ]              = str( ppn )
1913            myAttrs[ 'status' ]           = str( status )
1914            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1915            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1916            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1917            myAttrs[ 'nodes' ]            = nodeslist
1918            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1919            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1920
1921            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1922
1923                self.jobs[ job_id ] = myAttrs
1924
1925        for id, attrs in self.jobs.items():
1926
1927            if id not in jobs_processed:
1928
1929                # This one isn't there anymore; toedeledoki!
1930                #
1931                del self.jobs[ id ]
1932
1933GMETRIC_DEFAULT_TYPE    = 'string'
1934GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1935GMETRIC_DEFAULT_PORT    = '8649'
1936GMETRIC_DEFAULT_UNITS   = ''
1937
1938class Gmetric:
1939
1940    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1941
1942    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1943    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1944    protocol        = ( 'udp', 'multicast' )
1945
1946    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1947               
1948        global GMETRIC_DEFAULT_TYPE
1949
1950        self.prot       = self.checkHostProtocol( host )
1951        self.data_msg   = xdrlib.Packer()
1952        self.meta_msg   = xdrlib.Packer()
1953        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1954
1955        if self.prot not in self.protocol:
1956
1957            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1958
1959        if self.prot == 'multicast':
1960
1961            # Set multicast options
1962            #
1963            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1964
1965        self.hostport   = ( host, int( port ) )
1966        self.slopestr   = 'both'
1967        self.tmax       = 60
1968
1969    def checkHostProtocol( self, ip ):
1970
1971        """Detect if a ip adress is a multicast address"""
1972
1973        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1974        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
1975
1976        ip_fields               = ip.split( '.' )
1977
1978        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1979
1980            return 'multicast'
1981        else:
1982            return 'udp'
1983
1984    def send( self, name, value, dmax, typestr = '', units = '' ):
1985
1986        if len( units ) == 0:
1987            units       = GMETRIC_DEFAULT_UNITS
1988
1989        if len( typestr ) == 0:
1990            typestr     = GMETRIC_DEFAULT_TYPE
1991
1992        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
1993
1994        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1995        data_rt = self.socket.sendto( data_msg, self.hostport )
1996
1997        return ( meta_rt, data_rt )
1998
1999    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
2000
2001        hostname = "unset"
2002
2003        if slopestr not in self.slope:
2004
2005            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
2006
2007        if typestr not in self.type:
2008
2009            raise ValueError( "Type must be one of: " + str( self.type ) )
2010
2011        if len( name ) == 0:
2012
2013            raise ValueError( "Name must be non-empty" )
2014
2015        self.meta_msg.reset()
2016        self.meta_msg.pack_int( 128 )
2017
2018        if not spoof:
2019            self.meta_msg.pack_string( hostname )
2020        else:
2021            self.meta_msg.pack_string( spoof )
2022
2023        self.meta_msg.pack_string( name )
2024
2025        if not spoof:
2026            self.meta_msg.pack_int( 0 )
2027        else:
2028            self.meta_msg.pack_int( 1 )
2029           
2030        self.meta_msg.pack_string( typestr )
2031        self.meta_msg.pack_string( name )
2032        self.meta_msg.pack_string( unitstr )
2033        self.meta_msg.pack_int( self.slope[ slopestr ] )
2034        self.meta_msg.pack_uint( int( tmax ) )
2035        self.meta_msg.pack_uint( int( dmax ) )
2036
2037        if not group:
2038            self.meta_msg.pack_int( 0 )
2039        else:
2040            self.meta_msg.pack_int( 1 )
2041            self.meta_msg.pack_string( "GROUP" )
2042            self.meta_msg.pack_string( group )
2043
2044        self.data_msg.reset()
2045        self.data_msg.pack_int( 128+5 )
2046
2047        if not spoof:
2048            self.data_msg.pack_string( hostname )
2049        else:
2050            self.data_msg.pack_string( spoof )
2051
2052        self.data_msg.pack_string( name )
2053
2054        if not spoof:
2055            self.data_msg.pack_int( 0 )
2056        else:
2057            self.data_msg.pack_int( 1 )
2058
2059        self.data_msg.pack_string( "%s" )
2060        self.data_msg.pack_string( str( value ) )
2061
2062        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2063
2064def printTime( ):
2065
2066    """Print current time/date in human readable format for log/debug"""
2067
2068    return time.strftime("%a, %d %b %Y %H:%M:%S")
2069
2070def debug_msg( level, msg ):
2071
2072    """Print msg if at or above current debug level"""
2073
2074    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
2075
2076    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2077        sys.stderr.write( msg + '\n' )
2078
2079    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2080        syslog.syslog( msg )
2081
2082def write_pidfile():
2083
2084    # Write pidfile if PIDFILE is set
2085    #
2086    if PIDFILE:
2087
2088        pid    = os.getpid()
2089
2090        pidfile    = open( PIDFILE, 'w' )
2091
2092        pidfile.write( str( pid ) )
2093        pidfile.close()
2094
2095def main():
2096
2097    """Application start"""
2098
2099    global PBSQuery, PBSError, lsfObject, pyslurm
2100    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
2101
2102    if not processArgs( sys.argv[1:] ):
2103
2104        sys.exit( 1 )
2105
2106    # Load appropriate DataGatherer depending on which BATCH_API is set
2107    # and any required modules for the Gatherer
2108    #
2109    if BATCH_API == 'pbs':
2110
2111        try:
2112            from PBSQuery import PBSQuery, PBSError
2113
2114        except ImportError, details:
2115
2116            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
2117            print details
2118            sys.exit( 1 )
2119
2120        gather = PbsDataGatherer()
2121
2122    elif BATCH_API == 'sge':
2123
2124        if BATCH_SERVER != 'localhost':
2125
2126            # Print and log, but continue execution
2127            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2128            print err_msg
2129            debug_msg( 0, err_msg )
2130
2131        # Tested with SGE 6.0u11.
2132        #
2133        gather = SgeDataGatherer()
2134
2135    elif BATCH_API == 'lsf':
2136
2137        if BATCH_SERVER != 'localhost':
2138
2139            # Print and log, but continue execution
2140            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2141            print err_msg
2142            debug_msg( 0, err_msg )
2143
2144        try:
2145            from lsfObject import lsfObject
2146        except:
2147            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2148            sys.exit( 1 )
2149
2150        gather = LsfDataGatherer()
2151
2152    elif BATCH_API == 'slurm':
2153
2154        if BATCH_SERVER != 'localhost':
2155
2156            # Print and log, but continue execution
2157            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2158            print err_msg
2159            debug_msg( 0, err_msg )
2160
2161        try:
2162            import pyslurm
2163        except:
2164            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
2165            sys.exit( 1 )
2166
2167        gather = SLURMDataGatherer()
2168
2169    else:
2170        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
2171
2172        sys.exit( 1 )
2173
2174    if( DAEMONIZE and USE_SYSLOG ):
2175
2176        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2177
2178    if DAEMONIZE:
2179
2180        gather.daemon()
2181    else:
2182        gather.run()
2183
2184# wh00t? someone started me! :)
2185#
2186if __name__ == '__main__':
2187    main()
Note: See TracBrowser for help on using the repository browser.