001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.workmanager;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.event.EventServiceComponent;
024import org.nuxeo.ecm.core.work.WorkManagerImpl;
025import org.nuxeo.ecm.core.work.WorkQueueRegistry;
026import org.nuxeo.ecm.core.work.api.Work;
027import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
028import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
029import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
030import org.nuxeo.ecm.platform.importer.mqueues.computation.Settings;
031import org.nuxeo.ecm.platform.importer.mqueues.computation.Topology;
032import org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue.MQComputationManager;
033import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
034import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
035import org.nuxeo.runtime.api.Framework;
036import org.nuxeo.runtime.model.ComponentContext;
037import org.nuxeo.runtime.transaction.TransactionHelper;
038
039import java.lang.reflect.Field;
040import java.time.Duration;
041import java.util.Collections;
042import java.util.HashSet;
043import java.util.List;
044import java.util.Set;
045import java.util.concurrent.TimeUnit;
046
047import javax.naming.NamingException;
048import javax.transaction.RollbackException;
049import javax.transaction.Status;
050import javax.transaction.Synchronization;
051import javax.transaction.SystemException;
052import javax.transaction.Transaction;
053import javax.transaction.TransactionManager;
054
055
056/**
057 * @since 9.2
058 */
059public abstract class WorkManagerComputation extends WorkManagerImpl {
060    protected static final Log log = LogFactory.getLog(WorkManagerComputation.class);
061    protected static final int DEFAULT_CONCURRENCY = 4;
062    protected Topology topology;
063    protected Settings settings;
064    protected MQComputationManager manager;
065    protected MQManager<Record> mqManager;
066    protected final Set<String> streamIds = new HashSet<>();
067
068    protected abstract MQManager<Record> initStream();
069
070    protected abstract int getOverProvisioningFactor();
071
072    public class WorkScheduling implements Synchronization {
073        public final Work work;
074        public final Scheduling scheduling;
075
076
077        public WorkScheduling(Work work, Scheduling scheduling) {
078            this.work = work;
079            this.scheduling = scheduling;
080        }
081
082        public void beforeCompletion() {
083        }
084
085        public void afterCompletion(int status) {
086            if (status == 3) {
087                WorkManagerComputation.this.schedule(this.work, this.scheduling, false);
088            } else {
089                if (status != 4) {
090                    throw new IllegalArgumentException("Unsupported transaction status " + status);
091                }
092            }
093
094        }
095    }
096
097    @Override
098    public void schedule(Work work, Scheduling scheduling, boolean afterCommit) {
099        String queueId = getStreamForCategory(work.getCategory());
100        if (log.isDebugEnabled()) {
101            log.debug(String.format("Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s",
102                    work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
103        }
104        if (!isQueuingEnabled(queueId)) {
105            log.info("Queue disabled, scheduling canceled: " + queueId);
106            return;
107        }
108        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
109            return;
110        }
111        WorkSchedulePath.newInstance(work);
112        // TODO: take in account the scheduling state when possible
113
114        // TODO: may be choose a key with a transaction id so all jobs from the same tx are ordered ?
115        String key = work.getId();
116        MQAppender<Record> appender = mqManager.getAppender(getStreamForCategory(work.getCategory()));
117        if (appender == null) {
118            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(),
119                    getStreamForCategory(work.getCategory())));
120            return;
121        }
122        appender.append(key, Record.of(key, ComputationWork.serialize(work)));
123    }
124
125    public String getStreamForCategory(String category) {
126        if (category != null && streamIds.contains(category)) {
127            return category;
128        }
129        return "default";
130    }
131
132    @Override
133    public int getApplicationStartedOrder() {
134        // start before the WorkManagerImpl
135        return EventServiceComponent.APPLICATION_STARTED_ORDER - 2;
136    }
137
138    @Override
139    public void start(ComponentContext context) {
140        init();
141    }
142
143    public void init() {
144        if (started) {
145            return;
146        }
147        log.debug("Initializing");
148        synchronized (this) {
149            if (started) {
150                return;
151            }
152            supplantWorkManagerImpl();
153            initTopology();
154            this.mqManager = initStream();
155            startComputation();
156            started = true;
157            log.info("Initialized");
158        }
159    }
160
161    /**
162     * Hack to steal the WorkManagerImpl queue contributions.
163     */
164    protected void supplantWorkManagerImpl() {
165        WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
166        Class clazz = WorkManagerImpl.class;
167        Field protectedField;
168        try {
169            protectedField = clazz.getDeclaredField("workQueueConfig");
170        } catch (NoSuchFieldException e) {
171            throw new RuntimeException(e);
172        }
173        protectedField.setAccessible(true);
174        final WorkQueueRegistry wqr;
175        try {
176            wqr = (WorkQueueRegistry) protectedField.get(wmi);
177            log.debug("Remove contributions from WorkManagerImpl");
178            // Removes the WorkManagerImpl so it does not create any worker pool
179            protectedField.set(wmi, new WorkQueueRegistry());
180            // TODO: should we remove workQueuingConfig registry as well ?
181        } catch (IllegalAccessException e) {
182            throw new RuntimeException(e);
183        }
184        wqr.getQueueIds().forEach(id -> workQueueConfig.addContribution(wqr.get(id)));
185        streamIds.addAll(workQueueConfig.getQueueIds());
186        workQueueConfig.getQueueIds().forEach(id -> log.info("Registering : " + id));
187    }
188
189
190    protected void startComputation() {
191        this.manager = new MQComputationManager(mqManager, topology, settings);
192        manager.start();
193    }
194
195    protected void initTopology() {
196        Topology.Builder builder = Topology.builder();
197        workQueueConfig.getQueueIds().forEach(item -> builder.addComputation(() -> new ComputationWork(item), Collections.singletonList("i1:" + item)));
198        this.topology = builder.build();
199        this.settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY));
200        workQueueConfig.getQueueIds().forEach(item -> settings.setConcurrency(item, workQueueConfig.get(item).getMaxThreads()));
201        workQueueConfig.getQueueIds().forEach(item -> settings.setPartitions(item, getPartitions(workQueueConfig.get(item).getMaxThreads())));
202    }
203
204    protected int getPartitions(int maxThreads) {
205        if (maxThreads == 1) {
206            // when the pool size is 1, we don't want any concurrency
207            return 1;
208        }
209        return getOverProvisioningFactor() * maxThreads;
210    }
211
212    @Override
213    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
214        log.info("Shutdown WorkManager stream: " + queueId);
215        // TODO: decide what to do ?
216        return false;
217    }
218
219    @Override
220    public boolean shutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
221        log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms");
222        boolean ret = manager.stop(Duration.ofMillis(timeUnit.toMillis(timeout)));
223        try {
224            mqManager.close();
225        } catch (Exception e) {
226            log.error("Error while closing WorkManager mqManager", e);
227        }
228        return ret;
229    }
230
231    @Override
232    public int getQueueSize(String queueId, Work.State state) {
233        return 0;
234    }
235
236    @Override
237    public WorkQueueMetrics getMetrics(String queueId) {
238        // TODO: find a way to expose some known metrics
239        return new WorkQueueMetrics(queueId, 0, 0, 0, 0);
240    }
241
242    @Override
243    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
244        if (!isStarted()) {
245            return true;
246        }
247        // wait that the low watermark get stable
248        long durationMs = Math.min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow
249        long deadline = System.currentTimeMillis() + durationMs;
250        long lowWatermark = getLowWaterMark(queueId);
251        while (System.currentTimeMillis() < deadline) {
252            Thread.sleep(100);
253            long wm = getLowWaterMark(queueId);
254            if (wm == lowWatermark) {
255                log.debug("awaitCompletion for " + ((queueId == null) ? "all" : queueId) + " completed " + wm);
256                return true;
257            }
258            if (log.isDebugEnabled()) {
259                log.debug("awaitCompletion low wm  for " + ((queueId == null) ? "all" : queueId) + ":" + wm + " diff: " + (wm - lowWatermark));
260            }
261            lowWatermark = wm;
262        }
263        log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0));
264        return false;
265    }
266
267    private long getLowWaterMark(String queueId) {
268        if (queueId != null) {
269            return manager.getLowWatermark(queueId);
270        }
271        return manager.getLowWatermark();
272    }
273
274    @Override
275    public Work.State getWorkState(String s) {
276        // always not found
277        return null;
278    }
279
280    @Override
281    public Work find(String s, Work.State state) {
282        // always not found
283        return null;
284    }
285
286    @Override
287    public List<Work> listWork(String s, Work.State state) {
288        return Collections.emptyList();
289    }
290
291    @Override
292    public List<String> listWorkIds(String s, Work.State state) {
293        return Collections.emptyList();
294    }
295
296
297    @Override
298    protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) {
299        TransactionManager transactionManager;
300        try {
301            transactionManager = TransactionHelper.lookupTransactionManager();
302        } catch (NamingException e) {
303            transactionManager = null;
304        }
305        if (transactionManager == null) {
306            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
307            return false;
308        }
309        try {
310            Transaction transaction = transactionManager.getTransaction();
311            if (transaction == null) {
312                if (log.isDebugEnabled()) {
313                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
314                }
315                return false;
316            }
317            int status = transaction.getStatus();
318            if (status == Status.STATUS_ACTIVE) {
319                if (log.isDebugEnabled()) {
320                    log.debug("Scheduled after commit: " + work.getId());
321                }
322                transaction.registerSynchronization(new WorkManagerComputation.WorkScheduling(work, scheduling));
323                return true;
324            } else if (status == Status.STATUS_COMMITTED) {
325                // called in afterCompletion, we can schedule immediately
326                if (log.isDebugEnabled()) {
327                    log.debug("Scheduled immediately: " + work.getId());
328                }
329                return false;
330            } else if (status == Status.STATUS_MARKED_ROLLBACK) {
331                if (log.isDebugEnabled()) {
332                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
333                }
334                return true;
335            } else {
336                if (log.isDebugEnabled()) {
337                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": "
338                            + work.getId());
339                }
340                return false;
341            }
342        } catch (SystemException | RollbackException e) {
343            log.error("Cannot schedule after commit", e);
344            return false;
345        }
346    }
347
348}