Ticket #24: sge-2.diff
File sge-2.diff, 39.5 KB (added by d.love@…, 16 years ago)
-
pkg/deb/default/jobmond
Tue Mar 4 17:03:17 GMT 2008 Dave Love <fx@gnu.org> * Merge from svn to revision 496. Mon Dec 10 15:46:45 GMT 2007 Dave Love <fx@gnu.org> * Tidy/fix SGE changes. Fri Dec 7 10:57:55 GMT 2007 Dave Love <fx@gnu.org> * Add note about SGE tasks to overview template. Fri Nov 9 15:39:41 GMT 2007 Dave Love <fx@gnu.org> * Replace previous SGE implementation with a different one. This provides the full set of monarch data. There's some associated restructuring involving the PBS code. Fri Nov 9 15:27:01 GMT 2007 Dave Love <fx@gnu.org> * jobmond.conf comment fix. Fri Nov 9 15:21:25 GMT 2007 Dave Love <fx@gnu.org> * Somewhat modify RPM spec files. Wed Nov 7 17:49:37 GMT 2007 Dave Love <fx@gnu.org> * Modify jobmond for use with SGE. Wed Nov 7 17:45:24 GMT 2007 Dave Love <fx@gnu.org> * Reformat and update INSTALL file. Wed Nov 7 17:44:38 GMT 2007 Dave Love <fx@gnu.org> * Add PATH example to init file defaults.
1 1 OPTIONS="" 2 # If you use SGE, modify this as appropriate to find qstat. 3 case $(uname -m) in 4 x86_64) PATH="${PATH}:/opt/sge/bin/lx26-amd64";; 5 i*86) PATH="${PATH}:/opt/sge/bin/lx26-x86";; 6 esac -
pkg/rpm/jobmonarch-jobmond.spec
3 3 Version: 4 4 Release: 5 5 Summary: Job Monitoring Daemon 6 License: see /usr/share/doc/jobmonarch-jobmond/copyright7 Distribution: Debian8 Group: Converted/misc6 License: GPL 7 Distribution: Fedora 8 Group: Applications/System 9 9 10 10 %define _rpmdir ../ 11 11 %define _rpmfilename %%{NAME}-%%{VERSION}-%%{RELEASE}.%%{ARCH}.rpm … … 19 19 if [ -x /etc/init.d/jobmond ] 20 20 then 21 21 22 chkconfig --add jobmond 22 23 chkconfig jobmond on 24 /etc/init.d/jobmond restart 23 25 24 26 fi 25 27 26 /etc/init.d/jobmond restart27 28 29 28 %preun 30 29 #!/bin/sh 31 30 32 31 /etc/init.d/jobmond stop 33 32 chkconfig jobmond off 33 chkconfig --del jobmond 34 34 35 35 36 36 %description -
pkg/rpm/sysconfig/jobmond
1 1 OPTIONS="" 2 # If you use SGE, modify this as appropriate to find qstat. 3 case $(uname -m) in 4 x86_64) PATH="${PATH}:/opt/sge/bin/lx26-amd64";; 5 i*86) PATH="${PATH}:/opt/sge/bin/lx26-x86";; 6 esac -
pkg/rpm/jobmonarch-webfrontend.spec
2 2 Name: jobmonarch-webfrontend 3 3 Version: 4 4 Release: 5 Summary: Job MonArch 'sWeb Frontend6 License: see /usr/share/doc/jobmonarch-webfrontend/copyright7 Distribution: Debian8 Group: Converted/misc5 Summary: Job MonArch Web Frontend 6 License: GPL 7 Distribution: Fedora 8 Group: Applications/Internet 9 9 10 10 %define _rpmdir ../ 11 11 %define _rpmfilename %%{NAME}-%%{VERSION}-%%{RELEASE}.%%{ARCH}.rpm … … 32 32 Job Monarch's web frontend. 33 33 34 34 %files 35 %dir "/ var/www/ganglia/templates/job_monarch/"36 %dir "/ var/www/ganglia/templates/job_monarch/images/"37 "/ var/www/ganglia/templates/job_monarch/cluster_extra.tpl"38 "/ var/www/ganglia/templates/job_monarch/host_extra.tpl"39 %dir "/ var/www/ganglia/addons/job_monarch/"40 %dir "/ var/www/ganglia/addons/job_monarch/clusterconf/"41 "/ var/www/ganglia/addons/job_monarch/clusterconf/example.php"42 %dir "/ var/www/ganglia/addons/job_monarch/templates/"43 "/ var/www/ganglia/addons/job_monarch/templates/overview.tpl"44 "/ var/www/ganglia/addons/job_monarch/templates/search.tpl"45 "/ var/www/ganglia/addons/job_monarch/templates/footer.tpl"46 "/ var/www/ganglia/addons/job_monarch/templates/header.tpl"47 "/ var/www/ganglia/addons/job_monarch/templates/host_view.tpl"48 "/ var/www/ganglia/addons/job_monarch/templates/index.tpl"49 %config "/ var/www/ganglia/addons/job_monarch/conf.php"50 "/ var/www/ganglia/addons/job_monarch/search.php"51 "/ var/www/ganglia/addons/job_monarch/libtoga.php"52 "/ var/www/ganglia/addons/job_monarch/version.php"53 "/ var/www/ganglia/addons/job_monarch/cal.gif"54 "/ var/www/ganglia/addons/job_monarch/document_archive.jpg"55 "/ var/www/ganglia/addons/job_monarch/graph.php"56 "/ var/www/ganglia/addons/job_monarch/header.php"57 "/ var/www/ganglia/addons/job_monarch/host_view.php"58 "/ var/www/ganglia/addons/job_monarch/image.php"59 "/ var/www/ganglia/addons/job_monarch/libtoga.js"60 "/ var/www/ganglia/addons/job_monarch/logo_ned.gif"61 "/ var/www/ganglia/addons/job_monarch/next.gif"62 "/ 
var/www/ganglia/addons/job_monarch/prev.gif"63 "/ var/www/ganglia/addons/job_monarch/redcross.jpg"64 "/ var/www/ganglia/addons/job_monarch/ts_picker.js"65 "/ var/www/ganglia/addons/job_monarch/ts_validatetime.js"66 "/ var/www/ganglia/addons/job_monarch/footer.php"67 "/ var/www/ganglia/addons/job_monarch/styles.css"68 "/ var/www/ganglia/addons/job_monarch/index.php"69 "/ var/www/ganglia/addons/job_monarch/overview.php"70 "/ var/www/ganglia/addons/job_monarch/jobmonarch.gif"71 "/ var/www/ganglia/templates/job_monarch/images/logo.jpg"35 %dir "/usr/share/ganglia/templates/job_monarch/" 36 %dir "/usr/share/ganglia/templates/job_monarch/images/" 37 "/usr/share/ganglia/templates/job_monarch/cluster_extra.tpl" 38 "/usr/share/ganglia/templates/job_monarch/host_extra.tpl" 39 %dir "/usr/share/ganglia/addons/job_monarch/" 40 %dir "/usr/share/ganglia/addons/job_monarch/clusterconf/" 41 "/usr/share/ganglia/addons/job_monarch/clusterconf/example.php" 42 %dir "/usr/share/ganglia/addons/job_monarch/templates/" 43 "/usr/share/ganglia/addons/job_monarch/templates/overview.tpl" 44 "/usr/share/ganglia/addons/job_monarch/templates/search.tpl" 45 "/usr/share/ganglia/addons/job_monarch/templates/footer.tpl" 46 "/usr/share/ganglia/addons/job_monarch/templates/header.tpl" 47 "/usr/share/ganglia/addons/job_monarch/templates/host_view.tpl" 48 "/usr/share/ganglia/addons/job_monarch/templates/index.tpl" 49 %config "/usr/share/ganglia/addons/job_monarch/conf.php" 50 "/usr/share/ganglia/addons/job_monarch/search.php" 51 "/usr/share/ganglia/addons/job_monarch/libtoga.php" 52 "/usr/share/ganglia/addons/job_monarch/version.php" 53 "/usr/share/ganglia/addons/job_monarch/cal.gif" 54 "/usr/share/ganglia/addons/job_monarch/document_archive.jpg" 55 "/usr/share/ganglia/addons/job_monarch/graph.php" 56 "/usr/share/ganglia/addons/job_monarch/header.php" 57 "/usr/share/ganglia/addons/job_monarch/host_view.php" 58 "/usr/share/ganglia/addons/job_monarch/image.php" 59 
"/usr/share/ganglia/addons/job_monarch/libtoga.js" 60 "/usr/share/ganglia/addons/job_monarch/logo_ned.gif" 61 "/usr/share/ganglia/addons/job_monarch/next.gif" 62 "/usr/share/ganglia/addons/job_monarch/prev.gif" 63 "/usr/share/ganglia/addons/job_monarch/redcross.jpg" 64 "/usr/share/ganglia/addons/job_monarch/ts_picker.js" 65 "/usr/share/ganglia/addons/job_monarch/ts_validatetime.js" 66 "/usr/share/ganglia/addons/job_monarch/footer.php" 67 "/usr/share/ganglia/addons/job_monarch/styles.css" 68 "/usr/share/ganglia/addons/job_monarch/index.php" 69 "/usr/share/ganglia/addons/job_monarch/overview.php" 70 "/usr/share/ganglia/addons/job_monarch/jobmonarch.gif" 71 "/usr/share/ganglia/templates/job_monarch/images/logo.jpg" -
pkg/rpm/jobmonarch-jobarchived.spec
3 3 Version: 4 4 Release: 5 5 Summary: Job Archiving Daemon 6 License: see /usr/share/doc/jobmonarch-jobarchived/copyright7 Distribution: Debian8 Group: Converted/misc6 License: GPL 7 Distribution: Fedora 8 Group: Applications/System 9 9 10 10 %define _rpmdir ../ 11 11 %define _rpmfilename %%{NAME}-%%{VERSION}-%%{RELEASE}.%%{ARCH}.rpm … … 19 19 if [ -x /etc/init.d/jobarchived ] 20 20 then 21 21 22 chkconfig --add jobarchived 22 23 chkconfig jobarchived on 23 24 24 25 fi … … 40 41 41 42 /etc/init.d/jobarchived stop 42 43 chkconfig jobarchived off 44 chkconfig --del jobarchived 43 45 44 46 45 47 %description -
INSTALL
1 1 DESCRIPTION 2 2 =========== 3 3 4 Job Monarch is a set of tools to monitor and optionally archive (batch)job information. 4 Job Monarch is a set of tools to monitor and optionally archive 5 (batch)job information. 5 6 6 It is a addon for the Ganglia monitoring system and plugs in to a existing Ganglia setup. 7 It is a addon for the Ganglia monitoring system and plugs in to a 8 existing Ganglia setup. 7 9 8 To view a operational setup with Job Monarch, have a look here: http://ganglia.sara.nl/ 10 To view a operational setup with Job Monarch, have a look here: 11 http://ganglia.sara.nl/ 9 12 10 13 11 Job Monarch stands for 'Job Monitoring and Archiving' tool and consists of three (3) components: 14 Job Monarch stands for 'Job Monitoring and Archiving' tool and 15 consists of three (3) components: 12 16 13 17 * jobmond 14 18 15 19 The Job Monitoring Daemon. 16 20 17 Gathers PBS/Torque batch statistics on jobs/nodes and submits them into18 Ganglia's XML stream.21 Gathers PBS/Torque or SGE batch statistics on 22 jobs/nodes and submits them into Ganglia's XML stream. 19 23 20 Through this daemon, users are able to view the PBS/Torque batch system and the 21 jobs/nodes that are in it (be it either running or queued). 24 Through this daemon, users are able to view the batch 25 system and the jobs/nodes that are in it (be it either 26 running or queued). 22 27 23 28 * jobarchived (optionally) 24 29 25 30 The Job Archiving Daemon. 26 31 27 Listens to Ganglia's XML stream and archives the job and node statistics.28 It stores the job statistics in a Postgres SQL database and the node statistics29 in RRD files.32 Listens to Ganglia's XML stream and archives the job and node 33 statistics. It stores the job statistics in a Postgres SQL 34 database and the node statistics in RRD files. 
30 35 31 Through this daemon, users are able to lookup a old/finished job32 and view all it's statistics.36 Through this daemon, users are able to lookup a old/finished 37 job and view all it's statistics. 33 38 34 Optionally: You can either choose to use this daemon if your users have use for it. 35 As it can be a heavy application to run and not everyone may have a need for it. 39 Optionally: You can either choose to use this daemon if your 40 users have use for it. 41 As it can be a heavy application to run and not everyone may 42 have a need for it. 36 43 37 - Multithreaded: Will not miss any data regardless of (slow) storage 44 - Multithreaded: Will not miss any data regardless of 45 (slow) storage 38 46 39 47 - Staged writing: Spread load over bigger time periods 40 48 41 - High precision RRDs: Allow for zooming on old periods with large precision 49 - High precision RRDs: Allow for zooming on old periods with 50 large precision 42 51 43 - Timeperiod RRDs: Allow for smaller number of files while still keeping advantage 52 - Timeperiod RRDs: Allow for smaller number of files 53 while still keeping advantage 44 54 of small disk space 45 55 46 56 * web 47 57 48 58 The Job Monarch web interface. 49 59 50 This interfaces with the jobmond data and (optionally) the jobarchived and presents the51 data and graphs.60 This interfaces with the jobmond data and (optionally) the 61 jobarchived and presents the data and graphs. 52 62 53 It does this in a similar layout/setup as Ganglia itself, so the navigation and usage is intuitive. 63 It does this in a similar layout/setup as Ganglia itself, so 64 the navigation and usage is intuitive. 
54 65 55 - Graphical usage: Displays graphical cluster overview so you can see the cluster (job) state 56 in one view/image and additional pie chart with relevant information on your 57 current view 66 - Graphical usage: Displays graphical cluster overview so 67 you can see the cluster (job) state 68 in one view/image and additional pie 69 chart with relevant information on 70 your current view 58 71 59 - Filters: Ability to filter output to limit information displayed (usefull for those 60 clusters with 500+ jobs). This also filters the graphical overview images output 61 and pie chart so you only see the filter relevant data 72 - Filters: Ability to filter output to limit 73 information displayed (usefull for 74 those clusters with 500+ jobs). This 75 also filters the graphical overview 76 images output and pie chart so you 77 only see the filter relevant data 62 78 63 - Archive: When enabling jobarchived, users can go back as far as recorded in the database 64 or archived RRDs to find out what happened to a crashed or old job 79 - Archive: When enabling jobarchived, users can 80 go back as far as recorded in the 81 database or archived RRDs to find out 82 what happened to a crashed or old job 65 83 66 - Zoom ability: Users can zoom into a timepriod as small as the smallest grain of the RRDS 67 (typically up to 10 seconds) when a jobarchived is present 84 - Zoom ability: Users can zoom into a timepriod as 85 small as the smallest grain of the 86 RRDS (typically up to 10 seconds) when 87 a jobarchived is present 68 88 69 89 REQUIREMENTS 70 90 ============ … … 75 95 76 96 jobmond: 77 97 78 - pbs_python v2.8.2 or higher 98 - pbs_python v2.8.2 or higher (if using PBS) 79 99 https://subtrac.sara.nl/oss/pbs_python/ 80 100 81 101 - gmond v3.0.1 or higher 82 102 http://www.ganglia.info/ 83 103 104 - For SGE, this is only tested on version 6.0u8 105 upwards. 
It may need changes for 6.1 106 84 107 jobarchived: 85 108 86 109 - Postgres SQL v7.xx … … 116 139 INSTALLATION 117 140 ============ 118 141 119 Prior to installing the software make sure you meet the necessary requirements as120 mentioned above.142 Prior to installing the software make sure you meet the necessary 143 requirements as mentioned above. 121 144 122 NOTE: You can choose to install to other path/directories if your setup is different. 145 NOTE: You can choose to install to other path/directories if your 146 setup is different. 123 147 124 148 * jobmond 125 149 … … 195 219 196 220 ( see config comments for syntax and explanation ) 197 221 222 3. If using SGE, make sure that qstat(1) is on your 223 path, possibly by editing /etc/sysconfig/jobmond 224 (RedHat) or /etc/default/jobmond (Debian). 225 198 226 START 199 227 ===== 200 228 201 229 * jobmond 202 230 203 231 The Job Monitor has to be run on a machine that is allowed to 204 query the PBS/Torque server.205 Make sure that if you have 'acl_hosts' enabled on your PBS/Torque206 server thatjobmond's machine is in it.232 query the batch system (PBS/Torque server or SGE qmaster). 233 Make sure that if you have 'acl_hosts' enabled on your 234 PBS/Torque server, jobmond's machine is in it. 207 235 208 236 1. Start the Job Monitor: 209 237 … … 218 246 * web 219 247 220 248 Doesn't require you to (re)start anything. 221 ( make sure the Postgres database is running though ) 249 ( make sure the Postgres database is running though, 250 if using jobarchived ) 222 251 223 252 CONTACT 224 253 ======= -
jobmond/jobmond.py
3 3 # This file is part of Jobmonarch 4 4 # 5 5 # Copyright (C) 2006-2007 Ramon Bastiaans 6 # Copyright (C) 2007 Dave Love (SGE code) 6 7 # 7 8 # Jobmonarch is free software; you can redistribute it and/or modify 8 9 # it under the terms of the GNU General Public License as published by … … 22 23 # 23 24 24 25 import sys, getopt, ConfigParser, time, os, socket, string, re 25 import xdrlib, socket, syslog, xml, xml.sax 26 from xml.sax import saxutils, make_parser 27 from xml.sax import make_parser 26 import xdrlib, syslog, xml, xml.sax 28 27 from xml.sax.handler import feature_namespaces 29 28 30 29 VERSION='0.3' … … 251 250 sys.exit( 1 ) 252 251 else: 253 252 254 debug_msg( 0, "ERROR: GMETRIC_TARGET not set: intern el Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" )253 debug_msg( 0, "ERROR: GMETRIC_TARGET not set: internal Gmetric handling aborted. Failing back to DEPRECATED use of gmond.conf/gmetric binary. This will slow down jobmond significantly!" 
) 255 254 256 255 return True 257 256 257 def fqdn_parts (fqdn): 258 """Return pair of host and domain for fully-qualified domain name arg.""" 259 parts = fqdn.split (".") 260 return (parts[0], string.join(parts[1:], ".")) 261 258 262 METRIC_MAX_VAL_LEN = 900 259 263 260 264 class DataProcessor: … … 382 386 383 387 except NameError: 384 388 385 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (om mitting)' )389 debug_msg( 10, 'Assuming /etc/gmond.conf for gmetric cmd (omitting)' ) 386 390 387 391 cmd = cmd + ' -n' + str( metricname )+ ' -v"' + str( metricval )+ '" -t' + str( valtype ) + ' -d' + str( self.dmax ) 388 392 … … 420 424 421 425 print '\t%s = %s' %( name, val ) 422 426 427 def getAttr( self, attrs, name ): 428 429 """Return certain attribute from dictionary, if exists""" 430 431 if attrs.has_key( name ): 432 433 return attrs[ name ] 434 else: 435 return '' 436 437 def jobDataChanged( self, jobs, job_id, attrs ): 438 439 """Check if job with attrs and job_id in jobs has changed""" 440 441 if jobs.has_key( job_id ): 442 443 oldData = jobs[ job_id ] 444 else: 445 return 1 446 447 for name, val in attrs.items(): 448 449 if oldData.has_key( name ): 450 451 if oldData[ name ] != attrs[ name ]: 452 453 return 1 454 455 else: 456 return 1 457 458 return 0 459 460 def submitJobData( self ): 461 462 """Submit job info list""" 463 464 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 465 466 running_jobs = 0 467 queued_jobs = 0 468 469 # Count how many running/queued jobs we found 470 # 471 for jobid, jobattrs in self.jobs.items(): 472 473 if jobattrs[ 'status' ] == 'Q': 474 475 queued_jobs += 1 476 477 elif jobattrs[ 'status' ] == 'R': 478 479 running_jobs += 1 480 481 # Report running/queued jobs as seperate metric for a nice RRD graph 482 # 483 self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' ) 484 self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 
'jobs' ) 485 486 # Now let's spread the knowledge 487 # 488 for jobid, jobattrs in self.jobs.items(): 489 490 # Make gmetric values for each job: respect max gmetric value length 491 # 492 gmetric_val = self.compileGmetricVal( jobid, jobattrs ) 493 metric_increment = 0 494 495 # If we have more job info than max gmetric value length allows, split it up 496 # amongst multiple metrics 497 # 498 for val in gmetric_val: 499 500 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val ) 501 502 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric 503 # 504 metric_increment = metric_increment + 1 505 506 def compileGmetricVal( self, jobid, jobattrs ): 507 508 """Create a val string for gmetric of jobinfo""" 509 510 gval_lists = [ ] 511 val_list = { } 512 513 for val_name, val_value in jobattrs.items(): 514 515 # These are our own metric names, i.e.: status, start_timestamp, etc 516 # 517 val_list_names_len = len( string.join( val_list.keys() ) ) + len(val_list.keys()) 518 519 # These are their corresponding values 520 # 521 val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values()) 522 523 if val_name == 'nodes' and jobattrs['status'] == 'R': 524 525 node_str = None 526 527 for node in val_value: 528 529 if node_str: 530 531 node_str = node_str + ';' + node 532 else: 533 node_str = node 534 535 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN 536 # 537 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN: 538 539 # It's too big, we need to make a new gmetric for the additional info 540 # 541 val_list[ val_name ] = node_str 542 543 gval_lists.append( val_list ) 544 545 val_list = { } 546 node_str = None 547 548 val_list[ val_name ] = node_str 549 550 gval_lists.append( val_list ) 551 552 val_list = { } 553 554 elif val_value != '': 555 556 # Make sure if we add this new info, that the 
total metric's value length does not exceed METRIC_MAX_VAL_LEN 557 # 558 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN: 559 560 # It's too big, we need to make a new gmetric for the additional info 561 # 562 gval_lists.append( val_list ) 563 564 val_list = { } 565 566 val_list[ val_name ] = val_value 567 568 if len( val_list ) > 0: 569 570 gval_lists.append( val_list ) 571 572 str_list = [ ] 573 574 # Now append the value names and values together, i.e.: stop_timestamp=value, etc 575 # 576 for val_list in gval_lists: 577 578 my_val_str = None 579 580 for val_name, val_value in val_list.items(): 581 582 if my_val_str: 583 584 my_val_str = my_val_str + ' ' + val_name + '=' + val_value 585 else: 586 my_val_str = val_name + '=' + val_value 587 588 str_list.append( my_val_str ) 589 590 return str_list 591 423 592 def daemon( self ): 424 593 425 594 """Run as daemon forever""" … … 467 636 self.submitJobData() 468 637 time.sleep( BATCH_POLL_INTERVAL ) 469 638 470 class SgeQstatXMLParser(xml.sax.handler.ContentHandler): 639 # SGE code by Dave Love <fx@gnu.org>. Tested with SGE 6.0u8 and 6.0u11. 640 # Probably needs modification for SGE 6.1. See also the fixmes. 471 641 472 """Babu Sundaram's experimental SGE qstat XML parser""" 642 class NoJobs (Exception): 643 """Exception raised by empty job list in qstat output.""" 644 pass 473 645 474 def __init__(self, qstatinxml): 646 class SgeQstatXMLParser(xml.sax.handler.ContentHandler): 647 """SAX handler for XML output from Sun Grid Engine's `qstat'.""" 475 648 476 self.qstatfile = qstatinxml477 self. attribs = {}478 self. value = ''479 self.job ID = ''480 self. currentJobInfo = ''481 self. job_list = []482 self. 
EOFFlag = 0483 self.jobinfoCount = 0649 def __init__(self): 650 self.value = "" 651 self.joblist = [] 652 self.job = {} 653 self.queue = "" 654 self.in_joblist = False 655 self.lrequest = False 656 xml.sax.handler.ContentHandler.__init__(self) 484 657 658 # The structure of the output is as follows. Unfortunately 659 # it's voluminous, and probably doesn't scale to large 660 # clusters/queues. 485 661 486 def startElement(self, name, attrs): 662 # <detailed_job_info xmlns:xsd="http://www.w3.org/2001/XMLSchema"> 663 # <djob_info> 664 # <qmaster_response> <!-- job --> 665 # ... 666 # <JB_ja_template> 667 # <ulong_sublist> 668 # ... <!-- start_time, state ... --> 669 # </ulong_sublist> 670 # </JB_ja_template> 671 # <JB_ja_tasks> 672 # <ulong_sublist> 673 # ... <!-- task info 674 # </ulong_sublist> 675 # ... 676 # </JB_ja_tasks> 677 # ... 678 # </qmaster_response> 679 # </djob_info> 680 # <messages> 681 # ... 487 682 488 if name == 'job_list': 489 self.currentJobInfo = 'Status=' + attrs.get('state', None) + ' ' 490 elif name == 'job_info': 491 self.job_list = [] 492 self.jobinfoCount += 1 683 # NB. We might treat each task as a separate job, like 684 # straight qstat output, but the web interface expects jobs to 685 # be identified by integers, not, say, <job number>.<task>. 493 686 494 def characters(self, ch): 687 # So, I lied. 
If the job list is empty, we get invalid XML 688 # like this, which we need to defend against: 495 689 496 self.value = self.value + ch 690 # <unknown_jobs xmlns:xsd="http://www.w3.org/2001/XMLSchema"> 691 # <> 692 # <ST_name>*</ST_name> 693 # </> 694 # </unknown_jobs> 497 695 498 def endElement(self, name): 696 def startElement(self, name, attrs): 697 self.value = "" 698 if name == "djob_info": # job list 699 self.in_joblist = True 700 elif name == "qmaster_response" and self.in_joblist: # job 701 self.job = {"job_state": "U", "slots": 0, 702 "nodes": [], "queued_timestamp": "", 703 "queued_timestamp": "", "queue": "", 704 "ppn": "0", "RN_max": 0, 705 # fixme in endElement 706 "requested_memory": 0, "requested_time": 0 707 } 708 self.joblist.append(self.job) 709 elif name == "qstat_l_requests": # resource request 710 self.lrequest = True 711 elif name == "unknown_jobs": 712 raise NoJobs 499 713 500 if len(self.value.strip()) > 0 : 714 def characters(self, ch): 715 self.value += ch 501 716 502 self.currentJobInfo += name + '=' + self.value.strip() + ' ' 503 elif name != 'job_list': 717 def endElement(self, name): 718 """Snarf job elements contents into job dictionary. 
719 Translate keys if appropriate.""" 504 720 505 self.currentJobInfo += name + '=Unknown ' 721 name_trans = { 722 "JB_job_number": "number", 723 "JB_job_name": "name", "JB_owner": "owner", 724 "queue_name": "queue", "JAT_start_time": "start_timestamp", 725 "JB_submission_time": "queued_timestamp" 726 } 727 value = self.value 506 728 507 if name == 'JB_job_number': 729 if name == "djob_info": 730 self.in_joblist = False 731 self.job = {} 732 elif name == "JAT_master_queue": 733 self.job["queue"] = value.split("@")[0] 734 elif name == "JG_qhostname": 735 if not (value in self.job["nodes"]): 736 self.job["nodes"].append(value) 737 elif name == "JG_slots": # slots in use 738 self.job["slots"] += int(value) 739 elif name == "RN_max": # requested slots (tasks or parallel) 740 self.job["RN_max"] = max (self.job["RN_max"], 741 int(value)) 742 elif name == "JAT_state": # job state (bitwise or) 743 value = int (value) 744 # Status values from sge_jobL.h 745 #define JIDLE 0x00000000 746 #define JHELD 0x00000010 747 #define JMIGRATING 0x00000020 748 #define JQUEUED 0x00000040 749 #define JRUNNING 0x00000080 750 #define JSUSPENDED 0x00000100 751 #define JTRANSFERING 0x00000200 752 #define JDELETED 0x00000400 753 #define JWAITING 0x00000800 754 #define JEXITING 0x00001000 755 #define JWRITTEN 0x00002000 756 #define JSUSPENDED_ON_THRESHOLD 0x00010000 757 #define JFINISHED 0x00010000 758 if value & 0x80: 759 self.job["status"] = "R" 760 elif value & 0x40: 761 self.job["status"] = "Q" 762 else: 763 self.job["status"] = "O" # `other' 764 elif name == "CE_name" and self.lrequest and self.value in \ 765 ("h_cpu", "s_cpu", "cpu", "h_core", "s_core"): 766 # We're in a container for an interesting resource 767 # request; record which type. 768 self.lrequest = self.value 769 elif name == "CE_doubleval" and self.lrequest: 770 # if we're in a container for an interesting 771 # resource request, use the maxmimum of the hard 772 # and soft requests to record the requested CPU 773 # or core. 
Fixme: I'm not sure if this logic is 774 # right. 775 if self.lrequest in ("h_core", "s_core"): 776 self.job["requested_memory"] = \ 777 max (float (value), 778 self.job["requested_memory"]) 779 # Fixme: Check what cpu means, c.f [hs]_cpu. 780 elif self.lrequest in ("h_cpu", "s_cpu", "cpu"): 781 self.job["requested_time"] = \ 782 max (float (value), 783 self.job["requested_time"]) 784 elif name == "qstat_l_requests": 785 self.lrequest = False 786 elif self.job and self.in_joblist: 787 if name in name_trans: 788 name = name_trans[name] 789 self.job[name] = value 508 790 509 self.jobID = self.value.strip() 510 self.job_list.append(self.jobID) 791 # Abstracted from PBS original. 792 # Fixme: Is it worth (or appropriate for PBS) sorting the result? 793 def do_nodelist (nodes): 794 """Translate node list as appropriate.""" 795 nodeslist = [ ] 796 my_domain = fqdn_parts(socket.getfqdn())[1] 797 for node in nodes: 798 host = node.split( '/' )[0] # not relevant for SGE 799 h, host_domain = fqdn_parts(host) 800 if host_domain == my_domain: 801 host = h 802 if nodeslist.count( host ) == 0: 803 for translate_pattern in BATCH_HOST_TRANSLATE: 804 if translate_pattern.find( '/' ) != -1: 805 translate_orig = \ 806 translate_pattern.split( '/' )[1] 807 translate_new = \ 808 translate_pattern.split( '/' )[2] 809 host = re.sub( translate_orig, 810 translate_new, host ) 811 if not host in nodeslist: 812 nodeslist.append( host ) 813 return nodeslist 511 814 512 if name == 'job_list':513 514 if self.attribs.has_key(self.jobID) == False:515 self.attribs[self.jobID] = self.currentJobInfo516 elif self.attribs.has_key(self.jobID) and self.attribs[self.jobID] != self.currentJobInfo:517 self.attribs[self.jobID] = self.currentJobInfo518 self.currentJobInfo = ''519 self.jobID = ''520 521 elif name == 'job_info' and self.jobinfoCount == 2:522 523 deljobs = []524 for id in self.attribs:525 try:526 self.job_list.index(str(id))527 except ValueError:528 deljobs.append(id)529 for i in deljobs:530 
del self.attribs[i]531 deljobs = []532 self.jobinfoCount = 0533 534 self.value = ''535 536 815 class SgeDataGatherer(DataGatherer): 537 816 538 jobs = { } 539 SGE_QSTAT_XML_FILE = '/tmp/.jobmonarch.sge.qstat' 817 jobs = {} 540 818 541 819 def __init__( self ): 542 """Setup appropriate variables""" 543 544 self.jobs = { } 820 self.jobs = {} 545 821 self.timeoffset = 0 546 822 self.dp = DataProcessor() 547 self.initSgeJobInfo()548 823 549 def initSgeJobInfo( self ): 550 """This is outside the scope of DRMAA; Get the current jobs in SGE""" 551 """This is a hack because we cant get info about jobs beyond""" 552 """those in the current DRMAA session""" 553 554 self.qstatparser = SgeQstatXMLParser( self.SGE_QSTAT_XML_FILE ) 555 556 # Obtain the qstat information from SGE in XML format 557 # This would change to DRMAA-specific calls from 6.0u9 558 559 def getJobData(self): 824 def getJobData( self ): 560 825 """Gather all data on current jobs in SGE""" 561 826 562 # Get the information about the current jobs in the SGE queue 563 info = os.popen("qstat -ext -xml").readlines() 564 f = open(self.SGE_QSTAT_XML_FILE,'w') 565 for lines in info: 566 f.write(lines) 567 f.close() 827 import popen2 568 828 569 # Parse the input 570 f = open(self.qstatparser.qstatfile, 'r') 571 xml.sax.parse(f, self.qstatparser) 572 f.close() 573 829 self.cur_time = 0 830 queues = "" 831 if QUEUE: # only for specific queues 832 # Fixme: assumes queue names don't contain single 833 # quote or comma. Don't know what the SGE rules are. 834 queues = " -q '" + string.join (QUEUE, ",") + "'" 835 # Note the comment in SgeQstatXMLParser about scaling with 836 # this method of getting data. I haven't found better one. 837 # Output with args `-xml -ext -f -r' is easier to parse 838 # in some ways, harder in others, but it doesn't provide 839 # the submission time, at least. 
840 piping = popen2.Popen3("qstat -u '*' -j '*' -xml" + queues, 841 True) 842 qstatparser = SgeQstatXMLParser() 843 parse_err = 0 844 try: 845 xml.sax.parse(piping.fromchild, qstatparser) 846 except NoJobs: 847 pass 848 except: 849 parse_err = 1 850 if piping.wait(): 851 debug_msg(10, 852 "qstat error, skipping until next polling interval: " 853 + piping.childerr.readline()) 854 return None 855 elif parse_err: 856 debug_msg(10, "Bad XML output from qstat"()) 857 exit (1) 858 for f in piping.fromchild, piping.tochild, piping.childerr: 859 f.close() 574 860 self.cur_time = time.time() 861 jobs_processed = [] 862 for job in qstatparser.joblist: 863 job_id = job["number"] 864 if job["status"] in [ 'Q', 'R' ]: 865 jobs_processed.append(job_id) 866 if job["status"] == "R": 867 job["nodes"] = do_nodelist (job["nodes"]) 868 # Fixme: Is this right? 869 job["ppn"] = float(job["slots"]) / \ 870 len(job["nodes"]) 871 if DETECT_TIME_DIFFS: 872 # If a job start is later than our 873 # current date, that must mean 874 # the SGE server's time is later 875 # than our local time. 
876 start_timestamp = \ 877 int (job["start_timestamp"]) 878 if start_timestamp > \ 879 int(self.cur_time) + \ 880 int(self.timeoffset): 575 881 576 return self.qstatparser.attribs 882 self.timeoffset = \ 883 start_timestamp - \ 884 int(self.cur_time) 885 else: 886 # fixme: Note sure what this should be: 887 job["ppn"] = job["RN_max"] 888 job["nodes"] = "1" 577 889 578 def submitJobData(self): 579 """Submit job info list""" 890 myAttrs = {} 891 for attr in ["name", "queue", "owner", 892 "requested_time", "status", 893 "requested_memory", "ppn", 894 "start_timestamp", "queued_timestamp"]: 895 myAttrs[attr] = str(job[attr]) 896 myAttrs["nodes"] = job["nodes"] 897 myAttrs["reported"] = str(int(self.cur_time) + \ 898 int(self.timeoffset)) 899 myAttrs["domain"] = fqdn_parts(socket.getfqdn())[1] 900 myAttrs["poll_interval"] = str(BATCH_POLL_INTERVAL) 580 901 581 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) ) 582 # Now let's spread the knowledge 583 # 584 metric_increment = 0 585 for jobid, jobattrs in self.qstatparser.attribs.items(): 902 if self.jobDataChanged(self.jobs, job_id, myAttrs) \ 903 and myAttrs["status"] in ["R", "Q"]: 904 self.jobs[job_id] = myAttrs 905 for id, attrs in self.jobs.items(): 906 if id not in jobs_processed: 907 del self.jobs[id] 586 908 587 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), jobattrs)588 589 909 class PbsDataGatherer( DataGatherer ): 590 910 591 911 """This is the DataGatherer for PBS and Torque""" … … 612 932 else: 613 933 self.pq = PBSQuery() 614 934 615 def getAttr( self, attrs, name ):616 617 """Return certain attribute from dictionary, if exists"""618 619 if attrs.has_key( name ):620 621 return attrs[ name ]622 else:623 return ''624 625 def jobDataChanged( self, jobs, job_id, attrs ):626 627 """Check if job with attrs and job_id in jobs has changed"""628 629 if jobs.has_key( job_id ):630 631 oldData = jobs[ job_id ]632 else:633 return 
1634 635 for name, val in attrs.items():636 637 if oldData.has_key( name ):638 639 if oldData[ name ] != attrs[ name ]:640 641 return 1642 643 else:644 return 1645 646 return 0647 648 935 def getJobData( self ): 649 936 650 937 """Gather all data on current jobs in Torque""" … … 663 950 664 951 jobs_processed = [ ] 665 952 666 my_domain = string.join( socket.getfqdn().split( '.' )[1:], '.' )667 668 953 for name, attrs in joblist.items(): 669 954 670 955 job_id = name.split( '.' )[0] … … 709 994 start_timestamp = self.getAttr( attrs, 'mtime' ) 710 995 nodes = self.getAttr( attrs, 'exec_host' ).split( '+' ) 711 996 712 nodeslist = [ ]997 nodeslist = do_nodelist( nodes ) 713 998 714 for node in nodes:715 716 host = node.split( '/' )[0]717 718 host_domain = string.join( host.split( '.' )[1:], '.' )719 720 if host_domain == my_domain:721 722 host = host.split( '.' )[0]723 724 if nodeslist.count( host ) == 0:725 726 for translate_pattern in BATCH_HOST_TRANSLATE:727 728 if translate_pattern.find( '/' ) != -1:729 730 translate_orig = translate_pattern.split( '/' )[1]731 translate_new = translate_pattern.split( '/' )[2]732 733 host = re.sub( translate_orig, translate_new, host )734 735 if not host in nodeslist:736 737 nodeslist.append( host )738 739 999 if DETECT_TIME_DIFFS: 740 1000 741 1001 # If a job start if later than our current date, … … 825 1085 myAttrs[ 'queued_timestamp' ] = str( queued_timestamp ) 826 1086 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 827 1087 myAttrs[ 'nodes' ] = nodeslist 828 myAttrs[ 'domain' ] = string.join( socket.getfqdn().split( '.' )[1:], '.' 
)1088 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1] 829 1089 myAttrs[ 'poll_interval' ] = str( BATCH_POLL_INTERVAL ) 830 1090 831 1091 if self.jobDataChanged( self.jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]: … … 840 1100 # 841 1101 del self.jobs[ id ] 842 1102 843 def submitJobData( self ):844 845 """Submit job info list"""846 847 self.dp.multicastGmetric( 'MONARCH-HEARTBEAT', str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) )848 849 running_jobs = 0850 queued_jobs = 0851 852 # Count how many running/queued jobs we found853 #854 for jobid, jobattrs in self.jobs.items():855 856 if jobattrs[ 'status' ] == 'Q':857 858 queued_jobs += 1859 860 elif jobattrs[ 'status' ] == 'R':861 862 running_jobs += 1863 864 # Report running/queued jobs as seperate metric for a nice RRD graph865 #866 self.dp.multicastGmetric( 'MONARCH-RJ', str( running_jobs ), 'uint32', 'jobs' )867 self.dp.multicastGmetric( 'MONARCH-QJ', str( queued_jobs ), 'uint32', 'jobs' )868 869 # Now let's spread the knowledge870 #871 for jobid, jobattrs in self.jobs.items():872 873 # Make gmetric values for each job: respect max gmetric value length874 #875 gmetric_val = self.compileGmetricVal( jobid, jobattrs )876 metric_increment = 0877 878 # If we have more job info than max gmetric value length allows, split it up879 # amongst multiple metrics880 #881 for val in gmetric_val:882 883 self.dp.multicastGmetric( 'MONARCH-JOB-' + jobid + '-' + str(metric_increment), val )884 885 # Increase follow number if this jobinfo is split up amongst more than 1 gmetric886 #887 metric_increment = metric_increment + 1888 889 def compileGmetricVal( self, jobid, jobattrs ):890 891 """Create a val string for gmetric of jobinfo"""892 893 gval_lists = [ ]894 mystr = None895 val_list = { }896 897 for val_name, val_value in jobattrs.items():898 899 # These are our own metric names, i.e.: status, start_timestamp, etc900 #901 val_list_names_len = len( string.join( val_list.keys() ) ) + 
len(val_list.keys())902 903 # These are their corresponding values904 #905 val_list_vals_len = len( string.join( val_list.values() ) ) + len(val_list.values())906 907 if val_name == 'nodes' and jobattrs['status'] == 'R':908 909 node_str = None910 911 for node in val_value:912 913 if node_str:914 915 node_str = node_str + ';' + node916 else:917 node_str = node918 919 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN920 #921 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(node_str) ) > METRIC_MAX_VAL_LEN:922 923 # It's too big, we need to make a new gmetric for the additional info924 #925 val_list[ val_name ] = node_str926 927 gval_lists.append( val_list )928 929 val_list = { }930 node_str = None931 932 val_list[ val_name ] = node_str933 934 gval_lists.append( val_list )935 936 val_list = { }937 938 elif val_value != '':939 940 # Make sure if we add this new info, that the total metric's value length does not exceed METRIC_MAX_VAL_LEN941 #942 if (val_list_names_len + len(val_name) ) + (val_list_vals_len + len(str(val_value)) ) > METRIC_MAX_VAL_LEN:943 944 # It's too big, we need to make a new gmetric for the additional info945 #946 gval_lists.append( val_list )947 948 val_list = { }949 950 val_list[ val_name ] = val_value951 952 if len( val_list ) > 0:953 954 gval_lists.append( val_list )955 956 str_list = [ ]957 958 # Now append the value names and values together, i.e.: stop_timestamp=value, etc959 #960 for val_list in gval_lists:961 962 my_val_str = None963 964 for val_name, val_value in val_list.items():965 966 if my_val_str:967 968 my_val_str = my_val_str + ' ' + val_name + '=' + val_value969 else:970 my_val_str = val_name + '=' + val_value971 972 str_list.append( my_val_str )973 974 return str_list975 976 1103 # 977 1104 # Gmetric by Nick Galbreath - nickg(a.t)modp(d.o.t)com 978 1105 # Version 1.0 - 21-April2-2007 … … 1134 1261 1135 1262 elif BATCH_API == 'sge': 1136 1263 1137 
debug_msg( 0, "FATAL ERROR: BATCH_API 'sge' implementation is currently broken, check future releases" ) 1264 # Tested with SGE 6.0u11. 1265 # debug_msg( 0, "FATAL ERROR: BATCH_API 'sge' implementation is currently broken, check future releases" ) 1138 1266 1139 sys.exit( 1 )1267 # sys.exit( 1 ) 1140 1268 1141 1269 gather = SgeDataGatherer() 1142 1270 -
jobmond/jobmond.conf
20 20 BATCH_SERVER : localhost 21 21 22 22 # Which queue(s) to report jobs of 23 # (optional and only supported for pbs)23 # (optional) 24 24 # 25 25 #QUEUE : long, short 26 26 -
web/addons/job_monarch/templates/overview.tpl
1 <BR><BR> 1 <P> 2 All tasks of parallel and array jobs appear as a single ‘job’. 3 <BR></P> 2 4 3 5 <CENTER> 4 6 <TABLE cellpadding="15"> … … 137 139 <BR> 138 140 139 141 <SCRIPT TYPE="text/javascript" SRC="libtoga.js"></SCRIPT> 142 <NOSCRIPT><P>[Sorting by column header requires JavaScript]<BR><BR></P></NOSCRIPT> 140 143 141 144 <INPUT TYPE="HIDDEN" NAME="sortby" VALUE="{sortby}"> 142 145 <INPUT TYPE="HIDDEN" NAME="sortorder" VALUE="{sortorder}">