001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019
020package org.nuxeo.ecm.core.bulk.computation;
021
022import static java.lang.Math.min;
023import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
024import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
025import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED;
026import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.RUNNING;
027import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCROLLING_RUNNING;
028
029import java.time.Duration;
030import java.time.Instant;
031import java.util.ArrayList;
032import java.util.List;
033
034import org.apache.logging.log4j.LogManager;
035import org.apache.logging.log4j.Logger;
036import org.nuxeo.ecm.core.api.DocumentNotFoundException;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.ecm.core.api.scroll.Scroll;
039import org.nuxeo.ecm.core.api.scroll.ScrollRequest;
040import org.nuxeo.ecm.core.api.scroll.ScrollService;
041import org.nuxeo.ecm.core.bulk.BulkAdminService;
042import org.nuxeo.ecm.core.bulk.BulkCodecs;
043import org.nuxeo.ecm.core.bulk.BulkService;
044import org.nuxeo.ecm.core.bulk.message.BulkBucket;
045import org.nuxeo.ecm.core.bulk.message.BulkCommand;
046import org.nuxeo.ecm.core.bulk.message.BulkStatus;
047import org.nuxeo.ecm.core.query.QueryParseException;
048import org.nuxeo.ecm.core.scroll.DocumentScrollRequest;
049import org.nuxeo.ecm.core.scroll.EmptyScrollRequest;
050import org.nuxeo.ecm.core.scroll.GenericScrollRequest;
051import org.nuxeo.lib.stream.computation.AbstractComputation;
052import org.nuxeo.lib.stream.computation.ComputationContext;
053import org.nuxeo.lib.stream.computation.Record;
054import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
055import org.nuxeo.runtime.api.Framework;
056import org.nuxeo.runtime.transaction.TransactionHelper;
057import org.nuxeo.runtime.transaction.TransactionRuntimeException;
058
059/**
060 * Materializes the document set for a command if scroller is not external.
061 * <p>
062 * Inputs:
063 * <ul>
064 * <li>i1: Reads a stream of {@link BulkCommand} sharded by action</li>
065 * </ul>
066 * <p>
067 * Outputs:
068 * <ul>
069 * <li>- "actionName": Writes {@link BulkBucket} into the action stream</li>
 * <li>- "status": Writes {@link BulkStatus} into the status stream</li>
071 * </ul>
072 *
073 * @since 10.2
074 */
075public class BulkScrollerComputation extends AbstractComputation {
076
    private static final Logger log = LogManager.getLogger(BulkScrollerComputation.class);

    /** Upper bound applied to both the bucket size and the scroll size when a command asks for a bigger bucket. */
    public static final int MAX_SCROLL_SIZE = 4_000;

    /** Default scroll size, also used as the initial capacity of {@link #documentIds}. */
    protected final int scrollBatchSize;

    /** Timeout, in seconds, handed to the document scroll request. */
    protected final int scrollKeepAliveSeconds;

    /** Buffer of scrolled document ids that have not been sent in a bucket yet. */
    protected final List<String> documentIds;

    /** When {@code true}, buckets are always produced immediately instead of being kept in the context. */
    protected final boolean produceImmediate;

    // @since 11.4
    // when positive, buckets are produced immediately once the document count exceeds this value
    protected final long produceImmediateThreshold;

    /** Timeout, in seconds, of the transaction started to process a record. */
    protected final int transactionTimeoutSeconds;

    /** Scroll size for the command being processed, set by {@link #getCommandConfiguration(BulkCommand)}. */
    protected int scrollSize;

    /** Bucket size for the command being processed, set by {@link #getCommandConfiguration(BulkCommand)}. */
    protected int bucketSize;

    /** Action stream for the command being processed, set by {@link #getCommandConfiguration(BulkCommand)}. */
    protected String actionStream;
099
    /**
     * Creates a builder to configure a new {@link BulkScrollerComputation}.
     *
     * @param name the computation name
     * @param nbOutputStreams the number of output streams of the computation
     * @return a new {@link Builder}
     */
    public static Builder builder(String name, int nbOutputStreams) {
        return new Builder(name, nbOutputStreams);
    }
103
104    protected BulkScrollerComputation(Builder builder) {
105        super(builder.name, 1, builder.nbOutputStreams);
106        this.scrollBatchSize = builder.scrollBatchSize;
107        this.scrollKeepAliveSeconds = builder.scrollKeepAliveSeconds;
108        this.produceImmediate = builder.produceImmediate;
109        this.produceImmediateThreshold = builder.produceImmediateThreshold;
110        this.transactionTimeoutSeconds = Math.toIntExact(builder.transactionTimeout.toSeconds());
111        documentIds = new ArrayList<>(scrollBatchSize);
112    }
113
    /**
     * Creates a computation without setting an explicit transaction timeout.
     *
     * @param name the computation name
     * @param nbOutputStreams the number of registered bulk action streams
     * @param scrollBatchSize the batch size to scroll
     * @param scrollKeepAliveSeconds the scroll lifetime
     * @param produceImmediate whether or not the record should be produced immediately while scrolling
     */
    public BulkScrollerComputation(String name, int nbOutputStreams, int scrollBatchSize, int scrollKeepAliveSeconds,
            boolean produceImmediate) {
        this(builder(name, nbOutputStreams).setScrollBatchSize(scrollBatchSize)
                                           .setScrollKeepAliveSeconds(scrollKeepAliveSeconds)
                                           .setProduceImmediate(produceImmediate));
    }
127
    /**
     * Creates a computation with an explicit transaction timeout.
     *
     * @param name the computation name
     * @param nbOutputStreams the number of registered bulk action streams
     * @param scrollBatchSize the batch size to scroll
     * @param scrollKeepAliveSeconds the scroll lifetime
     * @param transactionTimeout the timeout of the transaction started to process a record
     * @param produceImmediate whether or not the record should be produced immediately while scrolling
     * @since 11.2
     */
    public BulkScrollerComputation(String name, int nbOutputStreams, int scrollBatchSize, int scrollKeepAliveSeconds,
            Duration transactionTimeout, boolean produceImmediate) {
        this(builder(name, nbOutputStreams).setScrollBatchSize(scrollBatchSize)
                                           .setScrollKeepAliveSeconds(scrollKeepAliveSeconds)
                                           .setProduceImmediate(produceImmediate)
                                           .setTransactionTimeout(transactionTimeout));
    }
136
137    @Override
138    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
139        boolean newTransaction = true;
140        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
141            newTransaction = false;
142            log.warn("Already inside a transaction, timeout cannot be applied, record: " + record,
143                    new Throwable("stack"));
144        } else if (!TransactionHelper.startTransaction(transactionTimeoutSeconds)) {
145            throw new TransactionRuntimeException("Cannot start transaction");
146        }
147        try {
148            processRecord(context, record);
149        } finally {
150            if (newTransaction) {
151                // Always rollback because we don't write anything
152                TransactionHelper.setTransactionRollbackOnly();
153                TransactionHelper.commitOrRollbackTransaction();
154            }
155        }
156    }
157
158    protected void processRecord(ComputationContext context, Record record) {
159        BulkCommand command = null;
160        String commandId = null;
161        try {
162            command = BulkCodecs.getCommandCodec().decode(record.getData());
163            commandId = command.getId();
164            getCommandConfiguration(command);
165            updateStatusAsScrolling(context, commandId);
166
167            long documentCount = 0;
168            long bucketNumber = 1;
169            final long queryLimit = getQueryLimit(command);
170            boolean limitReached = false;
171            scrollLoop: try (Scroll scroll = buildScroll(command)) {
172                while (scroll.hasNext()) {
173                    if (isAbortedCommand(commandId)) {
174                        log.debug("Skipping aborted command: {}", commandId);
175                        context.askForCheckpoint();
176                        return;
177                    }
178                    List<String> docIds = scroll.next();
179                    log.debug("docIds: {}", docIds);
180                    int scrollCount = docIds.size();
181                    if (documentCount + scrollCount < queryLimit) {
182                        documentIds.addAll(docIds);
183                    } else {
184                        scrollCount = Math.toIntExact(queryLimit - documentCount);
185                        documentIds.addAll(docIds.subList(0, scrollCount));
186                        limitReached = true;
187                    }
188                    while (documentIds.size() >= bucketSize) {
189                        produceBucket(context, commandId, bucketSize, bucketNumber++, documentCount);
190                    }
191                    documentCount += scrollCount;
192                    if (limitReached) {
193                        log.warn("Scroll limit {} reached for command {}", queryLimit, commandId);
194                        break scrollLoop;
195                    }
196                }
197            }
198
199            // send remaining document ids
200            // there's at most one record because we loop while scrolling
201            if (!documentIds.isEmpty()) {
202                produceBucket(context, commandId, bucketSize, bucketNumber++, documentCount);
203            }
204            // update status after scroll when we handle the scroller
205            if (!command.useExternalScroller()) {
206                updateStatusAfterScroll(context, commandId, documentCount, limitReached);
207            }
208        } catch (IllegalArgumentException | QueryParseException | DocumentNotFoundException e) {
209            log.error("Invalid query results in an empty document set: {}", command, e);
210            updateStatusAfterScroll(context, commandId, "Invalid query");
211        } catch (NuxeoException e) {
212            if (command != null) {
213                log.error("Invalid command produces an empty document set: {}", command, e);
214                updateStatusAfterScroll(context, command.getId(), "Invalid command");
215            } else {
216                log.error("Discard invalid record: {}", record, e);
217            }
218        }
219        context.askForCheckpoint();
220    }
221
222    private long getQueryLimit(BulkCommand command) {
223        Long limit = command.getQueryLimit();
224        if (limit == null || limit <= 0) {
225            return Long.MAX_VALUE;
226        }
227        return limit;
228    }
229
230    protected Scroll buildScroll(BulkCommand command) {
231        ScrollRequest request;
232        String query = command.getQuery();
233        log.debug("Build scroll with query: {}", query);
234        if (command.useExternalScroller()) {
235            request = EmptyScrollRequest.of();
236
237        } else if (command.useGenericScroller()) {
238            request = GenericScrollRequest.builder(command.getScroller(), query)
239                                          .options(command.getParams())
240                                          .size(scrollSize)
241                                          .build();
242
243        } else {
244            request = DocumentScrollRequest.builder(query)
245                                           .username(command.getUsername())
246                                           .repository(command.getRepository())
247                                           .size(scrollSize)
248                                           .timeout(Duration.ofSeconds(scrollKeepAliveSeconds))
249                                           .name(command.getScroller())
250                                           .build();
251        }
252        ScrollService service = Framework.getService(ScrollService.class);
253        return service.scroll(request);
254    }
255
256    protected void getCommandConfiguration(BulkCommand command) {
257        BulkAdminService actionService = Framework.getService(BulkAdminService.class);
258        bucketSize = command.getBucketSize() > 0 ? command.getBucketSize()
259                : actionService.getBucketSize(command.getAction());
260        scrollSize = scrollBatchSize;
261        if (bucketSize > scrollSize) {
262            if (bucketSize <= MAX_SCROLL_SIZE) {
263                scrollSize = bucketSize;
264            } else {
265                log.warn("Bucket size: {} too big for command: {}, reduce to: {}", bucketSize, command,
266                        MAX_SCROLL_SIZE);
267                scrollSize = bucketSize = MAX_SCROLL_SIZE;
268            }
269        }
270        actionStream = actionService.getInputStream(command.getAction());
271    }
272
273    protected boolean isAbortedCommand(String commandId) {
274        BulkService bulkService = Framework.getService(BulkService.class);
275        BulkStatus status = bulkService.getStatus(commandId);
276        return ABORTED.equals(status.getState());
277    }
278
279    protected void updateStatusAsScrolling(ComputationContext context, String commandId) {
280        BulkStatus delta = BulkStatus.deltaOf(commandId);
281        delta.setState(SCROLLING_RUNNING);
282        delta.setScrollStartTime(Instant.now());
283        ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId,
284                BulkCodecs.getStatusCodec().encode(delta));
285    }
286
    /**
     * Produces the status of a command whose scroll failed: no document and the given error message.
     *
     * @param context the computation context
     * @param commandId the command id
     * @param errorMessage the error message to report on the status
     */
    protected void updateStatusAfterScroll(ComputationContext context, String commandId, String errorMessage) {
        updateStatusAfterScroll(context, commandId, 0, errorMessage, false);
    }
290
    /**
     * Produces the status of a command whose scroll ended without error.
     *
     * @param context the computation context
     * @param commandId the command id
     * @param documentCount the number of documents scrolled
     * @param limited whether the query limit has been reached
     */
    protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount,
            boolean limited) {
        updateStatusAfterScroll(context, commandId, documentCount, null, limited);
    }
295
296    protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount,
297            String errorMessage, boolean limited) {
298        BulkStatus delta = BulkStatus.deltaOf(commandId);
299        if (errorMessage != null) {
300            delta.inError(errorMessage);
301        }
302        if (documentCount == 0) {
303            delta.setState(COMPLETED);
304            delta.setCompletedTime(Instant.now());
305        } else {
306            delta.setState(RUNNING);
307        }
308        delta.setScrollEndTime(Instant.now());
309        delta.setTotal(documentCount);
310        delta.setQueryLimitReached(limited);
311        ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId,
312                BulkCodecs.getStatusCodec().encode(delta));
313    }
314
315    /**
316     * Produces a bucket as a record to appropriate bulk action stream.
317     */
318    protected void produceBucket(ComputationContext context, String commandId, int bucketSize, long bucketNumber,
319            long documentCount) {
320        List<String> ids = documentIds.subList(0, min(bucketSize, documentIds.size()));
321        BulkBucket bucket = new BulkBucket(commandId, ids);
322        String key = commandId + ":" + Long.toString(bucketNumber);
323        Record record = Record.of(key, BulkCodecs.getBucketCodec().encode(bucket));
324        if (produceImmediate || (produceImmediateThreshold > 0 && documentCount > produceImmediateThreshold)) {
325            ComputationContextImpl contextImpl = (ComputationContextImpl) context;
326            if (!contextImpl.getRecords(actionStream).isEmpty()) {
327                flushRecords(contextImpl, commandId);
328            }
329            contextImpl.produceRecordImmediate(actionStream, record);
330        } else {
331            context.produceRecord(actionStream, record);
332        }
333        ids.clear(); // this clear the documentIds part that has been sent
334    }
335
336    protected void flushRecords(ComputationContextImpl contextImpl, String commandId) {
337        log.warn("Scroller records threshold reached ({}) for action: {} on command: {}, flushing records downstream",
338                produceImmediateThreshold, actionStream, commandId);
339        contextImpl.getRecords(actionStream)
340                   .forEach(record -> contextImpl.produceRecordImmediate(actionStream, record));
341        contextImpl.getRecords(actionStream).clear();
342    }
343
344    /**
345     * @since 11.4
346     */
347    public static class Builder {
348        protected String name;
349
350        protected int nbOutputStreams;
351
352        protected int scrollBatchSize;
353
354        protected int scrollKeepAliveSeconds;
355
356        protected boolean produceImmediate;
357
358        protected int produceImmediateThreshold;
359
360        protected Duration transactionTimeout;
361
362        protected long queryLimit;
363
364        /**
365         * @param name the computation name
366         * @param nbOutputStream the number of registered bulk action streams
367         */
368        public Builder(String name, int nbOutputStream) {
369            this.name = name;
370            this.nbOutputStreams = nbOutputStream;
371        }
372
373        /**
374         * @param scrollBatchSize the batch size to scroll
375         */
376        public Builder setScrollBatchSize(int scrollBatchSize) {
377            this.scrollBatchSize = scrollBatchSize;
378            return this;
379        }
380
381        /**
382         * @param scrollKeepAliveSeconds the scroll lifetime between fetch
383         */
384        public Builder setScrollKeepAliveSeconds(int scrollKeepAliveSeconds) {
385            this.scrollKeepAliveSeconds = scrollKeepAliveSeconds;
386            return this;
387        }
388
389        /**
390         * @param produceImmediate whether or not the record should be produced immediately while scrolling
391         */
392        public Builder setProduceImmediate(boolean produceImmediate) {
393            this.produceImmediate = produceImmediate;
394            return this;
395        }
396
397        /**
398         * @param produceImmediateThreshold produce record immediately after the threshold to prevent OOM
399         */
400        public Builder setProduceImmediateThreshold(int produceImmediateThreshold) {
401            this.produceImmediateThreshold = produceImmediateThreshold;
402            return this;
403        }
404
405        /**
406         * @param transactionTimeout set an explicit transaction timeout for the scroll
407         */
408        public Builder setTransactionTimeout(Duration transactionTimeout) {
409            this.transactionTimeout = transactionTimeout;
410            return this;
411        }
412
413        public BulkScrollerComputation build() {
414            return new BulkScrollerComputation(this);
415        }
416    }
417}