001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.workmanager; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.event.EventServiceComponent; 024import org.nuxeo.ecm.core.work.WorkManagerImpl; 025import org.nuxeo.ecm.core.work.WorkQueueRegistry; 026import org.nuxeo.ecm.core.work.api.Work; 027import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 028import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 029import org.nuxeo.ecm.platform.importer.mqueues.computation.Record; 030import org.nuxeo.ecm.platform.importer.mqueues.computation.Settings; 031import org.nuxeo.ecm.platform.importer.mqueues.computation.Topology; 032import org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue.MQComputationManager; 033import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender; 034import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager; 035import org.nuxeo.runtime.api.Framework; 036import org.nuxeo.runtime.model.ComponentContext; 037import org.nuxeo.runtime.transaction.TransactionHelper; 038 039import java.lang.reflect.Field; 040import java.time.Duration; 041import java.util.Collections; 042import java.util.HashSet; 043import java.util.List; 044import java.util.Set; 045import java.util.concurrent.TimeUnit; 046 047import javax.naming.NamingException; 048import javax.transaction.RollbackException; 049import javax.transaction.Status; 050import javax.transaction.Synchronization; 051import javax.transaction.SystemException; 052import javax.transaction.Transaction; 053import javax.transaction.TransactionManager; 054 055 056/** 057 * @since 9.2 058 */ 059public abstract class WorkManagerComputation extends WorkManagerImpl { 060 protected static final Log log = LogFactory.getLog(WorkManagerComputation.class); 061 protected static final int DEFAULT_CONCURRENCY = 4; 062 protected Topology topology; 063 protected Settings settings; 064 protected MQComputationManager manager; 065 protected MQManager<Record> mqManager; 066 protected final Set<String> streamIds = new HashSet<>(); 067 068 protected abstract MQManager<Record> initStream(); 069 070 protected abstract int getOverProvisioningFactor(); 071 072 public class WorkScheduling implements Synchronization { 073 public final Work work; 074 public final Scheduling scheduling; 075 076 077 public WorkScheduling(Work work, Scheduling scheduling) { 078 this.work = work; 079 this.scheduling = scheduling; 080 } 081 082 public void beforeCompletion() { 083 } 084 085 public void afterCompletion(int status) { 086 if (status == 3) { 087 WorkManagerComputation.this.schedule(this.work, this.scheduling, false); 088 } else { 089 if (status != 4) { 090 throw new IllegalArgumentException("Unsupported transaction status " + status); 091 } 092 } 093 094 } 095 } 096 097 @Override 098 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 099 String queueId = getStreamForCategory(work.getCategory()); 100 if (log.isDebugEnabled()) { 101 log.debug(String.format("Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", 102 work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work)); 103 } 104 if (!isQueuingEnabled(queueId)) { 105 log.info("Queue disabled, scheduling canceled: " + queueId); 106 return; 107 } 108 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 109 return; 110 } 111 WorkSchedulePath.newInstance(work); 112 // TODO: take in account the scheduling state when possible 113 114 // TODO: may be choose a key with a transaction id so all jobs from the same tx are ordered ? 115 String key = work.getId(); 116 MQAppender<Record> appender = mqManager.getAppender(getStreamForCategory(work.getCategory())); 117 if (appender == null) { 118 log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), 119 getStreamForCategory(work.getCategory()))); 120 return; 121 } 122 appender.append(key, Record.of(key, ComputationWork.serialize(work))); 123 } 124 125 public String getStreamForCategory(String category) { 126 if (category != null && streamIds.contains(category)) { 127 return category; 128 } 129 return "default"; 130 } 131 132 @Override 133 public int getApplicationStartedOrder() { 134 // start before the WorkManagerImpl 135 return EventServiceComponent.APPLICATION_STARTED_ORDER - 2; 136 } 137 138 @Override 139 public void start(ComponentContext context) { 140 init(); 141 } 142 143 public void init() { 144 if (started) { 145 return; 146 } 147 log.debug("Initializing"); 148 synchronized (this) { 149 if (started) { 150 return; 151 } 152 supplantWorkManagerImpl(); 153 initTopology(); 154 this.mqManager = initStream(); 155 startComputation(); 156 started = true; 157 log.info("Initialized"); 158 } 159 } 160 161 /** 162 * Hack to steal the WorkManagerImpl queue contributions. 163 */ 164 protected void supplantWorkManagerImpl() { 165 WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service"); 166 Class clazz = WorkManagerImpl.class; 167 Field protectedField; 168 try { 169 protectedField = clazz.getDeclaredField("workQueueConfig"); 170 } catch (NoSuchFieldException e) { 171 throw new RuntimeException(e); 172 } 173 protectedField.setAccessible(true); 174 final WorkQueueRegistry wqr; 175 try { 176 wqr = (WorkQueueRegistry) protectedField.get(wmi); 177 log.debug("Remove contributions from WorkManagerImpl"); 178 // Removes the WorkManagerImpl so it does not create any worker pool 179 protectedField.set(wmi, new WorkQueueRegistry()); 180 // TODO: should we remove workQueuingConfig registry as well ? 181 } catch (IllegalAccessException e) { 182 throw new RuntimeException(e); 183 } 184 wqr.getQueueIds().forEach(id -> workQueueConfig.addContribution(wqr.get(id))); 185 streamIds.addAll(workQueueConfig.getQueueIds()); 186 workQueueConfig.getQueueIds().forEach(id -> log.info("Registering : " + id)); 187 } 188 189 190 protected void startComputation() { 191 this.manager = new MQComputationManager(mqManager, topology, settings); 192 manager.start(); 193 } 194 195 protected void initTopology() { 196 Topology.Builder builder = Topology.builder(); 197 workQueueConfig.getQueueIds().forEach(item -> builder.addComputation(() -> new ComputationWork(item), Collections.singletonList("i1:" + item))); 198 this.topology = builder.build(); 199 this.settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY)); 200 workQueueConfig.getQueueIds().forEach(item -> settings.setConcurrency(item, workQueueConfig.get(item).getMaxThreads())); 201 workQueueConfig.getQueueIds().forEach(item -> settings.setPartitions(item, getPartitions(workQueueConfig.get(item).getMaxThreads()))); 202 } 203 204 protected int getPartitions(int maxThreads) { 205 if (maxThreads == 1) { 206 // when the pool size is 1, we don't want any concurrency 207 return 1; 208 } 209 return getOverProvisioningFactor() * maxThreads; 210 } 211 212 @Override 213 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 214 log.info("Shutdown WorkManager stream: " + queueId); 215 // TODO: decide what to do ? 216 return false; 217 } 218 219 @Override 220 public boolean shutdown(long timeout, TimeUnit timeUnit) throws InterruptedException { 221 log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms"); 222 boolean ret = manager.stop(Duration.ofMillis(timeUnit.toMillis(timeout))); 223 try { 224 mqManager.close(); 225 } catch (Exception e) { 226 log.error("Error while closing WorkManager mqManager", e); 227 } 228 return ret; 229 } 230 231 @Override 232 public int getQueueSize(String queueId, Work.State state) { 233 return 0; 234 } 235 236 @Override 237 public WorkQueueMetrics getMetrics(String queueId) { 238 // TODO: find a way to expose some known metrics 239 return new WorkQueueMetrics(queueId, 0, 0, 0, 0); 240 } 241 242 @Override 243 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 244 if (!isStarted()) { 245 return true; 246 } 247 // wait that the low watermark get stable 248 long durationMs = Math.min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 249 long deadline = System.currentTimeMillis() + durationMs; 250 long lowWatermark = getLowWaterMark(queueId); 251 while (System.currentTimeMillis() < deadline) { 252 Thread.sleep(100); 253 long wm = getLowWaterMark(queueId); 254 if (wm == lowWatermark) { 255 log.debug("awaitCompletion for " + ((queueId == null) ? "all" : queueId) + " completed " + wm); 256 return true; 257 } 258 if (log.isDebugEnabled()) { 259 log.debug("awaitCompletion low wm for " + ((queueId == null) ? "all" : queueId) + ":" + wm + " diff: " + (wm - lowWatermark)); 260 } 261 lowWatermark = wm; 262 } 263 log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0)); 264 return false; 265 } 266 267 private long getLowWaterMark(String queueId) { 268 if (queueId != null) { 269 return manager.getLowWatermark(queueId); 270 } 271 return manager.getLowWatermark(); 272 } 273 274 @Override 275 public Work.State getWorkState(String s) { 276 // always not found 277 return null; 278 } 279 280 @Override 281 public Work find(String s, Work.State state) { 282 // always not found 283 return null; 284 } 285 286 @Override 287 public List<Work> listWork(String s, Work.State state) { 288 return Collections.emptyList(); 289 } 290 291 @Override 292 public List<String> listWorkIds(String s, Work.State state) { 293 return Collections.emptyList(); 294 } 295 296 297 @Override 298 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 299 TransactionManager transactionManager; 300 try { 301 transactionManager = TransactionHelper.lookupTransactionManager(); 302 } catch (NamingException e) { 303 transactionManager = null; 304 } 305 if (transactionManager == null) { 306 log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId()); 307 return false; 308 } 309 try { 310 Transaction transaction = transactionManager.getTransaction(); 311 if (transaction == null) { 312 if (log.isDebugEnabled()) { 313 log.debug("Not scheduled work after commit because of missing transaction: " + work.getId()); 314 } 315 return false; 316 } 317 int status = transaction.getStatus(); 318 if (status == Status.STATUS_ACTIVE) { 319 if (log.isDebugEnabled()) { 320 log.debug("Scheduled after commit: " + work.getId()); 321 } 322 transaction.registerSynchronization(new WorkManagerComputation.WorkScheduling(work, scheduling)); 323 return true; 324 } else if (status == Status.STATUS_COMMITTED) { 325 // called in afterCompletion, we can schedule immediately 326 if (log.isDebugEnabled()) { 327 log.debug("Scheduled immediately: " + work.getId()); 328 } 329 return false; 330 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 331 if (log.isDebugEnabled()) { 332 log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId()); 333 } 334 return true; 335 } else { 336 if (log.isDebugEnabled()) { 337 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 338 + work.getId()); 339 } 340 return false; 341 } 342 } catch (SystemException | RollbackException e) { 343 log.error("Cannot schedule after commit", e); 344 return false; 345 } 346 } 347 348}