source: trunk/jobmond/jobmond.py @ 665

Last change on this file since 665 was 665, checked in by ramonb, 12 years ago
  • handle comments in gmond.conf
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 33.4 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2012  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobmond.py 665 2012-09-04 13:02:29Z ramonb $
22#
23
24import sys, getopt, ConfigParser, time, os, socket, string, re
25import xdrlib, socket, syslog, xml, xml.sax
26from types import *
27
28VERSION='TRUNK+SVN'
29
30def usage( ver ):
31
32    print 'jobmond %s' %VERSION
33
34    if ver:
35        return 0
36
37    print
38    print 'Purpose:'
39    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
40    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
41    print
42    print 'Usage:   jobmond [OPTIONS]'
43    print
44    print '  -c, --config=FILE  The configuration file to use (default: /etc/jobmond.conf)'
45    print '  -p, --pidfile=FILE Use pid file to store the process id'
46    print '  -h, --help     Print help and exit'
47    print '  -v, --version          Print version and exit'
48    print
49
50def processArgs( args ):
51
52    SHORT_L     = 'p:hvc:'
53    LONG_L      = [ 'help', 'config=', 'pidfile=', 'version' ]
54
55    global PIDFILE
56    PIDFILE     = None
57
58    config_filename = '/etc/jobmond.conf'
59
60    try:
61
62        opts, args  = getopt.getopt( args, SHORT_L, LONG_L )
63
64    except getopt.GetoptError, detail:
65
66        print detail
67        usage()
68        sys.exit( 1 )
69
70    for opt, value in opts:
71
72        if opt in [ '--config', '-c' ]:
73       
74            config_filename = value
75
76        if opt in [ '--pidfile', '-p' ]:
77
78            PIDFILE     = value
79       
80        if opt in [ '--help', '-h' ]:
81 
82            usage( False )
83            sys.exit( 0 )
84
85        if opt in [ '--version', '-v' ]:
86
87            usage( True )
88            sys.exit( 0 )
89
90    return loadConfig( config_filename )
91
92# Fixme:  This doesn't DTRT with commented-out bits of the file.  E.g.
93# it picked up a commented-out `mcast_join' and tried to use a
94# multicast channel when it shouldn't have done.
95class GangliaConfigParser:
96
97    def __init__( self, config_file ):
98
99        self.config_file    = config_file
100
101        if not os.path.exists( self.config_file ):
102
103            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
104            sys.exit( 1 )
105
106    def removeQuotes( self, value ):
107
108        clean_value = value
109        clean_value = clean_value.replace( "'", "" )
110        clean_value = clean_value.replace( '"', '' )
111        clean_value = clean_value.strip()
112
113        return clean_value
114
115    def removeComments( self, value ):
116
117        clean_value = value
118
119        if clean_value.find('#') != -1:
120
121            clean_value = value[:value.find('#')]
122
123        if clean_value.find('//') != -1:
124
125            clean_value = value[:value.find('//')]
126
127        return clean_value
128
129    def getVal( self, section, valname ):
130
131        cfg_fp      = open( self.config_file )
132        cfg_lines   = cfg_fp.readlines()
133        cfg_fp.close()
134
135        section_start = False
136        section_found = False
137        value         = None
138        comment_start = False
139
140        for line in cfg_lines:
141
142            line = line.strip()
143            line = self.removeComments( line )
144
145            if line.find( '/*' ) != -1:
146
147                line = line[:line.find('/*')]
148                comment_start = True
149
150            if line.find( '*/' ) != -1:
151
152                line = line[line.find('*/'):]
153                comment_start = False
154
155            if comment_start:
156
157                continue
158
159            if line.find( section ) != -1:
160
161                section_found   = True
162
163            if line.find( '{' ) != -1 and section_found:
164
165                section_start   = True
166
167            if line.find( '}' ) != -1 and section_found:
168
169                section_start   = False
170                section_found   = False
171
172            if line.find( valname ) != -1 and section_start:
173
174                value       = string.join( line.split( '=' )[1:], '' ).strip()
175
176        return value
177
178    def getInt( self, section, valname ):
179
180        value   = self.getVal( section, valname )
181
182        if not value:
183            return False
184
185        value   = self.removeQuotes( value )
186
187        return int( value )
188
189    def getStr( self, section, valname ):
190
191        value   = self.getVal( section, valname )
192
193        if not value:
194            return False
195
196        value   = self.removeQuotes( value )
197
198        return str( value )
199
200def findGmetric():
201
202    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
203
204        guess   = '%s/%s' %( dir, 'gmetric' )
205
206        if os.path.exists( guess ):
207
208            return guess
209
210    return False
211
212def loadConfig( filename ):
213
214    def getlist( cfg_string ):
215
216        my_list = [ ]
217
218        for item_txt in cfg_string.split( ',' ):
219
220                sep_char = None
221
222                item_txt = item_txt.strip()
223
224                for s_char in [ "'", '"' ]:
225
226                        if item_txt.find( s_char ) != -1:
227
228                                if item_txt.count( s_char ) != 2:
229
230                                        print 'Missing quote: %s' %item_txt
231                                        sys.exit( 1 )
232
233                                else:
234
235                                        sep_char = s_char
236                                        break
237
238                if sep_char:
239
240                        item_txt = item_txt.split( sep_char )[1]
241
242                my_list.append( item_txt )
243
244        return my_list
245
246    cfg     = ConfigParser.ConfigParser()
247
248    cfg.read( filename )
249
250    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
251    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
252    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
253    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
254
255    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
256
257    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
258
259    SYSLOG_LEVEL    = -1
260    SYSLOG_FACILITY = None
261
262    try:
263        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
264
265    except ConfigParser.NoOptionError:
266
267        USE_SYSLOG  = True
268
269        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
270
271    if USE_SYSLOG:
272
273        try:
274            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
275
276        except ConfigParser.NoOptionError:
277
278            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
279            SYSLOG_LEVEL    = 0
280
281        try:
282
283            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
284
285        except ConfigParser.NoOptionError:
286
287            SYSLOG_FACILITY = syslog.LOG_DAEMON
288
289            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
290
291    try:
292
293        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
294
295    except ConfigParser.NoOptionError:
296
297        # Backwards compatibility for old configs
298        #
299
300        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
301        api_guess       = 'pbs'
302   
303    try:
304   
305        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
306
307    except ConfigParser.NoOptionError:
308
309        # Backwards compatibility for old configs
310        #
311
312        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
313        api_guess       = 'pbs'
314   
315    try:
316
317        GMOND_CONF      = cfg.get( 'DEFAULT', 'GMOND_CONF' )
318
319    except ConfigParser.NoOptionError:
320
321        # Not specified: assume /etc/ganglia/gmond.conf
322        #
323        GMOND_CONF      = '/etc/ganglia/gmond.conf'
324
325    ganglia_cfg     = GangliaConfigParser( GMOND_CONF )
326
327    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
328    #
329    gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
330
331    if not gmetric_dest_ip:
332
333        # Maybe unicast target then
334        #
335        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
336
337        gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
338
339    if gmetric_dest_ip and gmetric_dest_port:
340
341        GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
342    else:
343
344        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
345
346        # Couldn't figure it out: let's see if it's in our jobmond.conf
347        #
348        try:
349
350            GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
351
352        # Guess not: now just give up
353        #
354        except ConfigParser.NoOptionError:
355
356            GMETRIC_TARGET  = None
357
358            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
359
360    gmetric_bin = findGmetric()
361
362    if gmetric_bin:
363
364        GMETRIC_BINARY      = gmetric_bin
365    else:
366        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
367
368        try:
369
370            GMETRIC_BINARY      = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
371
372        except ConfigParser.NoOptionError:
373
374            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
375            sys.exit( 1 )
376
377    DETECT_TIME_DIFFS   = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
378
379    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
380
381    try:
382
383        BATCH_API   = cfg.get( 'DEFAULT', 'BATCH_API' )
384
385    except ConfigParser.NoOptionError, detail:
386
387        if BATCH_SERVER and api_guess:
388
389            BATCH_API   = api_guess
390        else:
391            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
392            sys.exit( 1 )
393
394    try:
395
396        QUEUE       = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
397
398    except ConfigParser.NoOptionError, detail:
399
400        QUEUE       = None
401
402    return True
403
404def fqdn_parts (fqdn):
405
406    """Return pair of host and domain for fully-qualified domain name arg."""
407
408    parts = fqdn.split (".")
409
410    return (parts[0], string.join(parts[1:], "."))
411
412METRIC_MAX_VAL_LEN = 900
413
414class DataProcessor:
415
416    """Class for processing of data"""
417
418    binary = None
419
420    def __init__( self, binary=None ):
421
422        """Remember alternate binary location if supplied"""
423
424        global GMETRIC_BINARY
425
426        if binary:
427            self.binary = binary
428
429        if not self.binary:
430            self.binary = GMETRIC_BINARY
431
432        # Timeout for XML
433        #
434        # From ganglia's documentation:
435        #
436        # 'A metric will be deleted DMAX seconds after it is received, and
437            # DMAX=0 means eternal life.'
438
439        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
440
441        if GMOND_CONF:
442
443            incompatible = self.checkGmetricVersion()
444
445            if incompatible:
446
447                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
448                sys.exit( 1 )
449
450    def checkGmetricVersion( self ):
451
452        """
453        Check version of gmetric is at least 3.0.1
454        for the syntax we use
455        """
456
457        global METRIC_MAX_VAL_LEN
458
459        incompatible    = 0
460
461        gfp     = os.popen( self.binary + ' --version' )
462        lines       = gfp.readlines()
463
464        gfp.close()
465
466        for line in lines:
467
468            line = line.split( ' ' )
469
470            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
471           
472                gmetric_version = line[1].split( '\n' )[0]
473
474                version_major   = int( gmetric_version.split( '.' )[0] )
475                version_minor   = int( gmetric_version.split( '.' )[1] )
476                version_patch   = int( gmetric_version.split( '.' )[2] )
477
478                incompatible    = 0
479
480                if version_major < 3:
481
482                    incompatible = 1
483               
484                elif version_major == 3:
485
486                    if version_minor == 0:
487
488                        if version_patch < 1:
489                       
490                            incompatible = 1
491
492                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
493                        #
494                        if version_patch < 3:
495
496                            METRIC_MAX_VAL_LEN = 900
497
498                        elif version_patch >= 3:
499
500                            METRIC_MAX_VAL_LEN = 1400
501
502        return incompatible
503
504    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
505
506        """Call gmetric binary and multicast"""
507
508        cmd = self.binary
509
510        if GMETRIC_TARGET:
511
512            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
513            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]
514
515            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
516
517            debug_msg( 10, printTime() + ' ' + metric_debug)
518
519            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
520
521            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
522
523        else:
524            try:
525                cmd = cmd + ' -c' + GMOND_CONF
526
527            except NameError:
528
529                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
530
531            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
532
533            if len( units ) > 0:
534
535                cmd = cmd + ' -u"' + units + '"'
536
537            debug_msg( 10, printTime() + ' ' + cmd )
538
539            os.system( cmd )
540
541class DataGatherer:
542
543    """Skeleton class for batch system DataGatherer"""
544
545    def printJobs( self, jobs ):
546
547        """Print a jobinfo overview"""
548
549        for name, attrs in self.jobs.items():
550
551            print 'job %s' %(name)
552
553            for name, val in attrs.items():
554
555                print '\t%s = %s' %( name, val )
556
557    def printJob( self, jobs, job_id ):
558
559        """Print job with job_id from jobs"""
560
561        print 'job %s' %(job_id)
562
563        for name, val in jobs[ job_id ].items():
564
565            print '\t%s = %s' %( name, val )
566
567    def getAttr( self, d, name ):
568
569        """Return certain attribute from dictionary, if exists"""
570
571        if d.has_key( name ):
572
573            if type( d[ name ] ) == ListType:
574
575                return string.join( d[ name ], ' ' )
576
577            return d[ name ]
578       
579        return ''
580
581    def jobDataChanged( self, jobs, job_id, attrs ):
582
583        """Check if job with attrs and job_id in jobs has changed"""
584
585        if jobs.has_key( job_id ):
586
587            oldData = jobs[ job_id ]   
588        else:
589            return 1
590
591        for name, val in attrs.items():
592
593            if oldData.has_key( name ):
594
595                if oldData[ name ] != attrs[ name ]:
596
597                    return 1
598
599            else:
600                return 1
601
602        return 0
603
604    def submitJobData( self ):
605
606        """Submit job info list"""
607
608        global BATCH_API
609
610        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
611
612        running_jobs    = 0
613        queued_jobs = 0
614
615        # Count how many running/queued jobs we found
616                #
617        for jobid, jobattrs in self.jobs.items():
618
619            if jobattrs[ 'status' ] == 'Q':
620
621                queued_jobs += 1
622
623            elif jobattrs[ 'status' ] == 'R':
624
625                running_jobs += 1
626
627        # Report running/queued jobs as seperate metric for a nice RRD graph
628                #
629        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
630        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
631
632        # Report down/offline nodes in batch (PBS only ATM)
633        #
634        if BATCH_API == 'pbs':
635
636            domain      = fqdn_parts( socket.getfqdn() )[1]
637
638            downed_nodes    = list()
639            offline_nodes   = list()
640       
641            l       = ['state']
642       
643            for name, node in self.pq.getnodes().items():
644
645                if 'down' in node[ 'state' ]:
646
647                    downed_nodes.append( name )
648
649                if 'offline' in node[ 'state' ]:
650
651                    offline_nodes.append( name )
652
653            downnodeslist       = do_nodelist( downed_nodes )
654            offlinenodeslist    = do_nodelist( offline_nodes )
655
656            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
657            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
658            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
659            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
660
661        # Now let's spread the knowledge
662        #
663        for jobid, jobattrs in self.jobs.items():
664
665            # Make gmetric values for each job: respect max gmetric value length
666            #
667            gmetrics     = self.compileGmetricVal( jobid, jobattrs )
668
669            for g_name, g_val in gmetrics.items():
670
671                self.dp.multicastGmetric( g_name, g_val )
672
673    def compileGmetricVal( self, jobid, jobattrs ):
674
675        """Create gmetric name/value pairs of jobinfo"""
676
677        gmetrics = { }
678
679        for val_name, val_value in jobattrs.items():
680
681            gmetric_sequence = 0
682
683            if len( val_value ) > METRIC_MAX_VAL_LEN:
684
685                while len( val_value ) > METRIC_MAX_VAL_LEN:
686
687                    gmetric_value   = val_value[:METRIC_MAX_VAL_LEN]
688                    val_value       = val_value[METRIC_MAX_VAL_LEN:]
689
690                    gmetric_name    = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
691
692                    gmetrics[ gmetric_name ] = gmetric_value
693
694                    gmetric_sequence = gmetric_sequence + 1
695            else:
696                gmetric_value   = val_value
697
698                gmetric_name    = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
699
700                gmetrics[ gmetric_name ] = gmetric_value
701
702        return gmetrics
703
704    def daemon( self ):
705
706        """Run as daemon forever"""
707
708        # Fork the first child
709        #
710        pid = os.fork()
711        if pid > 0:
712                sys.exit(0)  # end parent
713
714        # creates a session and sets the process group ID
715        #
716        os.setsid()
717
718        # Fork the second child
719        #
720        pid = os.fork()
721        if pid > 0:
722                sys.exit(0)  # end parent
723
724        write_pidfile()
725
726        # Go to the root directory and set the umask
727        #
728        os.chdir('/')
729        os.umask(0)
730
731        sys.stdin.close()
732        sys.stdout.close()
733        sys.stderr.close()
734
735        os.open('/dev/null', os.O_RDWR)
736        os.dup2(0, 1)
737        os.dup2(0, 2)
738
739        self.run()
740
741    def run( self ):
742
743        """Main thread"""
744
745        while ( 1 ):
746       
747            self.getJobData()
748            self.submitJobData()
749            time.sleep( BATCH_POLL_INTERVAL )   
750
751# Abstracted from PBS original.
752#
753def do_nodelist( nodes ):
754
755    """Translate node list as appropriate."""
756
757    nodeslist       = [ ]
758    my_domain       = fqdn_parts( socket.getfqdn() )[1]
759
760    for node in nodes:
761
762        host        = node.split( '/' )[0] # not relevant for SGE
763        h, host_domain  = fqdn_parts(host)
764
765        if host_domain == my_domain:
766
767            host    = h
768
769        if nodeslist.count( host ) == 0:
770
771            for translate_pattern in BATCH_HOST_TRANSLATE:
772
773                if translate_pattern.find( '/' ) != -1:
774
775                    translate_orig  = \
776                        translate_pattern.split( '/' )[1]
777                    translate_new   = \
778                        translate_pattern.split( '/' )[2]
779                    host = re.sub( translate_orig,
780                               translate_new, host )
781            if not host in nodeslist:
782                nodeslist.append( host )
783    return nodeslist
784
785class PbsDataGatherer( DataGatherer ):
786
787    """This is the DataGatherer for PBS and Torque"""
788
789    global PBSQuery, PBSError
790
791    def __init__( self ):
792
793        """Setup appropriate variables"""
794
795        self.jobs   = { }
796        self.timeoffset = 0
797        self.dp     = DataProcessor()
798
799        self.initPbsQuery()
800
801    def initPbsQuery( self ):
802
803        self.pq     = None
804
805        if( BATCH_SERVER ):
806
807            self.pq     = PBSQuery( BATCH_SERVER )
808        else:
809            self.pq     = PBSQuery()
810
811    def getJobData( self ):
812
813        """Gather all data on current jobs in Torque"""
814
815        joblist     = {}
816        self.cur_time   = 0
817
818        try:
819            joblist     = self.pq.getjobs()
820            self.cur_time   = time.time()
821
822        except PBSError, detail:
823
824            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
825            return None
826
827        jobs_processed  = [ ]
828
829        for name, attrs in joblist.items():
830            display_queue       = 1
831            job_id          = name.split( '.' )[0]
832
833            owner           = self.getAttr( attrs, 'Job_Owner' )
834            name            = self.getAttr( attrs, 'Job_Name' )
835            queue           = self.getAttr( attrs, 'queue' )
836            nodect          = self.getAttr( attrs['Resource_List'], 'nodect' )
837
838            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
839            requested_memory    = self.getAttr( attrs['Resource_List'], 'mem' )
840
841
842            requested_nodes     = ''
843            mynoderequest       = self.getAttr( attrs['Resource_List'], 'nodes' )
844
845            ppn         = ''
846            attributes  = ''
847
848            if mynoderequest.find( ':' ) != -1:
849
850                mynoderequest_fields    = mynoderequest.split( ':' )
851
852                for mynoderequest_field in mynoderequest_fields:
853
854                    if mynoderequest_field.isdigit():
855
856                        continue #TODO add requested_nodes if is hostname(s)
857
858                    if mynoderequest_field.find( 'ppn' ) != -1:
859
860                        ppn = mynoderequest_field.split( 'ppn=' )[1]
861
862                    else:
863
864                        if attributes == '':
865
866                            attributes = '%s' %mynoderequest_field
867                        else:
868                            attributes = '%s:%s' %( attributes, mynoderequest_field )
869
870            status          = self.getAttr( attrs, 'job_state' )
871
872            if status in [ 'Q', 'R', 'W' ]:
873
874                jobs_processed.append( job_id )
875
876            create_timestamp    = self.getAttr( attrs, 'ctime' )
877            running_nodes       = ''
878            exec_nodestr        = '' 
879
880            if status == 'R':
881
882                #start_timestamp     = self.getAttr( attrs, 'etime' )
883                start_timestamp     = self.getAttr( attrs, 'start_time' )
884                exec_nodestr        = self.getAttr( attrs, 'exec_host' )
885
886                nodes           = exec_nodestr.split( '+' )
887                nodeslist       = do_nodelist( nodes )
888                running_nodes   = string.join( nodeslist, ' ' )
889
890                if DETECT_TIME_DIFFS:
891
892                    # If a job start if later than our current date,
893                    # that must mean the Torque server's time is later
894                    # than our local time.
895               
896                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
897
898                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
899
900            elif status == 'Q':
901
902                # 'mynodequest' can be a string in the following syntax according to the
903                # Torque Administator's manual:
904                #
905                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
906                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
907                # etc
908                #
909
910                #
911                # For now we only count the amount of nodes request and ignore properties
912                #
913
914                start_timestamp     = ''
915                count_mynodes       = 0
916
917                queued_timestamp    = self.getAttr( attrs, 'qtime' )
918
919                for node in mynoderequest.split( '+' ):
920
921                    # Just grab the {node_count|hostname} part and ignore properties
922                    #
923                    nodepart    = node.split( ':' )[0]
924
925                    # Let's assume a node_count value
926                    #
927                    numeric_node    = 1
928
929                    # Chop the value up into characters
930                    #
931                    for letter in nodepart:
932
933                        # If this char is not a digit (0-9), this must be a hostname
934                        #
935                        if letter not in string.digits:
936
937                            numeric_node    = 0
938
939                    # If this is a hostname, just count this as one (1) node
940                    #
941                    if not numeric_node:
942
943                        count_mynodes   = count_mynodes + 1
944                    else:
945
946                        # If this a number, it must be the node_count
947                        # and increase our count with it's value
948                        #
949                        try:
950                            count_mynodes   = count_mynodes + int( nodepart )
951
952                        except ValueError, detail:
953
954                            # When we arrive here I must be bugged or very confused
955                            # THIS SHOULD NOT HAPPEN!
956                            #
957                            debug_msg( 10, str( detail ) )
958                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
959                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
960                            debug_msg( 10, 'job = ' + str( name ) )
961                            debug_msg( 10, 'attrs = ' + str( attrs ) )
962                       
963                nodeslist   = str( count_mynodes )
964            else:
965                start_timestamp = ''
966                nodeslist   = ''
967
968            myAttrs             = { }
969
970            myAttrs[ 'name' ]              = str( name )
971            myAttrs[ 'status' ]            = str( status )
972            myAttrs[ 'queue' ]             = str( queue )
973            myAttrs[ 'owner' ]             = str( owner )
974            myAttrs[ 'nodect' ]            = str( nodect )
975            myAttrs[ 'exec.hostnames' ]    = str( running_nodes )
976            myAttrs[ 'exec.nodestr' ]      = str( exec_nodestr )
977            myAttrs[ 'req.walltime' ]      = str( requested_time )
978            myAttrs[ 'req.memory' ]        = str( requested_memory )
979            myAttrs[ 'req.nodes' ]         = str( requested_nodes )
980            myAttrs[ 'req.ppn' ]           = str( ppn )
981            myAttrs[ 'req.attributes' ]    = str( attributes )
982            myAttrs[ 'timestamp.running' ] = str( start_timestamp )
983            myAttrs[ 'timestamp.created' ] = str( create_timestamp )
984            myAttrs[ 'timestamp.queued' ]  = str( queued_timestamp )
985
986            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q', 'W' ]:
987
988                self.jobs[ job_id ] = myAttrs
989
990        for id, attrs in self.jobs.items():
991
992            if id not in jobs_processed:
993
994                # This one isn't there anymore; toedeledoki!
995                #
996                del self.jobs[ id ]
997
998GMETRIC_DEFAULT_TYPE    = 'string'
999GMETRIC_DEFAULT_HOST    = '127.0.0.1'
1000GMETRIC_DEFAULT_PORT    = '8649'
1001GMETRIC_DEFAULT_UNITS   = ''
1002
1003class Gmetric:
1004
1005    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
1006
1007    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
1008    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
1009    protocol        = ( 'udp', 'multicast' )
1010
1011    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
1012               
1013        global GMETRIC_DEFAULT_TYPE
1014
1015        self.prot       = self.checkHostProtocol( host )
1016        self.data_msg   = xdrlib.Packer()
1017        self.meta_msg   = xdrlib.Packer()
1018        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
1019
1020        if self.prot not in self.protocol:
1021
1022            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1023
1024        if self.prot == 'multicast':
1025
1026            # Set multicast options
1027            #
1028            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1029
1030        self.hostport   = ( host, int( port ) )
1031        self.slopestr   = 'both'
1032        self.tmax       = 60
1033
1034    def checkHostProtocol( self, ip ):
1035
1036        """Detect if a ip adress is a multicast address"""
1037
1038        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1039        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
1040
1041        ip_fields               = ip.split( '.' )
1042
1043        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1044
1045            return 'multicast'
1046        else:
1047            return 'udp'
1048
1049    def send( self, name, value, dmax, typestr = '', units = '' ):
1050
1051        if len( units ) == 0:
1052            units       = GMETRIC_DEFAULT_UNITS
1053
1054        if len( typestr ) == 0:
1055            typestr     = GMETRIC_DEFAULT_TYPE
1056
1057        (meta_msg, data_msg) = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
1058
1059        meta_rt = self.socket.sendto( meta_msg, self.hostport )
1060        data_rt = self.socket.sendto( data_msg, self.hostport )
1061
1062        return ( meta_rt, data_rt )
1063
1064    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax, group=None, spoof=None ):
1065
1066        hostname = "unset"
1067
1068        if slopestr not in self.slope:
1069
1070            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
1071
1072        if typestr not in self.type:
1073
1074            raise ValueError( "Type must be one of: " + str( self.type ) )
1075
1076        if len( name ) == 0:
1077
1078            raise ValueError( "Name must be non-empty" )
1079
1080        self.meta_msg.reset()
1081        self.meta_msg.pack_int( 128 )
1082
1083        if not spoof:
1084            self.meta_msg.pack_string( hostname )
1085        else:
1086            self.meta_msg.pack_string( spoof )
1087
1088        self.meta_msg.pack_string( name )
1089
1090        if not spoof:
1091            self.meta_msg.pack_int( 0 )
1092        else:
1093            self.meta_msg.pack_int( 1 )
1094           
1095        self.meta_msg.pack_string( typestr )
1096        self.meta_msg.pack_string( name )
1097        self.meta_msg.pack_string( unitstr )
1098        self.meta_msg.pack_int( self.slope[ slopestr ] )
1099        self.meta_msg.pack_uint( int( tmax ) )
1100        self.meta_msg.pack_uint( int( dmax ) )
1101
1102        if not group:
1103            self.meta_msg.pack_int( 0 )
1104        else:
1105            self.meta_msg.pack_int( 1 )
1106            self.meta_msg.pack_string( "GROUP" )
1107            self.meta_msg.pack_string( group )
1108
1109        self.data_msg.reset()
1110        self.data_msg.pack_int( 128+5 )
1111
1112        if not spoof:
1113            self.data_msg.pack_string( hostname )
1114        else:
1115            self.data_msg.pack_string( spoof )
1116
1117        self.data_msg.pack_string( name )
1118
1119        if not spoof:
1120            self.data_msg.pack_int( 0 )
1121        else:
1122            self.data_msg.pack_int( 1 )
1123
1124        self.data_msg.pack_string( "%s" )
1125        self.data_msg.pack_string( str( value ) )
1126
1127        return ( self.meta_msg.get_buffer(), self.data_msg.get_buffer() )
1128
1129def printTime( ):
1130
1131    """Print current time/date in human readable format for log/debug"""
1132
1133    return time.strftime("%a, %d %b %Y %H:%M:%S")
1134
1135def debug_msg( level, msg ):
1136
1137    """Print msg if at or above current debug level"""
1138
1139    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
1140
1141    if (not DAEMONIZE and DEBUG_LEVEL >= level):
1142        sys.stderr.write( msg + '\n' )
1143
1144    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1145        syslog.syslog( msg )
1146
1147def write_pidfile():
1148
1149    # Write pidfile if PIDFILE is set
1150    #
1151    if PIDFILE:
1152
1153        pid = os.getpid()
1154
1155        pidfile = open( PIDFILE, 'w' )
1156
1157        pidfile.write( str( pid ) )
1158        pidfile.close()
1159
1160def main():
1161
1162    """Application start"""
1163
1164    global PBSQuery, PBSError, lsfObject
1165    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
1166
1167    if not processArgs( sys.argv[1:] ):
1168
1169        sys.exit( 1 )
1170
1171    # Load appropriate DataGatherer depending on which BATCH_API is set
1172    # and any required modules for the Gatherer
1173    #
1174    if BATCH_API == 'pbs':
1175
1176        try:
1177            from PBSQuery import PBSQuery, PBSError
1178
1179        except ImportError:
1180
1181            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
1182            sys.exit( 1 )
1183
1184        gather = PbsDataGatherer()
1185
1186    elif BATCH_API == 'sge':
1187
1188        # Tested with SGE 6.0u11.
1189        #
1190        gather = SgeDataGatherer()
1191
1192    elif BATCH_API == 'lsf':
1193
1194        try:
1195            from lsfObject import lsfObject
1196        except:
1197            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
1198            sys.exit( 1)
1199
1200        gather = LsfDataGatherer()
1201
1202    else:
1203        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
1204
1205        sys.exit( 1 )
1206
1207    if( DAEMONIZE and USE_SYSLOG ):
1208
1209        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1210
1211    if DAEMONIZE:
1212
1213        gather.daemon()
1214    else:
1215        gather.run()
1216
1217# wh00t? someone started me! :)
1218#
1219if __name__ == '__main__':
1220    main()
Note: See TracBrowser for help on using the repository browser.