source: trunk/jobmond/jobmond.py @ 666

Last change on this file since 666 was 666, checked in by ramonb, 9 years ago
  • first attempt at 'include file aware' gmond.conf parsing in search of GMETRIC_TARGET
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 34.2 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2012  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobmond.py 666 2012-09-04 13:59:34Z ramonb $
22#
23
24import sys, getopt, ConfigParser, time, os, socket, string, re
25import xdrlib, socket, syslog, xml, xml.sax
26from types import *
27from glob import glob
28
29VERSION='TRUNK+SVN'
30
31def usage( ver ):
32
33    print 'jobmond %s' %VERSION
34
35    if ver:
36        return 0
37
38    print
39    print 'Purpose:'
40    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
41    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
42    print
43    print 'Usage:   jobmond [OPTIONS]'
44    print
45    print '  -c, --config=FILE  The configuration file to use (default: /etc/jobmond.conf)'
46    print '  -p, --pidfile=FILE Use pid file to store the process id'
47    print '  -h, --help     Print help and exit'
48    print '  -v, --version          Print version and exit'
49    print
50
51def processArgs( args ):
52
53    SHORT_L     = 'p:hvc:'
54    LONG_L      = [ 'help', 'config=', 'pidfile=', 'version' ]
55
56    global PIDFILE
57    PIDFILE     = None
58
59    config_filename = '/etc/jobmond.conf'
60
61    try:
62
63        opts, args  = getopt.getopt( args, SHORT_L, LONG_L )
64
65    except getopt.GetoptError, detail:
66
67        print detail
68        usage()
69        sys.exit( 1 )
70
71    for opt, value in opts:
72
73        if opt in [ '--config', '-c' ]:
74       
75            config_filename = value
76
77        if opt in [ '--pidfile', '-p' ]:
78
79            PIDFILE     = value
80       
81        if opt in [ '--help', '-h' ]:
82 
83            usage( False )
84            sys.exit( 0 )
85
86        if opt in [ '--version', '-v' ]:
87
88            usage( True )
89            sys.exit( 0 )
90
91    return loadConfig( config_filename )
92
93# Fixme:  This doesn't DTRT with commented-out bits of the file.  E.g.
94# it picked up a commented-out `mcast_join' and tried to use a
95# multicast channel when it shouldn't have done.
96class GangliaConfigParser:
97
98    def __init__( self, config_file ):
99
100        self.config_file     = config_file
101        self.include_parsers = [ ]
102
103        if not os.path.exists( self.config_file ):
104
105            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
106            sys.exit( 1 )
107
108    def removeHaakjes( self, value ):
109
110        clean_value = value
111        clean_value = clean_value.replace( "(", "" )
112        clean_value = clean_value.replace( ")", "" )
113        clean_value = clean_value.strip()
114
115        return clean_value
116
117    def removeQuotes( self, value ):
118
119        clean_value = value
120        clean_value = clean_value.replace( "'", "" )
121        clean_value = clean_value.replace( '"', '' )
122        clean_value = clean_value.strip()
123
124        return clean_value
125
126    def removeComments( self, value ):
127
128        clean_value = value
129
130        if clean_value.find('#') != -1:
131
132            clean_value = value[:value.find('#')]
133
134        if clean_value.find('//') != -1:
135
136            clean_value = value[:value.find('//')]
137
138        return clean_value
139
140    def getVal( self, section, valname ):
141
142        cfg_fp      = open( self.config_file )
143        cfg_lines   = cfg_fp.readlines()
144        cfg_fp.close()
145
146        section_start = False
147        section_found = False
148        value         = None
149        comment_start = False
150
151        for line in cfg_lines:
152
153            line = line.strip()
154            line = self.removeComments( line )
155
156            if line.find( '/*' ) != -1:
157
158                line = line[:line.find('/*')]
159                comment_start = True
160
161            if line.find( '*/' ) != -1:
162
163                line = line[line.find('*/'):]
164                comment_start = False
165
166            if comment_start:
167
168                continue
169
170            if line.find( 'include' ) != -1:
171
172                line = line.removeHaakjes()
173
174                include_line  = line.strip(' ')[1:]
175
176                for include_file in glob( include_line ):
177
178                    include_parser = GangliaConfigParser( include_file )
179                    include_getval = include_parser.getStr( section, valname )
180
181                    if include_getval:
182
183                        value = include_getval
184                        #break ? FIXME value can be overriden by other includes: perhaps users problem
185
186                    del include_parser
187
188            if line.find( section ) != -1:
189
190                section_found   = True
191
192            if line.find( '{' ) != -1 and section_found:
193
194                section_start   = True
195
196            if line.find( '}' ) != -1 and section_found:
197
198                section_start   = False
199                section_found   = False
200
201            if line.find( valname ) != -1 and section_start:
202
203                value       = string.join( line.split( '=' )[1:], '' ).strip()
204
205        return value
206
207    def getInt( self, section, valname ):
208
209        value   = self.getVal( section, valname )
210
211        if not value:
212            return False
213
214        value   = self.removeQuotes( value )
215
216        return int( value )
217
218    def getStr( self, section, valname ):
219
220        value   = self.getVal( section, valname )
221
222        if not value:
223            return False
224
225        value   = self.removeQuotes( value )
226
227        return str( value )
228
229def findGmetric():
230
231    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
232
233        guess   = '%s/%s' %( dir, 'gmetric' )
234
235        if os.path.exists( guess ):
236
237            return guess
238
239    return False
240
241def loadConfig( filename ):
242
243    def getlist( cfg_string ):
244
245        my_list = [ ]
246
247        for item_txt in cfg_string.split( ',' ):
248
249                sep_char = None
250
251                item_txt = item_txt.strip()
252
253                for s_char in [ "'", '"' ]:
254
255                        if item_txt.find( s_char ) != -1:
256
257                                if item_txt.count( s_char ) != 2:
258
259                                        print 'Missing quote: %s' %item_txt
260                                        sys.exit( 1 )
261
262                                else:
263
264                                        sep_char = s_char
265                                        break
266
267                if sep_char:
268
269                        item_txt = item_txt.split( sep_char )[1]
270
271                my_list.append( item_txt )
272
273        return my_list
274
275    cfg     = ConfigParser.ConfigParser()
276
277    cfg.read( filename )
278
279    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
280    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
281    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
282    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
283
284    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
285
286    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
287
288    SYSLOG_LEVEL    = -1
289    SYSLOG_FACILITY = None
290
291    try:
292        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
293
294    except ConfigParser.NoOptionError:
295
296        USE_SYSLOG  = True
297
298        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
299
300    if USE_SYSLOG:
301
302        try:
303            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
304
305        except ConfigParser.NoOptionError:
306
307            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
308            SYSLOG_LEVEL    = 0
309
310        try:
311
312            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
313
314        except ConfigParser.NoOptionError:
315
316            SYSLOG_FACILITY = syslog.LOG_DAEMON
317
318            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
319
320    try:
321
322        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
323
324    except ConfigParser.NoOptionError:
325
326        # Backwards compatibility for old configs
327        #
328
329        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
330        api_guess       = 'pbs'
331   
332    try:
333   
334        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
335
336    except ConfigParser.NoOptionError:
337
338        # Backwards compatibility for old configs
339        #
340
341        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
342        api_guess       = 'pbs'
343   
344    try:
345
346        GMOND_CONF      = cfg.get( 'DEFAULT', 'GMOND_CONF' )
347
348    except ConfigParser.NoOptionError:
349
350        # Not specified: assume /etc/ganglia/gmond.conf
351        #
352        GMOND_CONF      = '/etc/ganglia/gmond.conf'
353
354    ganglia_cfg     = GangliaConfigParser( GMOND_CONF )
355
356    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
357    #
358    gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
359
360    if not gmetric_dest_ip:
361
362        # Maybe unicast target then
363        #
364        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
365
366        gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
367
368    if gmetric_dest_ip and gmetric_dest_port:
369
370        GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
371    else:
372
373        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
374
375        # Couldn't figure it out: let's see if it's in our jobmond.conf
376        #
377        try:
378
379            GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
380
381        # Guess not: now just give up
382        #
383        except ConfigParser.NoOptionError:
384
385            GMETRIC_TARGET  = None
386
387            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
388
389    gmetric_bin = findGmetric()
390
391    if gmetric_bin:
392
393        GMETRIC_BINARY      = gmetric_bin
394    else:
395        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
396
397        try:
398
399            GMETRIC_BINARY      = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
400
401        except ConfigParser.NoOptionError:
402
403            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
404            sys.exit( 1 )
405
406    DETECT_TIME_DIFFS   = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
407
408    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
409
410    try:
411
412        BATCH_API   = cfg.get( 'DEFAULT', 'BATCH_API' )
413
414    except ConfigParser.NoOptionError, detail:
415
416        if BATCH_SERVER and api_guess:
417
418            BATCH_API   = api_guess
419        else:
420            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
421            sys.exit( 1 )
422
423    try:
424
425        QUEUE       = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
426
427    except ConfigParser.NoOptionError, detail:
428
429        QUEUE       = None
430
431    return True
432
433def fqdn_parts (fqdn):
434
435    """Return pair of host and domain for fully-qualified domain name arg."""
436
437    parts = fqdn.split (".")
438
439    return (parts[0], string.join(parts[1:], "."))
440
441METRIC_MAX_VAL_LEN = 900
442
443class DataProcessor:
444
445    """Class for processing of data"""
446
447    binary = None
448
449    def __init__( self, binary=None ):
450
451        """Remember alternate binary location if supplied"""
452
453        global GMETRIC_BINARY
454
455        if binary:
456            self.binary = binary
457
458        if not self.binary:
459            self.binary = GMETRIC_BINARY
460
461        # Timeout for XML
462        #
463        # From ganglia's documentation:
464        #
465        # 'A metric will be deleted DMAX seconds after it is received, and
466            # DMAX=0 means eternal life.'
467
468        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
469
470        if GMOND_CONF:
471
472            incompatible = self.checkGmetricVersion()
473
474            if incompatible:
475
476                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
477                sys.exit( 1 )
478
479    def checkGmetricVersion( self ):
480
481        """
482        Check version of gmetric is at least 3.0.1
483        for the syntax we use
484        """
485
486        global METRIC_MAX_VAL_LEN
487
488        incompatible    = 0
489
490        gfp     = os.popen( self.binary + ' --version' )
491        lines       = gfp.readlines()
492
493        gfp.close()
494
495        for line in lines:
496
497            line = line.split( ' ' )
498
499            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
500           
501                gmetric_version = line[1].split( '\n' )[0]
502
503                version_major   = int( gmetric_version.split( '.' )[0] )
504                version_minor   = int( gmetric_version.split( '.' )[1] )
505                version_patch   = int( gmetric_version.split( '.' )[2] )
506
507                incompatible    = 0
508
509                if version_major < 3:
510
511                    incompatible = 1
512               
513                elif version_major == 3:
514
515                    if version_minor == 0:
516
517                        if version_patch < 1:
518                       
519                            incompatible = 1
520
521                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
522                        #
523                        if version_patch < 3:
524
525                            METRIC_MAX_VAL_LEN = 900
526
527                        elif version_patch >= 3:
528
529                            METRIC_MAX_VAL_LEN = 1400
530
531        return incompatible
532
533    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
534
535        """Call gmetric binary and multicast"""
536
537        cmd = self.binary
538
539        if GMETRIC_TARGET:
540
541            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
542            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]
543
544            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
545
546            debug_msg( 10, printTime() + ' ' + metric_debug)
547
548            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
549
550            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
551
552        else:
553            try:
554                cmd = cmd + ' -c' + GMOND_CONF
555
556            except NameError:
557
558                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
559
560            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
561
562            if len( units ) > 0:
563
564                cmd = cmd + ' -u"' + units + '"'
565
566            debug_msg( 10, printTime() + ' ' + cmd )
567
568            os.system( cmd )
569
570class DataGatherer:
571
572    """Skeleton class for batch system DataGatherer"""
573
574    def printJobs( self, jobs ):
575
576        """Print a jobinfo overview"""
577
578        for name, attrs in self.jobs.items():
579
580            print 'job %s' %(name)
581
582            for name, val in attrs.items():
583
584                print '\t%s = %s' %( name, val )
585
586    def printJob( self, jobs, job_id ):
587
588        """Print job with job_id from jobs"""
589
590        print 'job %s' %(job_id)
591
592        for name, val in jobs[ job_id ].items():
593
594            print '\t%s = %s' %( name, val )
595
596    def getAttr( self, d, name ):
597
598        """Return certain attribute from dictionary, if exists"""
599
600        if d.has_key( name ):
601
602            if type( d[ name ] ) == ListType:
603
604                return string.join( d[ name ], ' ' )
605
606            return d[ name ]
607       
608        return ''
609
610    def jobDataChanged( self, jobs, job_id, attrs ):
611
612        """Check if job with attrs and job_id in jobs has changed"""
613
614        if jobs.has_key( job_id ):
615
616            oldData = jobs[ job_id ]   
617        else:
618            return 1
619
620        for name, val in attrs.items():
621
622            if oldData.has_key( name ):
623
624                if oldData[ name ] != attrs[ name ]:
625
626                    return 1
627
628            else:
629                return 1
630
631        return 0
632
633    def submitJobData( self ):
634
635        """Submit job info list"""
636
637        global BATCH_API
638
639        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
640
641        running_jobs    = 0
642        queued_jobs = 0
643
644        # Count how many running/queued jobs we found
645                #
646        for jobid, jobattrs in self.jobs.items():
647
648            if jobattrs[ 'status' ] == 'Q':
649
650                queued_jobs += 1
651
652            elif jobattrs[ 'status' ] == 'R':
653
654                running_jobs += 1
655
656        # Report running/queued jobs as seperate metric for a nice RRD graph
657                #
658        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
659        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
660
661        # Report down/offline nodes in batch (PBS only ATM)
662        #
663        if BATCH_API == 'pbs':
664
665            domain      = fqdn_parts( socket.getfqdn() )[1]
666
667            downed_nodes    = list()
668            offline_nodes   = list()
669       
670            l       = ['state']
671       
672            for name, node in self.pq.getnodes().items():
673
674                if 'down' in node[ 'state' ]:
675
676                    downed_nodes.append( name )
677
678                if 'offline' in node[ 'state' ]:
679
680                    offline_nodes.append( name )
681
682            downnodeslist       = do_nodelist( downed_nodes )
683            offlinenodeslist    = do_nodelist( offline_nodes )
684
685            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
686            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
687            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
688            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
689
690        # Now let's spread the knowledge
691        #
692        for jobid, jobattrs in self.jobs.items():
693
694            # Make gmetric values for each job: respect max gmetric value length
695            #
696            gmetrics     = self.compileGmetricVal( jobid, jobattrs )
697
698            for g_name, g_val in gmetrics.items():
699
700                self.dp.multicastGmetric( g_name, g_val )
701
702    def compileGmetricVal( self, jobid, jobattrs ):
703
704        """Create gmetric name/value pairs of jobinfo"""
705
706        gmetrics = { }
707
708        for val_name, val_value in jobattrs.items():
709
710            gmetric_sequence = 0
711
712            if len( val_value ) > METRIC_MAX_VAL_LEN:
713
714                while len( val_value ) > METRIC_MAX_VAL_LEN:
715
716                    gmetric_value   = val_value[:METRIC_MAX_VAL_LEN]
717                    val_value       = val_value[METRIC_MAX_VAL_LEN:]
718
719                    gmetric_name    = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
720
721                    gmetrics[ gmetric_name ] = gmetric_value
722
723                    gmetric_sequence = gmetric_sequence + 1
724            else:
725                gmetric_value   = val_value
726
727                gmetric_name    = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
728
729                gmetrics[ gmetric_name ] = gmetric_value
730
731        return gmetrics
732
733    def daemon( self ):
734
735        """Run as daemon forever"""
736
737        # Fork the first child
738        #
739        pid = os.fork()
740        if pid > 0:
741                sys.exit(0)  # end parent
742
743        # creates a session and sets the process group ID
744        #
745        os.setsid()
746
747        # Fork the second child
748        #
749        pid = os.fork()
750        if pid > 0:
751                sys.exit(0)  # end parent
752
753        write_pidfile()
754
755        # Go to the root directory and set the umask
756        #
757        os.chdir('/')
758        os.umask(0)
759
760        sys.stdin.close()
761        sys.stdout.close()
762        sys.stderr.close()
763
764        os.open('/dev/null', os.O_RDWR)
765        os.dup2(0, 1)
766        os.dup2(0, 2)
767
768        self.run()
769
770    def run( self ):
771
772        """Main thread"""
773
774        while ( 1 ):
775       
776            self.getJobData()
777            self.submitJobData()
778            time.sleep( BATCH_POLL_INTERVAL )   
779
780# Abstracted from PBS original.
781#
782def do_nodelist( nodes ):
783
784    """Translate node list as appropriate."""
785
786    nodeslist       = [ ]
787    my_domain       = fqdn_parts( socket.getfqdn() )[1]
788
789    for node in nodes:
790
791        host        = node.split( '/' )[0] # not relevant for SGE
792        h, host_domain  = fqdn_parts(host)
793
794        if host_domain == my_domain:
795
796            host    = h
797
798        if nodeslist.count( host ) == 0:
799
800            for translate_pattern in BATCH_HOST_TRANSLATE:
801
802                if translate_pattern.find( '/' ) != -1:
803
804                    translate_orig  = \
805                        translate_pattern.split( '/' )[1]
806                    translate_new   = \
807                        translate_pattern.split( '/' )[2]
808                    host = re.sub( translate_orig,
809                               translate_new, host )
810            if not host in nodeslist:
811                nodeslist.append( host )
812    return nodeslist
813
814class PbsDataGatherer( DataGatherer ):
815
816    """This is the DataGatherer for PBS and Torque"""
817
818    global PBSQuery, PBSError
819
820    def __init__( self ):
821
822        """Setup appropriate variables"""
823
824        self.jobs   = { }
825        self.timeoffset = 0
826        self.dp     = DataProcessor()
827
828        self.initPbsQuery()
829
830    def initPbsQuery( self ):
831
832        self.pq     = None
833
834        if( BATCH_SERVER ):
835
836            self.pq     = PBSQuery( BATCH_SERVER )
837        else:
838            self.pq     = PBSQuery()
839
840    def getJobData( self ):
841
842        """Gather all data on current jobs in Torque"""
843
844        joblist     = {}
845        self.cur_time   = 0
846
847        try:
848            joblist     = self.pq.getjobs()
849            self.cur_time   = time.time()
850
851        except PBSError, detail:
852
853            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
854            return None
855
856        jobs_processed  = [ ]
857
858        for name, attrs in joblist.items():
859            display_queue       = 1
860            job_id          = name.split( '.' )[0]
861
862            owner           = self.getAttr( attrs, 'Job_Owner' )
863            name            = self.getAttr( attrs, 'Job_Name' )
864            queue           = self.getAttr( attrs, 'queue' )
865            nodect          = self.getAttr( attrs['Resource_List'], 'nodect' )
866
867            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
868            requested_memory    = self.getAttr( attrs['Resource_List'], 'mem' )
869
870
871            requested_nodes     = ''
872            mynoderequest       = self.getAttr( attrs['Resource_List'], 'nodes' )
873
874            ppn         = ''
875            attributes  = ''
876
877            if mynoderequest.find( ':' ) != -1:
878
879                mynoderequest_fields    = mynoderequest.split( ':' )
880
881                for mynoderequest_field in mynoderequest_fields:
882
883                    if mynoderequest_field.isdigit():
884
885                        continue #TODO add requested_nodes if is hostname(s)
886
887                    if mynoderequest_field.find( 'ppn' ) != -1:
888
889                        ppn = mynoderequest_field.split( 'ppn=' )[1]
890
891                    else:
892
893                        if attributes == '':
894
895                            attributes = '%s' %mynoderequest_field
896                        else:
897                            attributes = '%s:%s' %( attributes, mynoderequest_field )
898
899            status          = self.getAttr( attrs, 'job_state' )
900
901            if status in [ 'Q', 'R', 'W' ]:
902
903                jobs_processed.append( job_id )
904
905            create_timestamp    = self.getAttr( attrs, 'ctime' )
906            running_nodes       = ''
907            exec_nodestr        = '' 
908
909            if status == 'R':
910
911                #start_timestamp     = self.getAttr( attrs, 'etime' )
912                start_timestamp     = self.getAttr( attrs, 'start_time' )
913                exec_nodestr        = self.getAttr( attrs, 'exec_host' )
914
915                nodes           = exec_nodestr.split( '+' )
916                nodeslist       = do_nodelist( nodes )
917                running_nodes   = string.join( nodeslist, ' ' )
918
919                if DETECT_TIME_DIFFS:
920
921                    # If a job start if later than our current date,
922                    # that must mean the Torque server's time is later
923                    # than our local time.
924               
925                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
926
927                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
928
929            elif status == 'Q':
930
931                # 'mynodequest' can be a string in the following syntax according to the
932                # Torque Administator's manual:
933                #
934                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
935                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
936                # etc
937                #
938
939                #
940                # For now we only count the amount of nodes request and ignore properties
941                #
942
943                start_timestamp     = ''
944                count_mynodes       = 0
945
946                queued_timestamp    = self.getAttr( attrs, 'qtime' )
947
948                for node in mynoderequest.split( '+' ):
949
950                    # Just grab the {node_count|hostname} part and ignore properties
951                    #
952                    nodepart    = node.split( ':' )[0]
953
954                    # Let's assume a node_count value
955                    #
956                    numeric_node    = 1
957
958                    # Chop the value up into characters
959                    #
960                    for letter in nodepart:
961
962                        # If this char is not a digit (0-9), this must be a hostname
963                        #
964                        if letter not in string.digits:
965
966                            numeric_node    = 0
967
968                    # If this is a hostname, just count this as one (1) node
969                    #
970                    if not numeric_node:
971
972                        count_mynodes   = count_mynodes + 1
973                    else:
974
975                        # If this a number, it must be the node_count
976                        # and increase our count with it's value
977                        #
978                        try:
979                            count_mynodes   = count_mynodes + int( nodepart )
980
981                        except ValueError, detail:
982
983                            # When we arrive here I must be bugged or very confused
984                            # THIS SHOULD NOT HAPPEN!
985                            #
986                            debug_msg( 10, str( detail ) )
987                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
988                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
989                            debug_msg( 10, 'job = ' + str( name ) )
990                            debug_msg( 10, 'attrs = ' + str( attrs ) )
991                       
992                nodeslist   = str( count_mynodes )
993            else:
994                start_timestamp = ''
995                nodeslist   = ''
996
997            myAttrs             = { }
998
999            myAttrs[ 'name' ]              = str( name )
1000            myAttrs[ 'status' ]            = str( status )
1001            myAttrs[ 'queue' ]             = str( queue )
1002            myAttrs[ 'owner' ]             = str( owner )
1003            myAttrs[ 'nodect' ]            = str( nodect )
1004            myAttrs[ 'exec.hostnames' ]    = str( running_nodes )
1005            myAttrs[ 'exec.nodestr' ]      = str( exec_nodestr )
1006            myAttrs[ 'req.walltime' ]      = str( requested_time )
1007            myAttrs[ 'req.memory' ]        = str( requested_memory )
1008            myAttrs[ 'req.nodes' ]         = str( requested_nodes )
1009            myAttrs[ 'req.ppn' ]           = str( ppn )
1010            myAttrs[ 'req.attributes' ]    = str( attributes )
1011            myAttrs[ 'timestamp.running' ] = str( start_timestamp )
1012            myAttrs[ 'timestamp.created' ] = str( create_timestamp )
1013            myAttrs[ 'timestamp.queued' ]  = str( queued_timestamp )
1014
1015            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q', 'W' ]:
1016
1017                self.jobs[ job_id ] = myAttrs
1018
1019        for id, attrs in self.jobs.items():
1020
1021            if id not in jobs_processed:
1022
1023                # This one isn't there anymore; toedeledoki!
1024                #
1025                del self.jobs[ id ]
1026
1027GMETRIC_DEFAULT_TYPE    = 'string'
1028GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1029GMETRIC_DEFAULT_PORT    = '8649'
1030GMETRIC_DEFAULT_UNITS   = ''
1031
1032class Gmetric:
1033
1034    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1035
1036    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1037    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1038    protocol        = ( 'udp', 'multicast' )
1039
1040    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1041               
1042        global GMETRIC_DEFAULT_TYPE
1043
1044        self.prot       = self.checkHostProtocol( host )
1045        self.data_msg   = xdrlib.Packer()
1046        self.meta_msg   = xdrlib.Packer()
1047        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1048
1049        if self.prot not in self.protocol:
1050
1051            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1052
1053        if self.prot == 'multicast':
1054
1055            # Set multicast options
1056            #
1057            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1058
1059        self.hostport   = ( host, int( port ) )
1060        self.slopestr   = 'both'
1061        self.tmax       = 60
1062
1063    def checkHostProtocol( self, ip ):
1064
1065        """Detect if a ip adress is a multicast address"""
1066
1067        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1068        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
1069
1070        ip_fields               = ip.split( '.' )
1071
1072        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1073
1074            return 'multicast'
1075        else:
1076            return 'udp'
1077
1078    def send( self, name, value, dmax, typestr = '', units = '' ):
1079
1080        if len( units ) == 0:
1081            units       = GMETRIC_DEFAULT_UNITS
1082
1083        if len( typestr ) == 0:
1084            typestr     = GMETRIC_DEFAULT_TYPE
1085
1086        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
1087
1088        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1089        data_rt = self.socket.sendto( data_msg, self.hostport )
1090
1091        return ( meta_rt, data_rt )
1092
1093    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
1094
1095        hostname = "unset"
1096
1097        if slopestr not in self.slope:
1098
1099            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
1100
1101        if typestr not in self.type:
1102
1103            raise ValueError( "Type must be one of: " + str( self.type ) )
1104
1105        if len( name ) == 0:
1106
1107            raise ValueError( "Name must be non-empty" )
1108
1109        self.meta_msg.reset()
1110        self.meta_msg.pack_int( 128 )
1111
1112        if not spoof:
1113            self.meta_msg.pack_string( hostname )
1114        else:
1115            self.meta_msg.pack_string( spoof )
1116
1117        self.meta_msg.pack_string( name )
1118
1119        if not spoof:
1120            self.meta_msg.pack_int( 0 )
1121        else:
1122            self.meta_msg.pack_int( 1 )
1123           
1124        self.meta_msg.pack_string( typestr )
1125        self.meta_msg.pack_string( name )
1126        self.meta_msg.pack_string( unitstr )
1127        self.meta_msg.pack_int( self.slope[ slopestr ] )
1128        self.meta_msg.pack_uint( int( tmax ) )
1129        self.meta_msg.pack_uint( int( dmax ) )
1130
1131        if not group:
1132            self.meta_msg.pack_int( 0 )
1133        else:
1134            self.meta_msg.pack_int( 1 )
1135            self.meta_msg.pack_string( "GROUP" )
1136            self.meta_msg.pack_string( group )
1137
1138        self.data_msg.reset()
1139        self.data_msg.pack_int( 128+5 )
1140
1141        if not spoof:
1142            self.data_msg.pack_string( hostname )
1143        else:
1144            self.data_msg.pack_string( spoof )
1145
1146        self.data_msg.pack_string( name )
1147
1148        if not spoof:
1149            self.data_msg.pack_int( 0 )
1150        else:
1151            self.data_msg.pack_int( 1 )
1152
1153        self.data_msg.pack_string( "%s" )
1154        self.data_msg.pack_string( str( value ) )
1155
1156        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
1157
1158def printTime( ):
1159
1160    """Print current time/date in human readable format for log/debug"""
1161
1162    return time.strftime("%a, %d %b %Y %H:%M:%S")
1163
1164def debug_msg( level, msg ):
1165
1166    """Print msg if at or above current debug level"""
1167
1168    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
1169
1170    if (not DAEMONIZE and DEBUG_LEVEL >= level):
1171        sys.stderr.write( msg + '\n' )
1172
1173    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1174        syslog.syslog( msg )
1175
1176def write_pidfile():
1177
1178    # Write pidfile if PIDFILE is set
1179    #
1180    if PIDFILE:
1181
1182        pid = os.getpid()
1183
1184        pidfile = open( PIDFILE, 'w' )
1185
1186        pidfile.write( str( pid ) )
1187        pidfile.close()
1188
1189def main():
1190
1191    """Application start"""
1192
1193    global PBSQuery, PBSError, lsfObject
1194    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
1195
1196    if not processArgs( sys.argv[1:] ):
1197
1198        sys.exit( 1 )
1199
1200    # Load appropriate DataGatherer depending on which BATCH_API is set
1201    # and any required modules for the Gatherer
1202    #
1203    if BATCH_API == 'pbs':
1204
1205        try:
1206            from PBSQuery import PBSQuery, PBSError
1207
1208        except ImportError:
1209
1210            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
1211            sys.exit( 1 )
1212
1213        gather = PbsDataGatherer()
1214
1215    elif BATCH_API == 'sge':
1216
1217        # Tested with SGE 6.0u11.
1218        #
1219        gather = SgeDataGatherer()
1220
1221    elif BATCH_API == 'lsf':
1222
1223        try:
1224            from lsfObject import lsfObject
1225        except:
1226            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
1227            sys.exit( 1)
1228
1229        gather = LsfDataGatherer()
1230
1231    else:
1232        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
1233
1234        sys.exit( 1 )
1235
1236    if( DAEMONIZE and USE_SYSLOG ):
1237
1238        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1239
1240    if DAEMONIZE:
1241
1242        gather.daemon()
1243    else:
1244        gather.run()
1245
1246# wh00t? someone started me! :)
1247#
1248if __name__ == '__main__':
1249    main()
Note: See TracBrowser for help on using the repository browser.