source: trunk/jobmond/jobmond.py @ 663

Last change on this file since 663 was 663, checked in by ramonb, 12 years ago
  • remove SGE, LSF: no feedback from users/authors and not compatible anymore
  • removed egroup metric
  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 31.9 KB
Line 
1#!/usr/bin/env python
2#
3# This file is part of Jobmonarch
4#
5# Copyright (C) 2006-2012  Ramon Bastiaans
6#
7# Jobmonarch is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# Jobmonarch is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20#
21# SVN $Id: jobmond.py 663 2012-09-04 10:24:10Z ramonb $
22#
23
24import sys, getopt, ConfigParser, time, os, socket, string, re
25import xdrlib, socket, syslog, xml, xml.sax
26from types import *
27
28VERSION='TRUNK+SVN'
29
30def usage( ver ):
31
32    print 'jobmond %s' %VERSION
33
34    if ver:
35        return 0
36
37    print
38    print 'Purpose:'
39    print '  The Job Monitoring Daemon (jobmond) reports batch jobs information and statistics'
40    print '  to Ganglia, which can be viewed with Job Monarch web frontend'
41    print
42    print 'Usage:   jobmond [OPTIONS]'
43    print
44    print '  -c, --config=FILE  The configuration file to use (default: /etc/jobmond.conf)'
45    print '  -p, --pidfile=FILE Use pid file to store the process id'
46    print '  -h, --help     Print help and exit'
47    print '  -v, --version          Print version and exit'
48    print
49
50def processArgs( args ):
51
52    SHORT_L     = 'p:hvc:'
53    LONG_L      = [ 'help', 'config=', 'pidfile=', 'version' ]
54
55    global PIDFILE
56    PIDFILE     = None
57
58    config_filename = '/etc/jobmond.conf'
59
60    try:
61
62        opts, args  = getopt.getopt( args, SHORT_L, LONG_L )
63
64    except getopt.GetoptError, detail:
65
66        print detail
67        usage()
68        sys.exit( 1 )
69
70    for opt, value in opts:
71
72        if opt in [ '--config', '-c' ]:
73       
74            config_filename = value
75
76        if opt in [ '--pidfile', '-p' ]:
77
78            PIDFILE     = value
79       
80        if opt in [ '--help', '-h' ]:
81 
82            usage( False )
83            sys.exit( 0 )
84
85        if opt in [ '--version', '-v' ]:
86
87            usage( True )
88            sys.exit( 0 )
89
90    return loadConfig( config_filename )
91
92# Fixme:  This doesn't DTRT with commented-out bits of the file.  E.g.
93# it picked up a commented-out `mcast_join' and tried to use a
94# multicast channel when it shouldn't have done.
95class GangliaConfigParser:
96
97    def __init__( self, config_file ):
98
99        self.config_file    = config_file
100
101        if not os.path.exists( self.config_file ):
102
103            debug_msg( 0, "FATAL ERROR: gmond config '" + self.config_file + "' not found!" )
104            sys.exit( 1 )
105
106    def removeQuotes( self, value ):
107
108        clean_value = value
109        clean_value = clean_value.replace( "'", "" )
110        clean_value = clean_value.replace( '"', '' )
111        clean_value = clean_value.strip()
112
113        return clean_value
114
115    def getVal( self, section, valname ):
116
117        cfg_fp      = open( self.config_file )
118        section_start   = False
119        section_found   = False
120        value       = None
121
122        for line in cfg_fp.readlines():
123
124            if line.find( section ) != -1:
125
126                section_found   = True
127
128            if line.find( '{' ) != -1 and section_found:
129
130                section_start   = True
131
132            if line.find( '}' ) != -1 and section_found:
133
134                section_start   = False
135                section_found   = False
136
137            if line.find( valname ) != -1 and section_start:
138
139                value       = string.join( line.split( '=' )[1:], '' ).strip()
140
141        cfg_fp.close()
142
143        return value
144
145    def getInt( self, section, valname ):
146
147        value   = self.getVal( section, valname )
148
149        if not value:
150            return False
151
152        value   = self.removeQuotes( value )
153
154        return int( value )
155
156    def getStr( self, section, valname ):
157
158        value   = self.getVal( section, valname )
159
160        if not value:
161            return False
162
163        value   = self.removeQuotes( value )
164
165        return str( value )
166
167def findGmetric():
168
169    for dir in os.path.expandvars( '$PATH' ).split( ':' ):
170
171        guess   = '%s/%s' %( dir, 'gmetric' )
172
173        if os.path.exists( guess ):
174
175            return guess
176
177    return False
178
179def loadConfig( filename ):
180
181    def getlist( cfg_string ):
182
183        my_list = [ ]
184
185        for item_txt in cfg_string.split( ',' ):
186
187                sep_char = None
188
189                item_txt = item_txt.strip()
190
191                for s_char in [ "'", '"' ]:
192
193                        if item_txt.find( s_char ) != -1:
194
195                                if item_txt.count( s_char ) != 2:
196
197                                        print 'Missing quote: %s' %item_txt
198                                        sys.exit( 1 )
199
200                                else:
201
202                                        sep_char = s_char
203                                        break
204
205                if sep_char:
206
207                        item_txt = item_txt.split( sep_char )[1]
208
209                my_list.append( item_txt )
210
211        return my_list
212
213    cfg     = ConfigParser.ConfigParser()
214
215    cfg.read( filename )
216
217    global DEBUG_LEVEL, DAEMONIZE, BATCH_SERVER, BATCH_POLL_INTERVAL
218    global GMOND_CONF, DETECT_TIME_DIFFS, BATCH_HOST_TRANSLATE
219    global BATCH_API, QUEUE, GMETRIC_TARGET, USE_SYSLOG
220    global SYSLOG_LEVEL, SYSLOG_FACILITY, GMETRIC_BINARY
221
222    DEBUG_LEVEL = cfg.getint( 'DEFAULT', 'DEBUG_LEVEL' )
223
224    DAEMONIZE   = cfg.getboolean( 'DEFAULT', 'DAEMONIZE' )
225
226    SYSLOG_LEVEL    = -1
227    SYSLOG_FACILITY = None
228
229    try:
230        USE_SYSLOG  = cfg.getboolean( 'DEFAULT', 'USE_SYSLOG' )
231
232    except ConfigParser.NoOptionError:
233
234        USE_SYSLOG  = True
235
236        debug_msg( 0, 'ERROR: no option USE_SYSLOG found: assuming yes' )
237
238    if USE_SYSLOG:
239
240        try:
241            SYSLOG_LEVEL    = cfg.getint( 'DEFAULT', 'SYSLOG_LEVEL' )
242
243        except ConfigParser.NoOptionError:
244
245            debug_msg( 0, 'ERROR: no option SYSLOG_LEVEL found: assuming level 0' )
246            SYSLOG_LEVEL    = 0
247
248        try:
249
250            SYSLOG_FACILITY = eval( 'syslog.LOG_' + cfg.get( 'DEFAULT', 'SYSLOG_FACILITY' ) )
251
252        except ConfigParser.NoOptionError:
253
254            SYSLOG_FACILITY = syslog.LOG_DAEMON
255
256            debug_msg( 0, 'ERROR: no option SYSLOG_FACILITY found: assuming facility DAEMON' )
257
258    try:
259
260        BATCH_SERVER        = cfg.get( 'DEFAULT', 'BATCH_SERVER' )
261
262    except ConfigParser.NoOptionError:
263
264        # Backwards compatibility for old configs
265        #
266
267        BATCH_SERVER        = cfg.get( 'DEFAULT', 'TORQUE_SERVER' )
268        api_guess       = 'pbs'
269   
270    try:
271   
272        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'BATCH_POLL_INTERVAL' )
273
274    except ConfigParser.NoOptionError:
275
276        # Backwards compatibility for old configs
277        #
278
279        BATCH_POLL_INTERVAL = cfg.getint( 'DEFAULT', 'TORQUE_POLL_INTERVAL' )
280        api_guess       = 'pbs'
281   
282    try:
283
284        GMOND_CONF      = cfg.get( 'DEFAULT', 'GMOND_CONF' )
285
286    except ConfigParser.NoOptionError:
287
288        # Not specified: assume /etc/ganglia/gmond.conf
289        #
290        GMOND_CONF      = '/etc/ganglia/gmond.conf'
291
292    ganglia_cfg     = GangliaConfigParser( GMOND_CONF )
293
294    # Let's try to find the GMETRIC_TARGET ourselves first from GMOND_CONF
295    #
296    gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'mcast_join' )
297
298    if not gmetric_dest_ip:
299
300        # Maybe unicast target then
301        #
302        gmetric_dest_ip     = ganglia_cfg.getStr( 'udp_send_channel', 'host' )
303
304        gmetric_dest_port   = ganglia_cfg.getStr( 'udp_send_channel', 'port' )
305
306    if gmetric_dest_ip and gmetric_dest_port:
307
308        GMETRIC_TARGET  = '%s:%s' %( gmetric_dest_ip, gmetric_dest_port )
309    else:
310
311        debug_msg( 0, "WARNING: Can't parse udp_send_channel from: '%s'" %GMOND_CONF )
312
313        # Couldn't figure it out: let's see if it's in our jobmond.conf
314        #
315        try:
316
317            GMETRIC_TARGET  = cfg.get( 'DEFAULT', 'GMETRIC_TARGET' )
318
319        # Guess not: now just give up
320        #
321        except ConfigParser.NoOptionError:
322
323            GMETRIC_TARGET  = None
324
325            debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )
326
327    gmetric_bin = findGmetric()
328
329    if gmetric_bin:
330
331        GMETRIC_BINARY      = gmetric_bin
332    else:
333        debug_msg( 0, "WARNING: Can't find gmetric binary anywhere in $PATH" )
334
335        try:
336
337            GMETRIC_BINARY      = cfg.get( 'DEFAULT', 'GMETRIC_BINARY' )
338
339        except ConfigParser.NoOptionError:
340
341            debug_msg( 0, "FATAL ERROR: GMETRIC_BINARY not set and not in $PATH" )
342            sys.exit( 1 )
343
344    DETECT_TIME_DIFFS   = cfg.getboolean( 'DEFAULT', 'DETECT_TIME_DIFFS' )
345
346    BATCH_HOST_TRANSLATE    = getlist( cfg.get( 'DEFAULT', 'BATCH_HOST_TRANSLATE' ) )
347
348    try:
349
350        BATCH_API   = cfg.get( 'DEFAULT', 'BATCH_API' )
351
352    except ConfigParser.NoOptionError, detail:
353
354        if BATCH_SERVER and api_guess:
355
356            BATCH_API   = api_guess
357        else:
358            debug_msg( 0, "FATAL ERROR: BATCH_API not set and can't make guess" )
359            sys.exit( 1 )
360
361    try:
362
363        QUEUE       = getlist( cfg.get( 'DEFAULT', 'QUEUE' ) )
364
365    except ConfigParser.NoOptionError, detail:
366
367        QUEUE       = None
368
369    return True
370
371def fqdn_parts (fqdn):
372
373    """Return pair of host and domain for fully-qualified domain name arg."""
374
375    parts = fqdn.split (".")
376
377    return (parts[0], string.join(parts[1:], "."))
378
379METRIC_MAX_VAL_LEN = 900
380
381class DataProcessor:
382
383    """Class for processing of data"""
384
385    binary = None
386
387    def __init__( self, binary=None ):
388
389        """Remember alternate binary location if supplied"""
390
391        global GMETRIC_BINARY
392
393        if binary:
394            self.binary = binary
395
396        if not self.binary:
397            self.binary = GMETRIC_BINARY
398
399        # Timeout for XML
400        #
401        # From ganglia's documentation:
402        #
403        # 'A metric will be deleted DMAX seconds after it is received, and
404            # DMAX=0 means eternal life.'
405
406        self.dmax = str( int( int( BATCH_POLL_INTERVAL ) * 2 ) )
407
408        if GMOND_CONF:
409
410            incompatible = self.checkGmetricVersion()
411
412            if incompatible:
413
414                debug_msg( 0, 'Gmetric version not compatible, please upgrade to at least 3.0.1' )
415                sys.exit( 1 )
416
417    def checkGmetricVersion( self ):
418
419        """
420        Check version of gmetric is at least 3.0.1
421        for the syntax we use
422        """
423
424        global METRIC_MAX_VAL_LEN
425
426        incompatible    = 0
427
428        gfp     = os.popen( self.binary + ' --version' )
429        lines       = gfp.readlines()
430
431        gfp.close()
432
433        for line in lines:
434
435            line = line.split( ' ' )
436
437            if len( line ) == 2 and str( line ).find( 'gmetric' ) != -1:
438           
439                gmetric_version = line[1].split( '\n' )[0]
440
441                version_major   = int( gmetric_version.split( '.' )[0] )
442                version_minor   = int( gmetric_version.split( '.' )[1] )
443                version_patch   = int( gmetric_version.split( '.' )[2] )
444
445                incompatible    = 0
446
447                if version_major < 3:
448
449                    incompatible = 1
450               
451                elif version_major == 3:
452
453                    if version_minor == 0:
454
455                        if version_patch < 1:
456                       
457                            incompatible = 1
458
459                        # Gmetric 3.0.1 >< 3.0.3 had a bug in the max metric length
460                        #
461                        if version_patch < 3:
462
463                            METRIC_MAX_VAL_LEN = 900
464
465                        elif version_patch >= 3:
466
467                            METRIC_MAX_VAL_LEN = 1400
468
469        return incompatible
470
471    def multicastGmetric( self, metricname, metricval, valtype='string', units='' ):
472
473        """Call gmetric binary and multicast"""
474
475        cmd = self.binary
476
477        if GMETRIC_TARGET:
478
479            GMETRIC_TARGET_HOST = GMETRIC_TARGET.split( ':' )[0]
480            GMETRIC_TARGET_PORT = GMETRIC_TARGET.split( ':' )[1]
481
482            metric_debug        = "[gmetric] name: %s - val: %s - dmax: %s" %( str( metricname ), str( metricval ), str( self.dmax ) )
483
484            debug_msg( 10, printTime() + ' ' + metric_debug)
485
486            gm = Gmetric( GMETRIC_TARGET_HOST, GMETRIC_TARGET_PORT )
487
488            gm.send( str( metricname ), str( metricval ), str( self.dmax ), valtype, units )
489
490        else:
491            try:
492                cmd = cmd + ' -c' + GMOND_CONF
493
494            except NameError:
495
496                debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' )
497
498            cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax )
499
500            if len( units ) > 0:
501
502                cmd = cmd + ' -u"' + units + '"'
503
504            debug_msg( 10, printTime() + ' ' + cmd )
505
506            os.system( cmd )
507
508class DataGatherer:
509
510    """Skeleton class for batch system DataGatherer"""
511
512    def printJobs( self, jobs ):
513
514        """Print a jobinfo overview"""
515
516        for name, attrs in self.jobs.items():
517
518            print 'job %s' %(name)
519
520            for name, val in attrs.items():
521
522                print '\t%s = %s' %( name, val )
523
524    def printJob( self, jobs, job_id ):
525
526        """Print job with job_id from jobs"""
527
528        print 'job %s' %(job_id)
529
530        for name, val in jobs[ job_id ].items():
531
532            print '\t%s = %s' %( name, val )
533
534    def getAttr( self, d, name ):
535
536        """Return certain attribute from dictionary, if exists"""
537
538        if d.has_key( name ):
539
540            if type( d[ name ] ) == ListType:
541
542                return string.join( d[ name ], ' ' )
543
544            return d[ name ]
545       
546        return ''
547
548    def jobDataChanged( self, jobs, job_id, attrs ):
549
550        """Check if job with attrs and job_id in jobs has changed"""
551
552        if jobs.has_key( job_id ):
553
554            oldData = jobs[ job_id ]   
555        else:
556            return 1
557
558        for name, val in attrs.items():
559
560            if oldData.has_key( name ):
561
562                if oldData[ name ] != attrs[ name ]:
563
564                    return 1
565
566            else:
567                return 1
568
569        return 0
570
571    def submitJobData( self ):
572
573        """Submit job info list"""
574
575        global BATCH_API
576
577        self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
578
579        running_jobs    = 0
580        queued_jobs = 0
581
582        # Count how many running/queued jobs we found
583                #
584        for jobid, jobattrs in self.jobs.items():
585
586            if jobattrs[ 'status' ] == 'Q':
587
588                queued_jobs += 1
589
590            elif jobattrs[ 'status' ] == 'R':
591
592                running_jobs += 1
593
594        # Report running/queued jobs as seperate metric for a nice RRD graph
595                #
596        self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )
597        self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )
598
599        # Report down/offline nodes in batch (PBS only ATM)
600        #
601        if BATCH_API == 'pbs':
602
603            domain      = fqdn_parts( socket.getfqdn() )[1]
604
605            downed_nodes    = list()
606            offline_nodes   = list()
607       
608            l       = ['state']
609       
610            for name, node in self.pq.getnodes().items():
611
612                if 'down' in node[ 'state' ]:
613
614                    downed_nodes.append( name )
615
616                if 'offline' in node[ 'state' ]:
617
618                    offline_nodes.append( name )
619
620            downnodeslist       = do_nodelist( downed_nodes )
621            offlinenodeslist    = do_nodelist( offline_nodes )
622
623            down_str    = 'nodes=%s domain=%s reported=%s' %( string.join( downnodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
624            offl_str    = 'nodes=%s domain=%s reported=%s' %( string.join( offlinenodeslist, ';' ), domain, str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )
625            self.dp.multicastGmetric( 'MONARCH-DOWN'   , down_str )
626            self.dp.multicastGmetric( 'MONARCH-OFFLINE', offl_str )
627
628        # Now let's spread the knowledge
629        #
630        for jobid, jobattrs in self.jobs.items():
631
632            # Make gmetric values for each job: respect max gmetric value length
633            #
634            gmetrics     = self.compileGmetricVal( jobid, jobattrs )
635
636            for g_name, g_val in gmetrics.items():
637
638                self.dp.multicastGmetric( g_name, g_val )
639
640    def compileGmetricVal( self, jobid, jobattrs ):
641
642        """Create gmetric name/value pairs of jobinfo"""
643
644        gmetrics = { }
645
646        for val_name, val_value in jobattrs.items():
647
648            gmetric_sequence = 0
649
650            if len( val_value ) > METRIC_MAX_VAL_LEN:
651
652                while len( val_value ) > METRIC_MAX_VAL_LEN:
653
654                    gmetric_value   = val_value[:METRIC_MAX_VAL_LEN]
655                    val_value       = val_value[METRIC_MAX_VAL_LEN:]
656
657                    gmetric_name    = 'MONARCHJOB$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
658
659                    gmetrics[ gmetric_name ] = gmetric_value
660
661                    gmetric_sequence = gmetric_sequence + 1
662            else:
663                gmetric_value   = val_value
664
665                gmetric_name    = 'MONARCH$%s$%s$%s' %( jobid, string.upper(val_name), gmetric_sequence )
666
667                gmetrics[ gmetric_name ] = gmetric_value
668
669        return gmetrics
670
671    def daemon( self ):
672
673        """Run as daemon forever"""
674
675        # Fork the first child
676        #
677        pid = os.fork()
678        if pid > 0:
679                sys.exit(0)  # end parent
680
681        # creates a session and sets the process group ID
682        #
683        os.setsid()
684
685        # Fork the second child
686        #
687        pid = os.fork()
688        if pid > 0:
689                sys.exit(0)  # end parent
690
691        write_pidfile()
692
693        # Go to the root directory and set the umask
694        #
695        os.chdir('/')
696        os.umask(0)
697
698        sys.stdin.close()
699        sys.stdout.close()
700        sys.stderr.close()
701
702        os.open('/dev/null', os.O_RDWR)
703        os.dup2(0, 1)
704        os.dup2(0, 2)
705
706        self.run()
707
708    def run( self ):
709
710        """Main thread"""
711
712        while ( 1 ):
713       
714            self.getJobData()
715            self.submitJobData()
716            time.sleep( BATCH_POLL_INTERVAL )   
717
718# Abstracted from PBS original.
719#
720def do_nodelist( nodes ):
721
722    """Translate node list as appropriate."""
723
724    nodeslist       = [ ]
725    my_domain       = fqdn_parts( socket.getfqdn() )[1]
726
727    for node in nodes:
728
729        host        = node.split( '/' )[0] # not relevant for SGE
730        h, host_domain  = fqdn_parts(host)
731
732        if host_domain == my_domain:
733
734            host    = h
735
736        if nodeslist.count( host ) == 0:
737
738            for translate_pattern in BATCH_HOST_TRANSLATE:
739
740                if translate_pattern.find( '/' ) != -1:
741
742                    translate_orig  = \
743                        translate_pattern.split( '/' )[1]
744                    translate_new   = \
745                        translate_pattern.split( '/' )[2]
746                    host = re.sub( translate_orig,
747                               translate_new, host )
748            if not host in nodeslist:
749                nodeslist.append( host )
750    return nodeslist
751
752class PbsDataGatherer( DataGatherer ):
753
754    """This is the DataGatherer for PBS and Torque"""
755
756    global PBSQuery, PBSError
757
758    def __init__( self ):
759
760        """Setup appropriate variables"""
761
762        self.jobs   = { }
763        self.timeoffset = 0
764        self.dp     = DataProcessor()
765
766        self.initPbsQuery()
767
768    def initPbsQuery( self ):
769
770        self.pq     = None
771
772        if( BATCH_SERVER ):
773
774            self.pq     = PBSQuery( BATCH_SERVER )
775        else:
776            self.pq     = PBSQuery()
777
778    def getJobData( self ):
779
780        """Gather all data on current jobs in Torque"""
781
782        joblist     = {}
783        self.cur_time   = 0
784
785        try:
786            joblist     = self.pq.getjobs()
787            self.cur_time   = time.time()
788
789        except PBSError, detail:
790
791            debug_msg( 10, "Caught PBS unavailable, skipping until next polling interval: " + str( detail ) )
792            return None
793
794        jobs_processed  = [ ]
795
796        for name, attrs in joblist.items():
797            display_queue       = 1
798            job_id          = name.split( '.' )[0]
799
800            owner           = self.getAttr( attrs, 'Job_Owner' )
801            name            = self.getAttr( attrs, 'Job_Name' )
802            queue           = self.getAttr( attrs, 'queue' )
803            nodect          = self.getAttr( attrs['Resource_List'], 'nodect' )
804
805            requested_time      = self.getAttr( attrs['Resource_List'], 'walltime' )
806            requested_memory    = self.getAttr( attrs['Resource_List'], 'mem' )
807
808
809            requested_nodes     = ''
810            mynoderequest       = self.getAttr( attrs['Resource_List'], 'nodes' )
811
812            ppn         = ''
813            attributes  = ''
814
815            if mynoderequest.find( ':' ) != -1:
816
817                mynoderequest_fields    = mynoderequest.split( ':' )
818
819                for mynoderequest_field in mynoderequest_fields:
820
821                    if mynoderequest_field.isdigit():
822
823                        continue #TODO add requested_nodes if is hostname(s)
824
825                    if mynoderequest_field.find( 'ppn' ) != -1:
826
827                        ppn = mynoderequest_field.split( 'ppn=' )[1]
828
829                    else:
830
831                        if attributes == '':
832
833                            attributes = '%s' %mynoderequest_field
834                        else:
835                            attributes = '%s:%s' %( attributes, mynoderequest_field )
836
837            status          = self.getAttr( attrs, 'job_state' )
838
839            if status in [ 'Q', 'R', 'W' ]:
840
841                jobs_processed.append( job_id )
842
843            create_timestamp    = self.getAttr( attrs, 'ctime' )
844            running_nodes       = ''
845            exec_nodestr        = '' 
846
847            if status == 'R':
848
849                #start_timestamp     = self.getAttr( attrs, 'etime' )
850                start_timestamp     = self.getAttr( attrs, 'start_time' )
851                exec_nodestr        = self.getAttr( attrs, 'exec_host' )
852
853                nodes           = exec_nodestr.split( '+' )
854                nodeslist       = do_nodelist( nodes )
855                running_nodes   = string.join( nodeslist, ' ' )
856
857                if DETECT_TIME_DIFFS:
858
859                    # If a job start if later than our current date,
860                    # that must mean the Torque server's time is later
861                    # than our local time.
862               
863                    if int( start_timestamp ) > int( int( self.cur_time ) + int( self.timeoffset ) ):
864
865                        self.timeoffset = int( int(start_timestamp) - int(self.cur_time) )
866
867            elif status == 'Q':
868
869                # 'mynodequest' can be a string in the following syntax according to the
870                # Torque Administator's manual:
871                #
872                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
873                # {<node_count> | <hostname>}[:ppn=<ppn>][:<property>[:<property>]...][+ ...]
874                # etc
875                #
876
877                #
878                # For now we only count the amount of nodes request and ignore properties
879                #
880
881                start_timestamp     = ''
882                count_mynodes       = 0
883
884                queued_timestamp    = self.getAttr( attrs, 'qtime' )
885
886                for node in mynoderequest.split( '+' ):
887
888                    # Just grab the {node_count|hostname} part and ignore properties
889                    #
890                    nodepart    = node.split( ':' )[0]
891
892                    # Let's assume a node_count value
893                    #
894                    numeric_node    = 1
895
896                    # Chop the value up into characters
897                    #
898                    for letter in nodepart:
899
900                        # If this char is not a digit (0-9), this must be a hostname
901                        #
902                        if letter not in string.digits:
903
904                            numeric_node    = 0
905
906                    # If this is a hostname, just count this as one (1) node
907                    #
908                    if not numeric_node:
909
910                        count_mynodes   = count_mynodes + 1
911                    else:
912
913                        # If this a number, it must be the node_count
914                        # and increase our count with it's value
915                        #
916                        try:
917                            count_mynodes   = count_mynodes + int( nodepart )
918
919                        except ValueError, detail:
920
921                            # When we arrive here I must be bugged or very confused
922                            # THIS SHOULD NOT HAPPEN!
923                            #
924                            debug_msg( 10, str( detail ) )
925                            debug_msg( 10, "Encountered weird node in Resources_List?!" )
926                            debug_msg( 10, 'nodepart = ' + str( nodepart ) )
927                            debug_msg( 10, 'job = ' + str( name ) )
928                            debug_msg( 10, 'attrs = ' + str( attrs ) )
929                       
930                nodeslist   = str( count_mynodes )
931            else:
932                start_timestamp = ''
933                nodeslist   = ''
934
935            myAttrs             = { }
936
937            myAttrs[ 'name' ]              = str( name )
938            myAttrs[ 'status' ]            = str( status )
939            myAttrs[ 'queue' ]             = str( queue )
940            myAttrs[ 'owner' ]             = str( owner )
941            myAttrs[ 'nodect' ]            = str( nodect )
942            myAttrs[ 'exec.hostnames' ]    = str( running_nodes )
943            myAttrs[ 'exec.nodestr' ]      = str( exec_nodestr )
944            myAttrs[ 'req.walltime' ]      = str( requested_time )
945            myAttrs[ 'req.memory' ]        = str( requested_memory )
946            myAttrs[ 'req.nodes' ]         = str( requested_nodes )
947            myAttrs[ 'req.ppn' ]           = str( ppn )
948            myAttrs[ 'req.attributes' ]    = str( attributes )
949            myAttrs[ 'timestamp.running' ] = str( start_timestamp )
950            myAttrs[ 'timestamp.created' ] = str( create_timestamp )
951            myAttrs[ 'timestamp.queued' ]  = str( queued_timestamp )
952
953            if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q', 'W' ]:
954
955                self.jobs[ job_id ] = myAttrs
956
957        for id, attrs in self.jobs.items():
958
959            if id not in jobs_processed:
960
961                # This one isn't there anymore; toedeledoki!
962                #
963                del self.jobs[ id ]
964
965#
966# Gmetric by Nick Galbreath - nickg(a.t)modp(d.o.t)com
967# Version 1.0 - 21-April2-2007
968# http://code.google.com/p/embeddedgmetric/
969#
970# Modified by: Ramon Bastiaans
971# For the Job Monarch Project, see: https://subtrac.sara.nl/oss/jobmonarch/
972#
973# added: DEFAULT_TYPE for Gmetric's
974# added: checkHostProtocol to determine if target is multicast or not
975# changed: allow default for Gmetric constructor
976# changed: allow defaults for all send() values except dmax
977#
978
979GMETRIC_DEFAULT_TYPE    = 'string'
980GMETRIC_DEFAULT_HOST    = '127.0.0.1'
981GMETRIC_DEFAULT_PORT    = '8649'
982GMETRIC_DEFAULT_UNITS   = ''
983
984class Gmetric:
985
986    global GMETRIC_DEFAULT_HOST, GMETRIC_DEFAULT_PORT
987
988    slope           = { 'zero' : 0, 'positive' : 1, 'negative' : 2, 'both' : 3, 'unspecified' : 4 }
989    type            = ( '', 'string', 'uint16', 'int16', 'uint32', 'int32', 'float', 'double', 'timestamp' )
990    protocol        = ( 'udp', 'multicast' )
991
992    def __init__( self, host=GMETRIC_DEFAULT_HOST, port=GMETRIC_DEFAULT_PORT ):
993               
994        global GMETRIC_DEFAULT_TYPE
995
996        self.prot       = self.checkHostProtocol( host )
997        self.msg        = xdrlib.Packer()
998        self.socket     = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
999
1000        if self.prot not in self.protocol:
1001
1002            raise ValueError( "Protocol must be one of: " + str( self.protocol ) )
1003
1004        if self.prot == 'multicast':
1005
1006            # Set multicast options
1007            #
1008            self.socket.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20 )
1009
1010        self.hostport   = ( host, int( port ) )
1011        self.slopestr   = 'both'
1012        self.tmax       = 60
1013
1014    def checkHostProtocol( self, ip ):
1015
1016        """Detect if a ip adress is a multicast address"""
1017
1018        MULTICAST_ADDRESS_MIN   = ( "224", "0", "0", "0" )
1019        MULTICAST_ADDRESS_MAX   = ( "239", "255", "255", "255" )
1020
1021        ip_fields               = ip.split( '.' )
1022
1023        if ip_fields >= MULTICAST_ADDRESS_MIN and ip_fields <= MULTICAST_ADDRESS_MAX:
1024
1025            return 'multicast'
1026        else:
1027            return 'udp'
1028
1029    def send( self, name, value, dmax, typestr = '', units = '' ):
1030
1031        if len( units ) == 0:
1032            units       = GMETRIC_DEFAULT_UNITS
1033
1034        if len( typestr ) == 0:
1035            typestr     = GMETRIC_DEFAULT_TYPE
1036
1037        msg             = self.makexdr( name, value, typestr, units, self.slopestr, self.tmax, dmax )
1038
1039        return self.socket.sendto( msg, self.hostport )
1040
1041    def makexdr( self, name, value, typestr, unitstr, slopestr, tmax, dmax ):
1042
1043        if slopestr not in self.slope:
1044
1045            raise ValueError( "Slope must be one of: " + str( self.slope.keys() ) )
1046
1047        if typestr not in self.type:
1048
1049            raise ValueError( "Type must be one of: " + str( self.type ) )
1050
1051        if len( name ) == 0:
1052
1053            raise ValueError( "Name must be non-empty" )
1054
1055        self.msg.reset()
1056        self.msg.pack_int( 0 )
1057        self.msg.pack_string( typestr )
1058        self.msg.pack_string( name )
1059        self.msg.pack_string( str( value ) )
1060        self.msg.pack_string( unitstr )
1061        self.msg.pack_int( self.slope[ slopestr ] )
1062        self.msg.pack_uint( int( tmax ) )
1063        self.msg.pack_uint( int( dmax ) )
1064
1065        return self.msg.get_buffer()
1066
1067def printTime( ):
1068
1069    """Print current time/date in human readable format for log/debug"""
1070
1071    return time.strftime("%a, %d %b %Y %H:%M:%S")
1072
1073def debug_msg( level, msg ):
1074
1075    """Print msg if at or above current debug level"""
1076
1077    global DAEMONIZE, DEBUG_LEVEL, SYSLOG_LEVEL
1078
1079    if (not DAEMONIZE and DEBUG_LEVEL >= level):
1080        sys.stderr.write( msg + '\n' )
1081
1082    if (DAEMONIZE and USE_SYSLOG and SYSLOG_LEVEL >= level):
1083        syslog.syslog( msg )
1084
1085def write_pidfile():
1086
1087    # Write pidfile if PIDFILE is set
1088    #
1089    if PIDFILE:
1090
1091        pid = os.getpid()
1092
1093        pidfile = open( PIDFILE, 'w' )
1094
1095        pidfile.write( str( pid ) )
1096        pidfile.close()
1097
1098def main():
1099
1100    """Application start"""
1101
1102    global PBSQuery, PBSError, lsfObject
1103    global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE
1104
1105    if not processArgs( sys.argv[1:] ):
1106
1107        sys.exit( 1 )
1108
1109    # Load appropriate DataGatherer depending on which BATCH_API is set
1110    # and any required modules for the Gatherer
1111    #
1112    if BATCH_API == 'pbs':
1113
1114        try:
1115            from PBSQuery import PBSQuery, PBSError
1116
1117        except ImportError:
1118
1119            debug_msg( 0, "FATAL ERROR: BATCH_API set to 'pbs' but python module 'pbs_python' is not installed" )
1120            sys.exit( 1 )
1121
1122        gather = PbsDataGatherer()
1123
1124    elif BATCH_API == 'sge':
1125
1126        # Tested with SGE 6.0u11.
1127        #
1128        gather = SgeDataGatherer()
1129
1130    elif BATCH_API == 'lsf':
1131
1132        try:
1133            from lsfObject import lsfObject
1134        except:
1135            debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed")
1136            sys.exit( 1)
1137
1138        gather = LsfDataGatherer()
1139
1140    else:
1141        debug_msg( 0, "FATAL ERROR: unknown BATCH_API '" + BATCH_API + "' is not supported" )
1142
1143        sys.exit( 1 )
1144
1145    if( DAEMONIZE and USE_SYSLOG ):
1146
1147        syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY )
1148
1149    if DAEMONIZE:
1150
1151        gather.daemon()
1152    else:
1153        gather.run()
1154
1155# wh00t? someone started me! :)
1156#
1157if __name__ == '__main__':
1158    main()
Note: See TracBrowser for help on using the repository browser.