- Timestamp:
- 03/13/08 17:23:30 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/jobmond/jobmond.py
r520 r524 300 300 gmetric_dest_ip = ganglia_cfg.getStr( 'udp_send_channel', 'host' ) 301 301 302 gmetric_dest_port = g cp.getStr( 'udp_send_channel', 'port' )302 gmetric_dest_port = ganglia_cfg.getStr( 'udp_send_channel', 'port' ) 303 303 304 304 if gmetric_dest_ip and gmetric_dest_port: … … 1054 1054 del self.jobs[id] 1055 1055 1056 # LSF code by Mahmoud Hanafi <hanafim@users.sourceforge.nt> 1057 # Requres LSFObject http://sourceforge.net/projects/lsfobject 1058 # 1059 class LsfDataGatherer(DataGatherer): 1060 """This is the DataGatherer for LSf""" 1061 1062 global lsfObject 1063 1064 def __init__( self ): 1065 self.jobs = { } 1066 self.timeoffset = 0 1067 self.dp = DataProcessor() 1068 self.initLsfQuery() 1069 1070 ######################## 1071 ## THIS IS TAKEN FROM 1072 ## http://bigbadcode.com/2007/04/04/count-the-duplicates-in-a-python-list/ 1073 from sets import Set 1074 # 1075 def _countDuplicatesInList(self,dupedList): 1076 uniqueSet = self.Set(item for item in dupedList) 1077 return [(item, dupedList.count(item)) for item in uniqueSet] 1078 # 1079 #lst = ['I1','I2','I1','I3','I4','I4','I7','I7','I7','I7','I7'] 1080 #print _countDuplicatesInList(lst) 1081 #[('I1', 2), ('I3', 1), ('I2', 1), ('I4', 2), ('I7', 5)] 1082 ######################## 1083 1084 def initLsfQuery( self ): 1085 self.pq = None 1086 self.pq = lsfObject.jobInfoEntObject() 1087 1088 def getAttr( self, attrs, name ): 1089 """Return certain attribute from dictionary, if exists""" 1090 if attrs.has_key( name ): 1091 return attrs[name] 1092 else: 1093 return '' 1094 1095 def getJobData( self, known_jobs="" ): 1096 """Gather all data on current jobs in LSF""" 1097 if len( known_jobs ) > 0: 1098 jobs = known_jobs 1099 else: 1100 jobs = { } 1101 joblist = {} 1102 joblist = self.pq.getJobInfo() 1103 nodelist = '' 1104 1105 self.cur_time = time.time() 1106 1107 jobs_processed = [ ] 1108 1109 for name, attrs in joblist.items(): 1110 job_id = str(name) 1111 jobs_processed.append( job_id ) 1112 name = self.getAttr( attrs, 'jobName' ) 1113 queue = self.getAttr( self.getAttr( attrs, 'submit') , 'queue' ) 1114 owner = self.getAttr( attrs, 'user' ) 1115 1116 ### THIS IS THE rLimit List index values 1117 #define LSF_RLIMIT_CPU 0 /* cpu time in milliseconds */ 1118 #define LSF_RLIMIT_FSIZE 1 /* maximum file size */ 1119 #define LSF_RLIMIT_DATA 2 /* data size */ 1120 #define LSF_RLIMIT_STACK 3 /* stack size */ 1121 #define LSF_RLIMIT_CORE 4 /* core file size */ 1122 #define LSF_RLIMIT_RSS 5 /* resident set size */ 1123 #define LSF_RLIMIT_NOFILE 6 /* open files */ 1124 #define LSF_RLIMIT_OPEN_MAX 7 /* (from HP-UX) */ 1125 #define LSF_RLIMIT_VMEM 8 /* maximum swap mem */ 1126 #define LSF_RLIMIT_SWAP 8 1127 #define LSF_RLIMIT_RUN 9 /* max wall-clock time limit */ 1128 #define LSF_RLIMIT_PROCESS 10 /* process number limit */ 1129 #define LSF_RLIMIT_THREAD 11 /* thread number limit (introduced in LSF6.0) */ 1130 #define LSF_RLIM_NLIMITS 12 /* number of resource limits */ 1131 1132 requested_time = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[9] 1133 if requested_time == -1: 1134 requested_time = "" 1135 requested_memory = self.getAttr( self.getAttr( attrs, 'submit') , 'rLimits' )[8] 1136 if requested_memory == -1: 1137 requested_memory = "" 1138 # This tries to get proc per node. We don't support this right now 1139 ppn = 0 #self.getAttr( self.getAttr( attrs, 'SubmitList') , 'numProessors' ) 1140 requested_cpus = self.getAttr( self.getAttr( attrs, 'submit') , 'numProcessors' ) 1141 if requested_cpus == None or requested_cpus == "": 1142 requested_cpus = 1 1143 1144 if QUEUE: 1145 for q in QUEUE: 1146 if q == queue: 1147 display_queue = 1 1148 break 1149 else: 1150 display_queue = 0 1151 continue 1152 if display_queue == 0: 1153 continue 1154 1155 runState = self.getAttr( attrs, 'status' ) 1156 if runState == 4: 1157 status = 'R' 1158 else: 1159 status = 'Q' 1160 queued_timestamp = self.getAttr( attrs, 'submitTime' ) 1161 1162 if status == 'R': 1163 start_timestamp = self.getAttr( attrs, 'startTime' ) 1164 nodesCpu = dict(self._countDuplicatesInList(self.getAttr( attrs, 'exHosts' ))) 1165 nodelist = nodesCpu.keys() 1166 1167 if DETECT_TIME_DIFFS: 1168 1169 # If a job start if later than our current date, 1170 # that must mean the Torque server's time is later 1171 # than our local time. 1172 1173 if int(start_timestamp) > int( int(self.cur_time) + int(self.timeoffset) ): 1174 1175 self.timeoffset = int( int(start_timestamp) - int(self.cur_time) ) 1176 1177 elif status == 'Q': 1178 start_timestamp = '' 1179 count_mynodes = 0 1180 numeric_node = 1 1181 nodelist = '' 1182 1183 myAttrs = { } 1184 if name == "": 1185 myAttrs['name'] = "none" 1186 else: 1187 myAttrs['name'] = name 1188 1189 myAttrs[ 'owner' ] = owner 1190 myAttrs[ 'requested_time' ] = str(requested_time) 1191 myAttrs[ 'requested_memory' ] = str(requested_memory) 1192 myAttrs[ 'requested_cpus' ] = str(requested_cpus) 1193 myAttrs[ 'ppn' ] = str( ppn ) 1194 myAttrs[ 'status' ] = status 1195 myAttrs[ 'start_timestamp' ] = str(start_timestamp) 1196 myAttrs[ 'queue' ] = str(queue) 1197 myAttrs[ 'queued_timestamp' ] = str(queued_timestamp) 1198 myAttrs[ 'reported' ] = str( int( int( self.cur_time ) + int( self.timeoffset ) ) ) 1199 myAttrs[ 'nodes' ] = do_nodelist( nodelist ) 1200 myAttrs[ 'domain' ] = fqdn_parts( socket.getfqdn() )[1] 1201 myAttrs[ 'poll_interval' ] = str(BATCH_POLL_INTERVAL) 1202 1203 if self.jobDataChanged( jobs, job_id, myAttrs ) and myAttrs['status'] in [ 'R', 'Q' ]: 1204 jobs[ job_id ] = myAttrs 1205 1206 #debug_msg( 10, printTime() + ' job %s state changed' %(job_id) ) 1207 1208 for id, attrs in jobs.items(): 1209 if id not in jobs_processed: 1210 # This one isn't there anymore; toedeledoki! 1211 # 1212 del jobs[ id ] 1213 self.jobs=jobs 1214 1215 1056 1216 class PbsDataGatherer( DataGatherer ): 1057 1217 1058 1218 """This is the DataGatherer for PBS and Torque""" 1059 1219 1060 global PBSQuery 1220 global PBSQuery, PBSError 1061 1221 1062 1222 def __init__( self ): … … 1391 1551 """Application start""" 1392 1552 1393 global PBSQuery, PBSError 1553 global PBSQuery, PBSError, lsfObject 1394 1554 global SYSLOG_FACILITY, USE_SYSLOG, BATCH_API, DAEMONIZE 1395 1555 … … 1416 1576 1417 1577 # Tested with SGE 6.0u11. 1418 # debug_msg( 0, "FATAL ERROR: BATCH_API 'sge' implementation is currently broken, check future releases" ) 1419 1420 # sys.exit( 1 ) 1421 1578 # 1422 1579 gather = SgeDataGatherer() 1580 1581 elif BATCH_API == 'lsf': 1582 1583 try: 1584 from lsfObject import lsfObject 1585 except: 1586 debug_msg(0, "fatal error: BATCH_API set to 'lsf' but python module is not found or installed") 1587 sys.exit( 1) 1588 1589 gather = LsfDataGatherer() 1423 1590 1424 1591 else: … … 1430 1597 1431 1598 syslog.openlog( 'jobmond', syslog.LOG_NOWAIT, SYSLOG_FACILITY ) 1432 1433 1599 1434 1600 if DAEMONIZE:
Note: See TracChangeset
for help on using the changeset viewer.