source: branches/1.1/jobmond/jobmond.py @ 923

Last change on this file since 923 was 913, checked in by olahaye, 8 years ago

[rpm&deb packaging] Now fixes the VERSION outside current directory (can be SVN)
This avoids .in files and let generate tarballs and packages (binary and sources) without any VERSION values.
make deb or rpm or install even from svn is now safe from "sed -i -e"

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 64.6 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2013  Ramon Bastiaans
6# Copyright (C) 2007, 2009  Dave Love  (SGE code)
7#
8# Jobmonarch is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# Jobmonarch is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21#
22# SVN $Id: jobmond.py 913 2013-05-22 17:00:22Z olahaye $
23#
24
25# vi :set ts=4
26
27import sys, getopt, ConfigParser, time, os, socket, string, re
28import xdrlib, socket, syslog, xml, xml.sax, shlex, os.path, pwd
29from xml.sax.handler import feature_namespaces
30from collections import deque
31from glob import glob
32
33VERSION='__VERSION__'
34
35def usage( ver ):
36
37    print 'jobmond %s' %VERSION
38
39    if ver:
40        return 0
41
42    print
43    print 'Purpose:'
44    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
45    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
46    print
47    print 'Usage:    jobmond [OPTIONS]'
48    print
49    print '  -c, --config=FILE    The configuration file to use (default: /etc/jobmond.conf)'
50    print '  -p, --pidfile=FILE    Use pid file to store the process id'
51    print '  -h, --help        Print help and exit'
52    print '  -v, --version      Print version and exit'
53    print
54
55def processArgs( args ):
56
57    SHORT_L      = 'p:hvc:'
58    LONG_L       = [ 'help', 'config=', 'pidfile=', 'version' ]
59
60    global PIDFILE, JOBMOND_CONF
61    PIDFILE      = None
62
63    JOBMOND_CONF = '/etc/jobmond.conf'
64
65    try:
66
67        opts, args    = getopt.getopt( args, SHORT_L, LONG_L )
68
69    except getopt.GetoptError, detail:
70
71        print detail
72        usage( False )
73        sys.exit( 1 )
74
75    for opt, value in opts:
76
77        if opt in [ '--config', '-c' ]:
78       
79            JOBMOND_CONF = value
80
81        if opt in [ '--pidfile', '-p' ]:
82
83            PIDFILE      = value
84       
85        if opt in [ '--help', '-h' ]:
86 
87            usage( False )
88            sys.exit( 0 )
89
90        if opt in [ '--version', '-v' ]:
91
92            usage( True )
93            sys.exit( 0 )
94
95    return loadConfig( JOBMOND_CONF )
96
97class GangliaConfigParser:
98
99    def __init__( self, filename ):
100
101        self.conf_lijst   = [ ]
102        self.conf_dict    = { }
103        self.filename     = filename
104        self.file_pointer = file( filename, 'r' )
105        self.lexx         = shlex.shlex( self.file_pointer )
106        self.lexx.whitespace_split = True
107
108        self.parse()
109
110    def __del__( self ):
111
112        """
113        Cleanup: close file descriptor
114        """
115
116        self.file_pointer.close()
117        del self.lexx
118        del self.conf_lijst
119
120    def removeQuotes( self, value ):
121
122        clean_value = value
123        clean_value = clean_value.replace( "'", "" )
124        clean_value = clean_value.replace( '"', '' )
125        clean_value = clean_value.strip()
126
127        return clean_value
128
129    def removeBraces( self, value ):
130
131        clean_value = value
132        clean_value = clean_value.replace( "(", "" )
133        clean_value = clean_value.replace( ')', '' )
134        clean_value = clean_value.strip()
135
136        return clean_value
137
138    def parse( self ):
139
140        """
141        Parse self.filename using shlex scanning.
142        - Removes /* comments */
143        - Traverses (recursively) through all include () statements
144        - Stores complete valid config tokens in self.conf_list
145
146        i.e.:
147            ['globals',
148             '{',
149             'daemonize',
150             '=',
151             'yes',
152             'setuid',
153             '=',
154             'yes',
155             'user',
156             '=',
157             'ganglia',
158             'debug_level',
159             '=',
160             '0',
161             <etc> ]
162        """
163
164        t = 'bogus'
165        c = False
166        i = False
167
168        while t != self.lexx.eof:
169            #print 'get token'
170            t = self.lexx.get_token()
171
172            if len( t ) >= 2:
173
174                if len( t ) >= 4:
175
176                    if t[:2] == '/*' and t[-2:] == '*/':
177
178                        #print 'comment line'
179                        #print 'skipping: %s' %t
180                        continue
181
182                if t == '/*' or t[:2] == '/*':
183                    c = True
184                    #print 'comment start'
185                    #print 'skipping: %s' %t
186                    continue
187
188                if t == '*/' or t[-2:] == '*/':
189                    c = False
190                    #print 'skipping: %s' %t
191                    #print 'comment end'
192                    continue
193
194            if c:
195                #print 'skipping: %s' %t
196                continue
197
198            if t == 'include':
199                i = True
200                #print 'include start'
201                #print 'skipping: %s' %t
202                continue
203
204            if i:
205
206                #print 'include start: %s' %t
207
208                t2 = self.removeQuotes( t )
209                t2 = self.removeBraces( t )
210
211                for in_file in glob( self.removeQuotes(t2) ):
212
213                    #print 'including file: %s' %in_file
214                    parse_infile = GangliaConfigParser( in_file )
215
216                    self.conf_lijst = self.conf_lijst + parse_infile.getConfLijst()
217
218                    del parse_infile
219
220                i = False
221                #print 'include end'
222                #print 'skipping: %s' %t
223                continue
224
225            #print 'keep: %s' %t
226            self.conf_lijst.append( self.removeQuotes(t) )
227
228    def getConfLijst( self ):
229
230        return self.conf_lijst
231
232    def confListToDict( self, parent_list=None ):
233
234        """
235        Recursively traverses a conf_list and creates dictionary from it
236        """
237
238        new_dict = { }
239        count    = 0
240        skip     = 0
241
242        if not parent_list:
243            parent_list = self.conf_lijst
244
245        #print 'entering confListToDict(): (parent) list size %s' %len(parent_list)
246
247        for n, c in enumerate( parent_list ):
248
249            count = count + 1
250
251            #print 'CL: n %d c %s' %(n, c)
252
253            if skip > 0:
254
255                #print '- skipped'
256                skip = skip - 1
257                continue
258
259            if (n+1) <= (len( parent_list )-1):
260
261                if parent_list[(n+1)] == '{':
262
263                    if not new_dict.has_key( c ):
264                        new_dict[ c ] = [ ]
265
266                    (temp_new_dict, skip) = self.confListToDict( parent_list[(n+2):] )
267                    new_dict[ c ].append( temp_new_dict )
268
269                if parent_list[(n+1)] == '=' and (n+2) <= (len( parent_list )-1):
270
271                    if not new_dict.has_key( c ):
272                        new_dict[ c ] = [ ]
273
274                    new_dict[ c ].append( parent_list[ (n+2) ] )
275
276                    skip = 2
277
278                if parent_list[n] == '}':
279
280                    #print 'leaving confListToDict(): new dict = %s' %new_dict
281                    return (new_dict, count)
282
283    def makeConfDict( self ):
284
285        """
286        Walks through self.conf_list and creates a dictionary based upon config values
287
288        i.e.:
289            'tcp_accept_channel': [{'acl': [{'access': [{'action': ['"allow"'],
290                                                         'ip': ['"127.0.0.1"'],
291                                                         'mask': ['32']}]}],
292                                    'port': ['8649']}],
293            'udp_recv_channel': [{'port': ['8649']}],
294            'udp_send_channel': [{'host': ['145.101.32.3'],
295                                  'port': ['8649']},
296                                 {'host': ['145.101.32.207'],
297                                  'port': ['8649']}]}
298        """
299
300        new_dict = { }
301        skip     = 0
302
303        #print 'entering makeConfDict()'
304
305        for n, c in enumerate( self.conf_lijst ):
306
307            #print 'M: n %d c %s' %(n, c)
308
309            if skip > 0:
310
311                #print '- skipped'
312                skip = skip - 1
313                continue
314
315            if (n+1) <= (len( self.conf_lijst )-1):
316
317                if self.conf_lijst[(n+1)] == '{':
318
319                    if not new_dict.has_key( c ):
320                        new_dict[ c ] = [ ]
321
322                    ( temp_new_dict, skip ) = self.confListToDict( self.conf_lijst[(n+2):] )
323                    new_dict[ c ].append( temp_new_dict )
324
325                if self.conf_lijst[(n+1)] == '=' and (n+2) <= (len( self.conf_lijst )-1):
326
327                    if not new_dict.has_key( c ):
328                        new_dict[ c ] = [ ]
329
330                    new_dict[ c ].append( self.conf_lijst[ (n+2) ] )
331
332                    skip = 2
333
334        self.conf_dict = new_dict
335        #print 'leaving makeConfDict(): conf dict size %d' %len( self.conf_dict )
336
337    def checkConfDict( self ):
338
339        if len( self.conf_lijst ) == 0:
340
341            raise Exception("Something went wrong generating conf list for %s" %self.file_name )
342
343        if len( self.conf_dict ) == 0:
344
345            self.makeConfDict()
346
347    def getConfDict( self ):
348
349        self.checkConfDict()
350        return self.conf_dict
351
352    def getUdpSendChannels( self ):
353
354        self.checkConfDict()
355
356        udp_send_channels = [ ] # IP:PORT
357
358        if not self.conf_dict.has_key( 'udp_send_channel' ):
359            return None
360
361        for u in self.conf_dict[ 'udp_send_channel' ]:
362
363            if u.has_key( 'mcast_join' ):
364
365                ip = u['mcast_join'][0]
366
367            elif u.has_key( 'host' ):
368
369                ip = u['host'][0]
370
371            port = u['port'][0]
372
373            udp_send_channels.append( ( ip, port ) )
374
375        if len( udp_send_channels ) == 0:
376            return None
377
378        return udp_send_channels
379
380    def getSectionLastOption( self, section, option ):
381
382        """
383        Get last option set in a config section that could be set multiple times in multiple (include) files.
384
385        i.e.: getSectionLastOption( 'globals', 'send_metadata_interval' )
386        """
387
388        self.checkConfDict()
389        value = None
390
391        if not self.conf_dict.has_key( section ):
392
393            return None
394
395        # Could be set multiple times in multiple (include) files: get last one set
396        for c in self.conf_dict[ section ]:
397
398                if c.has_key( option ):
399
400                    value = c[ option ][0]
401
402        return value
403
404    def getClusterName( self ):
405
406        return self.getSectionLastOption( 'cluster', 'name' )
407
408    def getVal( self, section, option ):
409
410        return self.getSectionLastOption( section, option )
411
412    def getInt( self, section, valname ):
413
414        value    = self.getVal( section, valname )
415
416        if not value:
417            return None
418
419        return int( value )
420
421    def getStr( self, section, valname ):
422
423        value    = self.getVal( section, valname )
424
425        if not value:
426            return None
427
428        return str( value )
429
430def findGmetric():
431
432    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
433
434        guess    = '%s/%s' %( dir, 'gmetric' )
435
436        if os.path.exists( guess ):
437
438            return guess
439
440    return False
441
442def loadConfig( filename ):
443
444    def getlist( cfg_string ):
445
446        my_list = [ ]
447
448        for item_txt in cfg_string.split( ',' ):
449
450            sep_char = None
451
452            item_txt = item_txt.strip()
453
454            for s_char in [ "'", '"' ]:
455
456                if item_txt.find( s_char ) != -1:
457
458                    if item_txt.count( s_char ) != 2:
459
460                        print 'Missing quote: %s' %item_txt
461                        sys.exit( 1 )
462
463                    else:
464
465                        sep_char = s_char
466                        break
467
468            if sep_char:
469
470                item_txt = item_txt.split( sep_char )[1]
471
472            my_list.append( item_txt )
473
474        return my_list
475
476    if not os.path.isfile( JOBMOND_CONF ):
477
478        print "Is not a file or does not exist: '%s'" %JOBMOND_CONF
479        sys.exit( 1 )
480
481    try:
482        f = open( JOBMOND_CONF, 'r' )
483    except IOError, detail:
484        print "Cannot read config file: '%s'" %JOBMOND_CONF
485        sys.exit( 1 )
486    else:
487        f.close()
488
489    cfg        = ConfigParser.ConfigParser()
490
491    cfg.read( filename )
492
493    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
494    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
495    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
496    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
497    global METRIC_MAX_VAL_LEN, GMOND_UDP_SEND_CHANNELS
498
499    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
500
501    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
502
503    SYSLOG_LEVEL    = -1
504    SYSLOG_FACILITY = None
505
506    try:
507        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
508
509    except ConfigParser.NoOptionError:
510
511        USE_SYSLOG  = True
512
513        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
514
515    if USE_SYSLOG:
516
517        try:
518            SYSLOG_LEVEL = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
519
520        except ConfigParser.NoOptionError:
521
522            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
523            SYSLOG_LEVEL = 0
524
525        try:
526
527            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
528
529        except ConfigParser.NoOptionError:
530
531            SYSLOG_FACILITY = syslog.LOG_DAEMON
532
533            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
534
535    try:
536
537        BATCH_SERVER = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
538
539    except ConfigParser.NoOptionError:
540
541        # Not required for all API's: only pbs api allows remote connections
542        BATCH_SERVER = None
543
544    try:
545   
546        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
547
548    except ConfigParser.NoOptionError:
549
550        # Backwards compatibility for old configs
551        #
552
553        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
554        api_guess           = 'pbs'
555   
556    try:
557
558        GMOND_CONF          = cfg.get( 'DEFAULT', 'GMOND_CONF' )
559
560    except ConfigParser.NoOptionError:
561
562        # Not specified: assume /etc/ganglia/gmond.conf
563        #
564        GMOND_CONF          = '/etc/ganglia/gmond.conf'
565
566    ganglia_cfg             = GangliaConfigParser( GMOND_CONF )
567    GMETRIC_TARGET          = None
568
569    GMOND_UDP_SEND_CHANNELS = ganglia_cfg.getUdpSendChannels()
570
571    if not GMOND_UDP_SEND_CHANNELS:
572
573        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s' - Trying: %s" %( GMOND_CONF, JOBMOND_CONF ) )
574
575        # Couldn't figure it out: let's see if it's in our jobmond.conf
576        #
577        try:
578
579            GMETRIC_TARGET    = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
580
581        # Guess not: now just give up
582       
583        except ConfigParser.NoOptionError:
584
585            GMETRIC_TARGET    = None
586
587            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
588
589            gmetric_bin    = findGmetric()
590
591            if gmetric_bin:
592
593                GMETRIC_BINARY     = gmetric_bin
594            else:
595                debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
596
597                try:
598
599                    GMETRIC_BINARY = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
600
601                except ConfigParser.NoOptionError:
602
603                    print "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH"
604                    sys.exit( 1 )
605
606    #TODO: is this really still needed or should be automatic
607    DETECT_TIME_DIFFS    = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
608
609    BATCH_HOST_TRANSLATE = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
610
611    try:
612
613        BATCH_API    = cfg.get( 'DEFAULT', 'BATCH_API' )
614
615    except ConfigParser.NoOptionError, detail:
616
617        print "FATAL ERROR: BATCH_API not set"
618        sys.exit( 1 )
619
620    try:
621
622        QUEUE        = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
623
624    except ConfigParser.NoOptionError, detail:
625
626        QUEUE        = None
627
628    METRIC_MAX_VAL_LEN = ganglia_cfg.getInt( 'globals', 'max_udp_msg_len' )
629
630    return True
631
632def fqdn_parts (fqdn):
633
634    """Return pair of host and domain for fully-qualified domain name arg."""
635
636    parts = fqdn.split (".")
637
638    return (parts[0], string.join(parts[1:], "."))
639
640class DataProcessor:
641
642    """Class for processing of data"""
643
644    binary = None
645
646    def __init__( self, binary=None ):
647
648        """Remember alternate binary location if supplied"""
649
650        global GMETRIC_BINARY, GMOND_CONF
651
652        if binary:
653            self.binary = binary
654
655        if not self.binary and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
656            self.binary = GMETRIC_BINARY
657
658        # Timeout for XML
659        #
660        # From ganglia's documentation:
661        #
662        # 'A metric will be deleted DMAX seconds after it is received, and
663        # DMAX=0 means eternal life.'
664
665        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
666
667        if GMOND_CONF and not GMETRIC_TARGET and not GMOND_UDP_SEND_CHANNELS:
668
669            incompatible = self.checkGmetricVersion()
670
671            if incompatible:
672
673                print 'Ganglia/Gmetric version not compatible, please upgrade to at least 3.3.8'
674                sys.exit( 1 )
675
676    def checkGmetricVersion( self ):
677
678        """
679        Check version of gmetric is at least 3.3.8
680        for the syntax we use
681        """
682
683        global METRIC_MAX_VAL_LEN, GMETRIC_TARGET
684
685        incompatible    = 0
686
687        gfp        = os.popen( self.binary + ' --version' )
688        lines      = gfp.readlines()
689
690        gfp.close()
691
692        for line in lines:
693
694            line = line.split( ' ' )
695
696            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
697           
698                gmetric_version    = line[1].split( '\n' )[0]
699
700                version_major    = int( gmetric_version.split( '.' )[0] )
701                version_minor    = int( gmetric_version.split( '.' )[1] )
702                version_patch    = int( gmetric_version.split( '.' )[2] )
703
704                incompatible    = 0
705
706                if version_major < 3:
707
708                    incompatible = 1
709               
710                elif version_major == 3:
711
712                    if version_minor < 3:
713
714                        incompatible = 1
715
716                    elif version_patch < 8:
717
718                        incompatible = 1
719
720        return incompatible
721
722    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
723
724        """Call gmetric binary and multicast"""
725
726        cmd = self.binary
727
728        if GMOND_UDP_SEND_CHANNELS:
729
730            for c_ip, c_port  in GMOND_UDP_SEND_CHANNELS:
731
732                metric_debug        = "[gmetric %s:%s] name: %s - val: %s - dmax: %s" %( str(c_ip), str(c_port), str( metricname ), str( metricval ), str( self.dmax ) )
733
734                debug_msg( 10, printTime() + ' ' + metric_debug)
735
736                gm = Gmetric( c_ip, c_port )
737
738                gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
739
740        elif GMETRIC_TARGET:
741
742            GMETRIC_TARGET_HOST    = GMETRIC_TARGET.split( ':' )[0]
743            GMETRIC_TARGET_PORT    = GMETRIC_TARGET.split( ':' )[1]
744
745            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
746
747            debug_msg( 10, printTime() + ' ' + metric_debug)
748
749            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
750
751            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
752
753        else:
754            try:
755                cmd = cmd + ' -c' + GMOND_CONF
756
757            except NameError:
758
759                debug_msg( 10, 'Assuming /etc/ganglia/gmond.conf for gmetric cmd' )
760
761            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
762
763            if len( units ) > 0:
764
765                cmd = cmd + ' -u"' + units + '"'
766
767            debug_msg( 10, printTime() + ' ' + cmd )
768
769            os.system( cmd )
770
771class DataGatherer:
772
773    """Skeleton class for batch system DataGatherer"""
774
775    def printJobs( self, jobs ):
776
777        """Print a jobinfo overview"""
778
779        for name, attrs in self.jobs.items():
780
781            print 'job %s' %(name)
782
783            for name, val in attrs.items():
784
785                print '\t%s = %s' %( name, val )
786
787    def printJob( self, jobs, job_id ):
788
789        """Print job with job_id from jobs"""
790
791        print 'job %s' %(job_id)
792
793        for name, val in jobs[ job_id ].items():
794
795            print '\t%s = %s' %( name, val )
796
797    def getAttr( self, attrs, name ):
798
799        """Return certain attribute from dictionary, if exists"""
800
801        if attrs.has_key( name ):
802
803            return attrs[ name ]
804        else:
805            return ''
806
807    def jobDataChanged( self, jobs, job_id, attrs ):
808
809        """Check if job with attrs and job_id in jobs has changed"""
810
811        if jobs.has_key( job_id ):
812
813            oldData = jobs[ job_id ]   
814        else:
815            return 1
816
817        for name, val in attrs.items():
818
819            if oldData.has_key( name ):
820
821                if oldData[ name ] != attrs[ name ]:
822
823                    return 1
824
825            else:
826                return 1
827
828        return 0
829
830    def submitJobData( self ):
831
832        """Submit job info list"""
833
834        global BATCH_API
835
836        self.dp.multicastGmetric( 'zplugin_monarch_heartbeat', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
837
838        running_jobs = 0
839        queued_jobs  = 0
840
841        # Count how many running/queued jobs we found
842        #
843        for jobid, jobattrs in self.jobs.items():
844
845            if jobattrs[ 'status' ] == 'Q':
846
847                queued_jobs += 1
848
849            elif jobattrs[ 'status' ] == 'R':
850
851                running_jobs += 1
852
853        # Report running/queued jobs as seperate metric for a nice RRD graph
854        #
855        self.dp.multicastGmetric( 'zplugin_monarch_rj', str( running_jobs ), 'uint32', 'jobs' )
856        self.dp.multicastGmetric( 'zplugin_monarch_qj', str( queued_jobs ), 'uint32', 'jobs' )
857
858        # Report down/offline nodes in batch (PBS only ATM)
859        #
860        if BATCH_API in [ 'pbs', 'slurm' ]:
861
862            domain        = fqdn_parts( socket.getfqdn() )[1]
863
864            downed_nodes  = list()
865            offline_nodes = list()
866       
867            l        = ['state']
868
869            nodelist = self.getNodeData()
870
871            for name, node in nodelist.items():
872
873                if ( node[ 'state' ].find( "down" ) != -1 ):
874
875                    downed_nodes.append( name )
876
877                if ( node[ 'state' ].find( "offline" ) != -1 ):
878
879                    offline_nodes.append( name )
880
881            downnodeslist    = do_nodelist( downed_nodes )
882            offlinenodeslist = do_nodelist( offline_nodes )
883
884            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
885            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
886            self.dp.multicastGmetric( 'zplugin_monarch_down'   , down_str )
887            self.dp.multicastGmetric( 'zplugin_monarch_offline', offl_str )
888
889        # Now let's spread the knowledge
890        #
891        for jobid, jobattrs in self.jobs.items():
892
893            # Make gmetric values for each job: respect max gmetric value length
894            #
895            gmetric_val        = self.compileGmetricVal( jobid, jobattrs )
896            metric_increment    = 0
897
898            # If we have more job info than max gmetric value length allows, split it up
899            # amongst multiple metrics
900            #
901            for val in gmetric_val:
902
903                metric_name = 'zplugin_monarch_job_%s_%s' %( str(metric_increment) , str( jobid ) )
904                self.dp.multicastGmetric( metric_name, val )
905
906                # Increase follow number if this jobinfo is split up amongst more than 1 gmetric
907                #
908                metric_increment    = metric_increment + 1
909
910    def compileGmetricVal( self, jobid, jobattrs ):
911
912        """Create a val string for gmetric of jobinfo"""
913
914        gval_lists    = [ ]
915        val_list    = { }
916
917        for val_name, val_value in jobattrs.items():
918
919            # These are our own metric names, i.e.: status, start_timestamp, etc
920            #
921            val_list_names_len    = len( string.join( val_list.keys() ) ) + len(val_list.keys())
922
923            # These are their corresponding values
924            #
925            val_list_vals_len    = len( string.join( val_list.values() ) ) + len(val_list.values())
926
927            if val_name == 'nodes' and jobattrs['status'] == 'R':
928
929                node_str = None
930
931                for node in val_value:
932
933                    if node_str:
934
935                        node_str = node_str + ';' + node
936                    else:
937                        node_str = node
938
939                    # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
940                    #
941                    if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:
942
943                        # It's too big, we need to make a new gmetric for the additional info
944                        #
945                        val_list[ val_name ]    = node_str
946
947                        gval_lists.append( val_list )
948
949                        val_list        = { }
950                        node_str        = None
951
952                val_list[ val_name ]    = node_str
953
954                gval_lists.append( val_list )
955
956                val_list        = { }
957
958            elif val_value != '':
959
960                # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN
961                #
962                if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:
963
964                    # It's too big, we need to make a new gmetric for the additional info
965                    #
966                    gval_lists.append( val_list )
967
968                    val_list        = { }
969
970                val_list[ val_name ]    = val_value
971
972        if len( val_list ) > 0:
973
974            gval_lists.append( val_list )
975
976        str_list    = [ ]
977
978        # Now append the value names and values together, i.e.: stop_timestamp=value, etc
979        #
980        for val_list in gval_lists:
981
982            my_val_str    = None
983
984            for val_name, val_value in val_list.items():
985
986                if type(val_value) == list:
987
988                    val_value    = val_value.join( ',' )
989
990                if my_val_str:
991
992                    try:
993                        # fixme: It's getting
994                        # ('nodes', None) items
995                        my_val_str = my_val_str + ' ' + val_name + '=' + val_value
996                    except:
997                        pass
998
999                else:
1000                    my_val_str = val_name + '=' + val_value
1001
1002            str_list.append( my_val_str )
1003
1004        return str_list
1005
1006    def daemon( self ):
1007
1008        """Run as daemon forever"""
1009
1010        # Fork the first child
1011        #
1012        pid = os.fork()
1013        if pid > 0:
1014            sys.exit(0)  # end parent
1015
1016        # creates a session and sets the process group ID
1017        #
1018        os.setsid()
1019
1020        # Fork the second child
1021        #
1022        pid = os.fork()
1023        if pid > 0:
1024            sys.exit(0)  # end parent
1025
1026        write_pidfile()
1027
1028        # Go to the root directory and set the umask
1029        #
1030        os.chdir('/')
1031        os.umask(0)
1032
1033        sys.stdin.close()
1034        sys.stdout.close()
1035        sys.stderr.close()
1036
1037        os.open('/dev/null', os.O_RDWR)
1038        os.dup2(0, 1)
1039        os.dup2(0, 2)
1040
1041        self.run()
1042
1043    def run( self ):
1044
1045        """Main thread"""
1046
1047        while ( 1 ):
1048       
1049            self.getJobData()
1050            self.submitJobData()
1051            time.sleep( BATCH_POLL_INTERVAL )   
1052
1053# SGE code by Dave Love <fx@gnu.org>.  Tested with SGE 6.0u8 and 6.0u11.  May
1054# work with SGE 6.1 (else should be easily fixable), but definitely doesn't
1055# with 6.2.  See also the fixmes.
1056
1057class NoJobs (Exception):
1058    """Exception raised by empty job list in qstat output."""
1059    pass
1060
1061class SgeQstatXMLParser(xml.sax.handler.ContentHandler):
1062    """SAX handler for XML output from Sun Grid Engine's `qstat'."""
1063
1064    def __init__(self):
1065        self.value = ""
1066        self.joblist = []
1067        self.job = {}
1068        self.queue = ""
1069        self.in_joblist = False
1070        self.lrequest = False
1071        self.eltq = deque()
1072        xml.sax.handler.ContentHandler.__init__(self)
1073
1074    # The structure of the output is as follows (for SGE 6.0).  It's
1075    # similar for 6.1, but radically different for 6.2, and is
1076    # undocumented generally.  Unfortunately it's voluminous, and probably
1077    # doesn't scale to large clusters/queues.
1078
1079    # <detailed_job_info  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1080    #   <djob_info>
1081    #     <qmaster_response>  <!-- job -->
1082    #       ...
1083    #       <JB_ja_template> 
1084    #     <ulong_sublist>
1085    #     ...         <!-- start_time, state ... -->
1086    #     </ulong_sublist>
1087    #       </JB_ja_template> 
1088    #       <JB_ja_tasks>
1089    #     <ulong_sublist>
1090    #       ...       <!-- task info
1091    #     </ulong_sublist>
1092    #     ...
1093    #       </JB_ja_tasks>
1094    #       ...
1095    #     </qmaster_response>
1096    #   </djob_info>
1097    #   <messages>
1098    #   ...
1099
1100    # NB.  We might treat each task as a separate job, like
1101    # straight qstat output, but the web interface expects jobs to
1102    # be identified by integers, not, say, <job number>.<task>.
1103
1104    # So, I lied.  If the job list is empty, we get invalid XML
1105    # like this, which we need to defend against:
1106
1107    # <unknown_jobs  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
1108    #   <>
1109    #     <ST_name>*</ST_name>
1110    #   </>
1111    # </unknown_jobs>
1112
1113    def startElement(self, name, attrs):
1114        self.value = ""
1115        if name == "djob_info":    # job list
1116            self.in_joblist = True
1117        # The job container is "qmaster_response" in SGE 6.0
1118        # and 6.1, but "element" in 6.2.  This is only the very
1119        # start of what's necessary for 6.2, though (sigh).
1120        elif (name == "qmaster_response" or name == "element") \
1121                and self.eltq[-1] == "djob_info": # job
1122            self.job = {"job_state": "U", "slots": 0,
1123                    "nodes": [], "queued_timestamp": "",
1124                    "queued_timestamp": "", "queue": "",
1125                    "ppn": "0", "RN_max": 0,
1126                    # fixme in endElement
1127                    "requested_memory": 0, "requested_time": 0
1128                    }
1129            self.joblist.append(self.job)
1130        elif name == "qstat_l_requests": # resource request
1131            self.lrequest = True
1132        elif name == "unknown_jobs":
1133            raise NoJobs
1134        self.eltq.append (name)
1135
1136    def characters(self, ch):
1137        self.value += ch
1138
1139    def endElement(self, name): 
1140        """Snarf job elements contents into job dictionary.
1141           Translate keys if appropriate."""
1142
1143        name_trans = {
1144          "JB_job_number": "number",
1145          "JB_job_name": "name", "JB_owner": "owner",
1146          "queue_name": "queue", "JAT_start_time": "start_timestamp",
1147          "JB_submission_time": "queued_timestamp"
1148          }
1149        value = self.value
1150        self.eltq.pop ()
1151
1152        if name == "djob_info":
1153            self.in_joblist = False
1154            self.job = {}
1155        elif name == "JAT_master_queue":
1156            self.job["queue"] = value.split("@")[0]
1157        elif name == "JG_qhostname":
1158            if not (value in self.job["nodes"]):
1159                self.job["nodes"].append(value)
1160        elif name == "JG_slots": # slots in use
1161            self.job["slots"] += int(value)
1162        elif name == "RN_max": # requested slots (tasks or parallel)
1163            self.job["RN_max"] = max (self.job["RN_max"],
1164                          int(value))
1165        elif name == "JAT_state": # job state (bitwise or)
1166            value = int (value)
1167            # Status values from sge_jobL.h
1168            #define JIDLE           0x00000000
1169            #define JHELD           0x00000010
1170            #define JMIGRATING          0x00000020
1171            #define JQUEUED         0x00000040
1172            #define JRUNNING        0x00000080
1173            #define JSUSPENDED          0x00000100
1174            #define JTRANSFERING        0x00000200
1175            #define JDELETED        0x00000400
1176            #define JWAITING        0x00000800
1177            #define JEXITING        0x00001000
1178            #define JWRITTEN        0x00002000
1179            #define JSUSPENDED_ON_THRESHOLD 0x00010000
1180            #define JFINISHED           0x00010000
1181            if value & 0x80:
1182                self.job["status"] = "R"
1183            elif value & 0x40:
1184                self.job["status"] = "Q"
1185            else:
1186                self.job["status"] = "O" # `other'
1187        elif name == "CE_name" and self.lrequest and self.value in \
1188                ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"):
1189            # We're in a container for an interesting resource
1190            # request; record which type.
1191            self.lrequest = self.value
1192        elif name == "CE_doubleval" and self.lrequest:
1193            # if we're in a container for an interesting
1194            # resource request, use the maxmimum of the hard
1195            # and soft requests to record the requested CPU
1196            # or core.  Fixme:  I'm not sure if this logic is
1197            # right.
1198            if self.lrequest in ("h_core", "s_core"):
1199                self.job["requested_memory"] = \
1200                    max (float (value),
1201                     self.job["requested_memory"])
1202            # Fixme:  Check what cpu means, c.f [hs]_cpu.
1203            elif self.lrequest in ("h_cpu", "s_cpu", "cpu"):
1204                self.job["requested_time"] = \
1205                    max (float (value),
1206                     self.job["requested_time"])
1207        elif name == "qstat_l_requests":
1208            self.lrequest = False
1209        elif self.job and self.in_joblist:
1210            if name in name_trans:
1211                name = name_trans[name]
1212                self.job[name] = value
1213
1214# Abstracted from PBS original.
1215# Fixme:  Is it worth (or appropriate for PBS) sorting the result?
1216#
1217def do_nodelist( nodes ):
1218
1219    """Translate node list as appropriate."""
1220
1221    nodeslist        = [ ]
1222    my_domain        = fqdn_parts( socket.getfqdn() )[1]
1223
1224    for node in nodes:
1225
1226        host        = node.split( '/' )[0] # not relevant for SGE
1227        h, host_domain    = fqdn_parts(host)
1228
1229        if host_domain == my_domain:
1230
1231            host    = h
1232
1233        if nodeslist.count( host ) == 0:
1234
1235            for translate_pattern in BATCH_HOST_TRANSLATE:
1236
1237                if translate_pattern.find( '/' ) != -1:
1238
1239                    translate_orig    = \
1240                        translate_pattern.split( '/' )[1]
1241                    translate_new    = \
1242                        translate_pattern.split( '/' )[2]
1243                    host = re.sub( translate_orig,
1244                               translate_new, host )
1245            if not host in nodeslist:
1246                nodeslist.append( host )
1247    return nodeslist
1248
1249class SLURMDataGatherer( DataGatherer ):
1250
1251    global pyslurm
1252
1253    """This is the DataGatherer for SLURM"""
1254
1255    def __init__( self ):
1256
1257        """Setup appropriate variables"""
1258
1259        self.jobs       = { }
1260        self.timeoffset = 0
1261        self.dp         = DataProcessor()
1262
1263    def getNodeData( self ):
1264
1265        slurm_type  = pyslurm.node()
1266
1267        slurm_nodes = slurm_type.get()
1268
1269        nodedict    = { }
1270
1271        for node, attrs in slurm_nodes.items():
1272
1273            ( num_state, name_state ) = attrs['node_state'] 
1274
1275            if name_state == 'DOWN':
1276
1277                nodedict[ node ] = { 'state' : 'down' }
1278
1279            elif name_state == 'DRAIN':
1280
1281                nodedict[ node ] = { 'state' : 'offline' }
1282
1283        return nodedict
1284
1285    def getJobData( self ):
1286
1287        """Gather all data on current jobs"""
1288
1289        joblist            = {}
1290
1291        self.cur_time  = time.time()
1292
1293        slurm_type = pyslurm.job()
1294        joblist    = slurm_type.get()
1295
1296        jobs_processed    = [ ]
1297
1298        for name, attrs in joblist.items():
1299            display_queue = 1
1300            job_id        = name
1301
1302            name          = self.getAttr( attrs, 'name' )
1303            queue         = self.getAttr( attrs, 'partition' )
1304
1305            if QUEUE:
1306                for q in QUEUE:
1307                    if q == queue:
1308                        display_queue = 1
1309                        break
1310                    else:
1311                        display_queue = 0
1312                        continue
1313            if display_queue == 0:
1314                continue
1315
1316            owner_uid        = attrs[ 'user_id' ]
1317            ( owner, owner_pw, owner_uid, owner_gid, owner_gecos, owner_dir, owner_shell ) = pwd.getpwuid( owner_uid )
1318
1319            requested_time   = self.getAttr( attrs, 'time_limit' )
1320            min_memory       = self.getAttr( attrs, 'pn_min_memory' )
1321
1322            if min_memory == 0:
1323
1324                requested_memory = ''
1325
1326            else:
1327                requested_memory = min_memory
1328
1329            min_cpus = self.getAttr( attrs, 'pn_min_cpus' )
1330
1331            if min_cpus == 0:
1332
1333                ppn = ''
1334
1335            else:
1336                ppn = min_cpus
1337
1338            ( something, status_long ) = self.getAttr( attrs, 'job_state' )
1339
1340            status = 'Q'
1341
1342            if status_long == 'RUNNING':
1343
1344                status = 'R'
1345
1346            elif status_long == 'COMPLETED':
1347
1348                continue
1349
1350            jobs_processed.append( job_id )
1351
1352            queued_timestamp = self.getAttr( attrs, 'submit_time' )
1353
1354            start_timestamp = ''
1355            nodeslist       = ''
1356
1357            if status == 'R':
1358
1359                start_timestamp = self.getAttr( attrs, 'start_time' )
1360                nodes           = attrs[ 'nodes' ]
1361
1362                if not nodes:
1363
1364                    # This should not happen
1365
1366                    # Something wrong: running but 'nodes' returned empty by pyslurm
1367                    # Possible pyslurm bug: abort/quit/warning
1368
1369                    err_msg = 'FATAL ERROR: job %s running but nodes returned empty: pyslurm bugged?' %job_id
1370
1371                    print err_msg
1372                    debug_msg( 0, err_msg )
1373                    sys.exit(1)
1374
1375                my_nodelist = [ ]
1376
1377                slurm_hostlist  = pyslurm.hostlist()
1378                slurm_hostlist.create( nodes )
1379                slurm_hostlist.uniq()
1380
1381                while slurm_hostlist.count() > 0:
1382
1383                    my_nodelist.append( slurm_hostlist.pop() )
1384
1385                slurm_hostlist.destroy()
1386
1387                del slurm_hostlist
1388
1389                nodeslist       = do_nodelist( my_nodelist )
1390
1391                if DETECT_TIME_DIFFS:
1392
1393                    # If a job start if later than our current date,
1394                    # that must mean the Torque server's time is later
1395                    # than our local time.
1396               
1397                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1398
1399                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1400
1401            elif status == 'Q':
1402
1403                nodeslist       = str( attrs[ 'num_nodes' ] )
1404
1405            else:
1406                start_timestamp = ''
1407                nodeslist       = ''
1408
1409            myAttrs                = { }
1410
1411            myAttrs[ 'name' ]             = str( name )
1412            myAttrs[ 'queue' ]            = str( queue )
1413            myAttrs[ 'owner' ]            = str( owner )
1414            myAttrs[ 'requested_time' ]   = str( requested_time )
1415            myAttrs[ 'requested_memory' ] = str( requested_memory )
1416            myAttrs[ 'ppn' ]              = str( ppn )
1417            myAttrs[ 'status' ]           = str( status )
1418            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1419            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1420            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1421            myAttrs[ 'nodes' ]            = nodeslist
1422            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1423            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1424
1425            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1426
1427                self.jobs[ job_id ] = myAttrs
1428
1429        for id, attrs in self.jobs.items():
1430
1431            if id not in jobs_processed:
1432
1433                # This one isn't there anymore; toedeledoki!
1434                #
1435                del self.jobs[ id ]
1436
1437class SgeDataGatherer(DataGatherer):
1438
1439    jobs = {}
1440
1441    def __init__( self ):
1442        self.jobs = {}
1443        self.timeoffset = 0
1444        self.dp = DataProcessor()
1445
1446    def getJobData( self ):
1447        """Gather all data on current jobs in SGE"""
1448
1449        import popen2
1450
1451        self.cur_time = 0
1452        queues = ""
1453        if QUEUE:    # only for specific queues
1454            # Fixme:  assumes queue names don't contain single
1455            # quote or comma.  Don't know what the SGE rules are.
1456            queues = " -q '" + string.join (QUEUE, ",") + "'"
1457        # Note the comment in SgeQstatXMLParser about scaling with
1458        # this method of getting data.  I haven't found better one.
1459        # Output with args `-xml -ext -f -r' is easier to parse
1460        # in some ways, harder in others, but it doesn't provide
1461        # the submission time (at least SGE 6.0).  The pipeline
1462        # into sed corrects bogus XML observed with a configuration
1463        # of SGE 6.0u8, which otherwise causes the parsing to hang.
1464        piping = popen2.Popen3("qstat -u '*' -j '*' -xml | \
1465sed -e 's/reported usage>/reported_usage>/g' -e 's;<\/*JATASK:.*>;;'" \
1466                           + queues, True)
1467        qstatparser = SgeQstatXMLParser()
1468        parse_err = 0
1469        try:
1470            xml.sax.parse(piping.fromchild, qstatparser)
1471        except NoJobs:
1472            pass
1473        except:
1474            parse_err = 1
1475        if piping.wait():
1476            debug_msg(10, "qstat error, skipping until next polling interval: " + piping.childerr.readline())
1477            return None
1478        elif parse_err:
1479            debug_msg(10, "Bad XML output from qstat"())
1480            exit (1)
1481        for f in piping.fromchild, piping.tochild, piping.childerr:
1482            f.close()
1483        self.cur_time = time.time()
1484        jobs_processed = []
1485        for job in qstatparser.joblist:
1486            job_id = job["number"]
1487            if job["status"] in [ 'Q', 'R' ]:
1488                jobs_processed.append(job_id)
1489            if job["status"] == "R":
1490                job["nodes"] = do_nodelist (job["nodes"])
1491                # Fixme: why is job["nodes"] sometimes null?
1492                try:
1493                    # Fixme: Is this sensible?  The
1494                    # PBS-type PPN isn't something you use
1495                    # with SGE.
1496                    job["ppn"] = float(job["slots"]) / len(job["nodes"])
1497                except:
1498                    job["ppn"] = 0
1499                if DETECT_TIME_DIFFS:
1500                    # If a job start is later than our
1501                    # current date, that must mean
1502                    # the SGE server's time is later
1503                    # than our local time.
1504                    start_timestamp = int (job["start_timestamp"])
1505                    if start_timestamp > int(self.cur_time) + int(self.timeoffset):
1506
1507                        self.timeoffset    = start_timestamp - int(self.cur_time)
1508            else:
1509                # fixme: Note sure what this should be:
1510                job["ppn"] = job["RN_max"]
1511                job["nodes"] = "1"
1512
1513            myAttrs = {}
1514            for attr in ["name", "queue", "owner",
1515                     "requested_time", "status",
1516                     "requested_memory", "ppn",
1517                     "start_timestamp", "queued_timestamp"]:
1518                myAttrs[attr] = str(job[attr])
1519            myAttrs["nodes"] = job["nodes"]
1520            myAttrs["reported"] = str(int(self.cur_time) + int(self.timeoffset))
1521            myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1]
1522            myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL)
1523
1524            if self.jobDataChanged(self.jobs, job_id, myAttrs) and myAttrs["status"] in ["R", "Q"]:
1525                self.jobs[job_id] = myAttrs
1526        for id, attrs in self.jobs.items():
1527            if id not in jobs_processed:
1528                del self.jobs[id]
1529
1530# LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt>
1531# Requres LSFObject http://sourceforge.net/projects/lsfobject
1532#
1533class LsfDataGatherer(DataGatherer):
1534
1535    """This is the DataGatherer for LSf"""
1536
1537    global lsfObject
1538
1539    def __init__( self ):
1540
1541        self.jobs = { }
1542        self.timeoffset = 0
1543        self.dp = DataProcessor()
1544        self.initLsfQuery()
1545
1546    def _countDuplicatesInList( self, dupedList ):
1547
1548        countDupes    = { }
1549
1550        for item in dupedList:
1551
1552            if not countDupes.has_key( item ):
1553
1554                countDupes[ item ]    = 1
1555            else:
1556                countDupes[ item ]    = countDupes[ item ] + 1
1557
1558        dupeCountList    = [ ]
1559
1560        for item, count in countDupes.items():
1561
1562            dupeCountList.append( ( item, count ) )
1563
1564        return dupeCountList
1565#
1566#lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7']
1567#print _countDuplicatesInList(lst)
1568#[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)]
1569########################
1570
1571    def initLsfQuery( self ):
1572        self.pq = None
1573        self.pq = lsfObject.jobInfoEntObject()
1574
1575    def getJobData( self, known_jobs="" ):
1576        """Gather all data on current jobs in LSF"""
1577        if len( known_jobs ) > 0:
1578            jobs = known_jobs
1579        else:
1580            jobs = { }
1581        joblist = {}
1582        joblist = self.pq.getJobInfo()
1583        nodelist = ''
1584
1585        self.cur_time = time.time()
1586
1587        jobs_processed = [ ]
1588
1589        for name, attrs in joblist.items():
1590            job_id = str(name)
1591            jobs_processed.append( job_id )
1592            name = self.getAttr( attrs, 'jobName' )
1593            queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' )
1594            owner = self.getAttr( attrs, 'user' )
1595
1596### THIS IS THE rLimit List index values
1597#define LSF_RLIMIT_CPU      0        /* cpu time in milliseconds */
1598#define LSF_RLIMIT_FSIZE    1        /* maximum file size */
1599#define LSF_RLIMIT_DATA     2        /* data size */
1600#define LSF_RLIMIT_STACK    3        /* stack size */
1601#define LSF_RLIMIT_CORE     4        /* core file size */
1602#define LSF_RLIMIT_RSS      5        /* resident set size */
1603#define LSF_RLIMIT_NOFILE   6        /* open files */
1604#define LSF_RLIMIT_OPEN_MAX 7        /* (from HP-UX) */
1605#define LSF_RLIMIT_VMEM     8        /* maximum swap mem */
1606#define LSF_RLIMIT_SWAP     8
1607#define LSF_RLIMIT_RUN      9        /* max wall-clock time limit */
1608#define LSF_RLIMIT_PROCESS  10       /* process number limit */
1609#define LSF_RLIMIT_THREAD   11       /* thread number limit (introduced in LSF6.0) */
1610#define LSF_RLIM_NLIMITS    12       /* number of resource limits */
1611
1612            requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9]
1613            if requested_time == -1: 
1614                requested_time = ""
1615            requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8]
1616            if requested_memory == -1: 
1617                requested_memory = ""
1618# This tries to get proc per node. We don't support this right now
1619            ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' )
1620            requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' )
1621            if requested_cpus == None or requested_cpus == "":
1622                requested_cpus = 1
1623
1624            if QUEUE:
1625                for q in QUEUE:
1626                    if q == queue:
1627                        display_queue = 1
1628                        break
1629                    else:
1630                        display_queue = 0
1631                        continue
1632            if display_queue == 0:
1633                continue
1634
1635            runState = self.getAttr( attrs, 'status' )
1636            if runState == 4:
1637                status = 'R'
1638            else:
1639                status = 'Q'
1640            queued_timestamp = self.getAttr( attrs, 'submitTime' )
1641
1642            if status == 'R':
1643                start_timestamp = self.getAttr( attrs, 'startTime' )
1644                nodesCpu =  dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' )))
1645                nodelist = nodesCpu.keys()
1646
1647                if DETECT_TIME_DIFFS:
1648
1649                    # If a job start if later than our current date,
1650                    # that must mean the Torque server's time is later
1651                    # than our local time.
1652
1653                    if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ):
1654
1655                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1656
1657            elif status == 'Q':
1658                start_timestamp = ''
1659                count_mynodes = 0
1660                numeric_node = 1
1661                nodelist = ''
1662
1663            myAttrs = { }
1664            if name == "":
1665                myAttrs['name'] = "none"
1666            else:
1667                myAttrs['name'] = name
1668
1669            myAttrs[ 'owner' ]        = owner
1670            myAttrs[ 'requested_time' ]    = str(requested_time)
1671            myAttrs[ 'requested_memory' ]    = str(requested_memory)
1672            myAttrs[ 'requested_cpus' ]    = str(requested_cpus)
1673            myAttrs[ 'ppn' ]        = str( ppn )
1674            myAttrs[ 'status' ]        = status
1675            myAttrs[ 'start_timestamp' ]    = str(start_timestamp)
1676            myAttrs[ 'queue' ]        = str(queue)
1677            myAttrs[ 'queued_timestamp' ]    = str(queued_timestamp)
1678            myAttrs[ 'reported' ]        = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1679            myAttrs[ 'nodes' ]        = do_nodelist( nodelist )
1680            myAttrs[ 'domain' ]        = fqdn_parts( socket.getfqdn() )[1]
1681            myAttrs[ 'poll_interval' ]    = str(BATCH_POLL_INTERVAL)
1682
1683            if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1684                jobs[ job_id ] = myAttrs
1685
1686                debug_msg( 10, printTime() + ' job %s state changed' %(job_id) )
1687
1688        for id, attrs in jobs.items():
1689            if id not in jobs_processed:
1690                # This one isn't there anymore
1691                #
1692                del jobs[ id ]
1693        self.jobs=jobs
1694
1695
1696class PbsDataGatherer( DataGatherer ):
1697
1698    """This is the DataGatherer for PBS and Torque"""
1699
1700    global PBSQuery, PBSError
1701
1702    def __init__( self ):
1703
1704        """Setup appropriate variables"""
1705
1706        self.jobs       = { }
1707        self.timeoffset = 0
1708        self.dp         = DataProcessor()
1709
1710        self.initPbsQuery()
1711
1712    def initPbsQuery( self ):
1713
1714        self.pq = None
1715
1716        try:
1717
1718            if( BATCH_SERVER ):
1719
1720                self.pq = PBSQuery( BATCH_SERVER )
1721            else:
1722                self.pq = PBSQuery()
1723
1724        except PBSError, details:
1725            print 'Cannot connect to pbs server'
1726            print details
1727            sys.exit( 1 )
1728
1729        try:
1730            # TODO: actually use new data structure
1731            self.pq.old_data_structure()
1732
1733        except AttributeError:
1734
1735            # pbs_query is older
1736            #
1737            pass
1738
1739    def getNodeData( self ):
1740
1741        nodedict = { }
1742
1743        try:
1744            nodedict = self.pq.getnodes()
1745
1746        except PBSError, detail:
1747
1748            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1749
1750        return nodedict
1751
1752    def getJobData( self ):
1753
1754        """Gather all data on current jobs in Torque"""
1755
1756        joblist            = {}
1757        self.cur_time      = 0
1758
1759        try:
1760            joblist        = self.pq.getjobs()
1761            self.cur_time  = time.time()
1762
1763        except PBSError, detail:
1764
1765            debug_msg( 10, "PBS server unavailable, skipping until next polling interval: " + str( detail ) )
1766            return None
1767
1768        jobs_processed    = [ ]
1769
1770        for name, attrs in joblist.items():
1771            display_queue = 1
1772            job_id        = name.split( '.' )[0]
1773
1774            name          = self.getAttr( attrs, 'Job_Name' )
1775            queue         = self.getAttr( attrs, 'queue' )
1776
1777            if QUEUE:
1778                for q in QUEUE:
1779                    if q == queue:
1780                        display_queue = 1
1781                        break
1782                    else:
1783                        display_queue = 0
1784                        continue
1785            if display_queue == 0:
1786                continue
1787
1788
1789            owner            = self.getAttr( attrs, 'Job_Owner' ).split( '@' )[0]
1790            requested_time   = self.getAttr( attrs, 'Resource_List.walltime' )
1791            requested_memory = self.getAttr( attrs, 'Resource_List.mem' )
1792
1793            mynoderequest    = self.getAttr( attrs, 'Resource_List.nodes' )
1794
1795            ppn = ''
1796
1797            if mynoderequest.find( ':' ) != -1 and mynoderequest.find( 'ppn' ) != -1:
1798
1799                mynoderequest_fields = mynoderequest.split( ':' )
1800
1801                for mynoderequest_field in mynoderequest_fields:
1802
1803                    if mynoderequest_field.find( 'ppn' ) != -1:
1804
1805                        ppn = mynoderequest_field.split( 'ppn=' )[1]
1806
1807            status = self.getAttr( attrs, 'job_state' )
1808
1809            if status in [ 'Q', 'R' ]:
1810
1811                jobs_processed.append( job_id )
1812
1813            queued_timestamp = self.getAttr( attrs, 'ctime' )
1814
1815            if status == 'R':
1816
1817                start_timestamp = self.getAttr( attrs, 'mtime' )
1818                nodes           = self.getAttr( attrs, 'exec_host' ).split( '+' )
1819
1820                nodeslist       = do_nodelist( nodes )
1821
1822                if DETECT_TIME_DIFFS:
1823
1824                    # If a job start if later than our current date,
1825                    # that must mean the Torque server's time is later
1826                    # than our local time.
1827               
1828                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
1829
1830                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
1831
1832            elif status == 'Q':
1833
1834                # 'mynodequest' can be a string in the following syntax according to the
1835                # Torque Administator's manual:
1836                #
1837                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1838                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
1839                # etc
1840                #
1841
1842                #
1843                # For now we only count the amount of nodes request and ignore properties
1844                #
1845
1846                start_timestamp = ''
1847                count_mynodes   = 0
1848
1849                for node in mynoderequest.split( '+' ):
1850
1851                    # Just grab the {node_count|hostname} part and ignore properties
1852                    #
1853                    nodepart     = node.split( ':' )[0]
1854
1855                    # Let's assume a node_count value
1856                    #
1857                    numeric_node = 1
1858
1859                    # Chop the value up into characters
1860                    #
1861                    for letter in nodepart:
1862
1863                        # If this char is not a digit (0-9), this must be a hostname
1864                        #
1865                        if letter not in string.digits:
1866
1867                            numeric_node = 0
1868
1869                    # If this is a hostname, just count this as one (1) node
1870                    #
1871                    if not numeric_node:
1872
1873                        count_mynodes = count_mynodes + 1
1874                    else:
1875
1876                        # If this a number, it must be the node_count
1877                        # and increase our count with it's value
1878                        #
1879                        try:
1880                            count_mynodes = count_mynodes + int( nodepart )
1881
1882                        except ValueError, detail:
1883
1884                            # When we arrive here I must be bugged or very confused
1885                            # THIS SHOULD NOT HAPPEN!
1886                            #
1887                            debug_msg( 10, str( detail ) )
1888                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
1889                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
1890                            debug_msg( 10, 'job = ' + str( name ) )
1891                            debug_msg( 10, 'attrs = ' + str( attrs ) )
1892                       
1893                nodeslist       = str( count_mynodes )
1894            else:
1895                start_timestamp = ''
1896                nodeslist       = ''
1897
1898            myAttrs                = { }
1899
1900            myAttrs[ 'name' ]             = str( name )
1901            myAttrs[ 'queue' ]            = str( queue )
1902            myAttrs[ 'owner' ]            = str( owner )
1903            myAttrs[ 'requested_time' ]   = str( requested_time )
1904            myAttrs[ 'requested_memory' ] = str( requested_memory )
1905            myAttrs[ 'ppn' ]              = str( ppn )
1906            myAttrs[ 'status' ]           = str( status )
1907            myAttrs[ 'start_timestamp' ]  = str( start_timestamp )
1908            myAttrs[ 'queued_timestamp' ] = str( queued_timestamp )
1909            myAttrs[ 'reported' ]         = str( int( int( self.cur_time ) + int( self.timeoffset ) ) )
1910            myAttrs[ 'nodes' ]            = nodeslist
1911            myAttrs[ 'domain' ]           = fqdn_parts( socket.getfqdn() )[1]
1912            myAttrs[ 'poll_interval' ]    = str( BATCH_POLL_INTERVAL )
1913
1914            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]:
1915
1916                self.jobs[ job_id ] = myAttrs
1917
1918        for id, attrs in self.jobs.items():
1919
1920            if id not in jobs_processed:
1921
1922                # This one isn't there anymore; toedeledoki!
1923                #
1924                del self.jobs[ id ]
1925
1926GMETRIC_DEFAULT_TYPE    = 'string'
1927GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1928GMETRIC_DEFAULT_PORT    = '8649'
1929GMETRIC_DEFAULT_UNITS   = ''
1930
1931class Gmetric:
1932
1933    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1934
1935    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1936    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1937    protocol        = ( 'udp', 'multicast' )
1938
1939    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1940               
1941        global GMETRIC_DEFAULT_TYPE
1942
1943        self.prot       = self.checkHostProtocol( host )
1944        self.data_msg   = xdrlib.Packer()
1945        self.meta_msg   = xdrlib.Packer()
1946        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1947
1948        if self.prot not in self.protocol:
1949
1950            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1951
1952        if self.prot == 'multicast':
1953
1954            # Set multicast options
1955            #
1956            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1957
1958        self.hostport   = ( host, int( port ) )
1959        self.slopestr   = 'both'
1960        self.tmax       = 60
1961
1962    def checkHostProtocol( self, ip ):
1963
1964        """Detect if a ip adress is a multicast address"""
1965
1966        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1967        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
1968
1969        ip_fields               = ip.split( '.' )
1970
1971        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1972
1973            return 'multicast'
1974        else:
1975            return 'udp'
1976
1977    def send( self, name, value, dmax, typestr = '', units = '' ):
1978
1979        if len( units ) == 0:
1980            units       = GMETRIC_DEFAULT_UNITS
1981
1982        if len( typestr ) == 0:
1983            typestr     = GMETRIC_DEFAULT_TYPE
1984
1985        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
1986
1987        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1988        data_rt = self.socket.sendto( data_msg, self.hostport )
1989
1990        return ( meta_rt, data_rt )
1991
1992    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
1993
1994        hostname = "unset"
1995
1996        if slopestr not in self.slope:
1997
1998            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
1999
2000        if typestr not in self.type:
2001
2002            raise ValueError( "Type must be one of: " + str( self.type ) )
2003
2004        if len( name ) == 0:
2005
2006            raise ValueError( "Name must be non-empty" )
2007
2008        self.meta_msg.reset()
2009        self.meta_msg.pack_int( 128 )
2010
2011        if not spoof:
2012            self.meta_msg.pack_string( hostname )
2013        else:
2014            self.meta_msg.pack_string( spoof )
2015
2016        self.meta_msg.pack_string( name )
2017
2018        if not spoof:
2019            self.meta_msg.pack_int( 0 )
2020        else:
2021            self.meta_msg.pack_int( 1 )
2022           
2023        self.meta_msg.pack_string( typestr )
2024        self.meta_msg.pack_string( name )
2025        self.meta_msg.pack_string( unitstr )
2026        self.meta_msg.pack_int( self.slope[ slopestr ] )
2027        self.meta_msg.pack_uint( int( tmax ) )
2028        self.meta_msg.pack_uint( int( dmax ) )
2029
2030        if not group:
2031            self.meta_msg.pack_int( 0 )
2032        else:
2033            self.meta_msg.pack_int( 1 )
2034            self.meta_msg.pack_string( "GROUP" )
2035            self.meta_msg.pack_string( group )
2036
2037        self.data_msg.reset()
2038        self.data_msg.pack_int( 128+5 )
2039
2040        if not spoof:
2041            self.data_msg.pack_string( hostname )
2042        else:
2043            self.data_msg.pack_string( spoof )
2044
2045        self.data_msg.pack_string( name )
2046
2047        if not spoof:
2048            self.data_msg.pack_int( 0 )
2049        else:
2050            self.data_msg.pack_int( 1 )
2051
2052        self.data_msg.pack_string( "%s" )
2053        self.data_msg.pack_string( str( value ) )
2054
2055        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
2056
2057def printTime( ):
2058
2059    """Print current time/date in human readable format for log/debug"""
2060
2061    return time.strftime("%a, %d %b %Y %H:%M:%S")
2062
2063def debug_msg( level, msg ):
2064
2065    """Print msg if at or above current debug level"""
2066
2067    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
2068
2069    if (not DAEMONIZE and DEBUG_LEVEL >= level):
2070        sys.stderr.write( msg + '\n' )
2071
2072    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
2073        syslog.syslog( msg )
2074
2075def write_pidfile():
2076
2077    # Write pidfile if PIDFILE is set
2078    #
2079    if PIDFILE:
2080
2081        pid    = os.getpid()
2082
2083        pidfile    = open( PIDFILE, 'w' )
2084
2085        pidfile.write( str( pid ) )
2086        pidfile.close()
2087
2088def main():
2089
2090    """Application start"""
2091
2092    global PBSQuery, PBSError, lsfObject, pyslurm
2093    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE, BATCH_SERVER
2094
2095    if not processArgs( sys.argv[1:] ):
2096
2097        sys.exit( 1 )
2098
2099    # Load appropriate DataGatherer depending on which BATCH_API is set
2100    # and any required modules for the Gatherer
2101    #
2102    if BATCH_API == 'pbs':
2103
2104        try:
2105            from PBSQuery import PBSQuery, PBSError
2106
2107        except ImportError, details:
2108
2109            print "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not found or installed"
2110            print details
2111            sys.exit( 1 )
2112
2113        gather = PbsDataGatherer()
2114
2115    elif BATCH_API == 'sge':
2116
2117        if BATCH_SERVER != 'localhost':
2118
2119            # Print and log, but continue execution
2120            err_msg = "WARNING: BATCH_API 'sge' ignores BATCH_SERVER (can only be 'localhost')"
2121            print err_msg
2122            debug_msg( 0, err_msg )
2123
2124        # Tested with SGE 6.0u11.
2125        #
2126        gather = SgeDataGatherer()
2127
2128    elif BATCH_API == 'lsf':
2129
2130        if BATCH_SERVER != 'localhost':
2131
2132            # Print and log, but continue execution
2133            err_msg = "WARNING: BATCH_API 'lsf' ignores BATCH_SERVER (can only be 'localhost')"
2134            print err_msg
2135            debug_msg( 0, err_msg )
2136
2137        try:
2138            from lsfObject import lsfObject
2139        except:
2140            print "FATAL ERROR: BATCH_API set to 'lsf' but python module 'lsfObject' is not found or installed"
2141            sys.exit( 1 )
2142
2143        gather = LsfDataGatherer()
2144
2145    elif BATCH_API == 'slurm':
2146
2147        if BATCH_SERVER != 'localhost':
2148
2149            # Print and log, but continue execution
2150            err_msg = "WARNING: BATCH_API 'slurm' ignores BATCH_SERVER (can only be 'localhost')"
2151            print err_msg
2152            debug_msg( 0, err_msg )
2153
2154        try:
2155            import pyslurm
2156        except:
2157            print "FATAL ERROR: BATCH_API set to 'slurm' but python module is not found or installed"
2158            sys.exit( 1 )
2159
2160        gather = SLURMDataGatherer()
2161
2162    else:
2163        print "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported"
2164
2165        sys.exit( 1 )
2166
2167    if( DAEMONIZE and USE_SYSLOG ):
2168
2169        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
2170
2171    if DAEMONIZE:
2172
2173        gather.daemon()
2174    else:
2175        gather.run()
2176
2177# wh00t? someone started me! :)
2178#
2179if __name__ == '__main__':
2180    main()
Note: See TracBrowser for help on using the repository browser.