001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019
020package org.nuxeo.ecm.core.bulk.computation;
021
022import static java.lang.Math.min;
023import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
024import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
025import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED;
026import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.RUNNING;
027import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCROLLING_RUNNING;
028
029import java.time.Instant;
030import java.util.ArrayList;
031import java.util.List;
032
033import javax.security.auth.login.LoginContext;
034import javax.security.auth.login.LoginException;
035
036import org.apache.logging.log4j.LogManager;
037import org.apache.logging.log4j.Logger;
038import org.nuxeo.ecm.core.api.CloseableCoreSession;
039import org.nuxeo.ecm.core.api.CoreInstance;
040import org.nuxeo.ecm.core.api.DocumentNotFoundException;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.api.ScrollResult;
043import org.nuxeo.ecm.core.bulk.BulkAdminService;
044import org.nuxeo.ecm.core.bulk.BulkCodecs;
045import org.nuxeo.ecm.core.bulk.BulkService;
046import org.nuxeo.ecm.core.bulk.message.BulkBucket;
047import org.nuxeo.ecm.core.bulk.message.BulkCommand;
048import org.nuxeo.ecm.core.bulk.message.BulkStatus;
049import org.nuxeo.ecm.core.query.QueryParseException;
050import org.nuxeo.lib.stream.computation.AbstractComputation;
051import org.nuxeo.lib.stream.computation.ComputationContext;
052import org.nuxeo.lib.stream.computation.Record;
053import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.transaction.TransactionHelper;
056
057/**
058 * Materializes the document set for a command.
059 * <p>
060 * Inputs:
061 * <ul>
062 * <li>i1: Reads a stream of {@link BulkCommand} sharded by action</li>
063 * </ul>
064 * <p>
065 * Outputs:
066 * <ul>
067 * <li>- "actionName": Writes {@link BulkBucket} into the action stream</li>
 * <li>- "status": Writes {@link BulkStatus} into the status stream</li>
069 * </ul>
070 *
071 * @since 10.2
072 */
073public class BulkScrollerComputation extends AbstractComputation {
074
075    private static final Logger log = LogManager.getLogger(BulkScrollerComputation.class);
076
077    public static final int MAX_SCROLL_SIZE = 4_000;
078
079    protected final int scrollBatchSize;
080
081    protected final int scrollKeepAliveSeconds;
082
083    protected final List<String> documentIds;
084
085    private final boolean produceImmediate;
086
087    /**
088     * @param name the computation name
089     * @param nbOutputStreams the number of registered bulk action streams
090     * @param scrollBatchSize the batch size to scroll
091     * @param scrollKeepAliveSeconds the scroll lifetime
092     * @param produceImmediate whether or not the record should be produced immedialitely while scrolling
093     */
094    public BulkScrollerComputation(String name, int nbOutputStreams, int scrollBatchSize, int scrollKeepAliveSeconds,
095            boolean produceImmediate) {
096        super(name, 1, nbOutputStreams);
097        this.scrollBatchSize = scrollBatchSize;
098        this.scrollKeepAliveSeconds = scrollKeepAliveSeconds;
099        this.produceImmediate = produceImmediate;
100        documentIds = new ArrayList<>(scrollBatchSize);
101    }
102
103    @Override
104    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
105        TransactionHelper.runInTransaction(() -> processRecord(context, record));
106    }
107
108    protected void processRecord(ComputationContext context, Record record) {
109        BulkCommand command = null;
110        try {
111            command = BulkCodecs.getCommandCodec().decode(record.getData());
112            String commandId = command.getId();
113            int bucketSize = command.getBucketSize() > 0 ? command.getBucketSize()
114                    : Framework.getService(BulkAdminService.class).getBucketSize(command.getAction());
115            int scrollSize = scrollBatchSize;
116            if (bucketSize > scrollSize) {
117                if (bucketSize <= MAX_SCROLL_SIZE) {
118                    scrollSize = bucketSize;
119                } else {
120                    log.warn("Bucket size: %d too big for command: %s, reduce to: %d", bucketSize, command,
121                            MAX_SCROLL_SIZE);
122                    scrollSize = bucketSize = MAX_SCROLL_SIZE;
123                }
124            }
125            updateStatusAsScrolling(context, commandId);
126            LoginContext loginContext = Framework.loginAsUser(command.getUsername());
127            try (CloseableCoreSession session = CoreInstance.openCoreSession(command.getRepository())) {
128                // scroll documents
129                ScrollResult<String> scroll = session.scroll(command.getQuery(), scrollSize, scrollKeepAliveSeconds);
130                long documentCount = 0;
131                long bucketNumber = 1;
132                while (scroll.hasResults()) {
133                    if (isAbortedCommand(commandId)) {
134                        log.debug("Skipping aborted command: {}", commandId);
135                        context.askForCheckpoint();
136                        return;
137                    }
138                    List<String> docIds = scroll.getResults();
139                    documentIds.addAll(docIds);
140                    while (documentIds.size() >= bucketSize) {
141                        produceBucket(context, command.getAction(), commandId, bucketSize, bucketNumber++);
142                    }
143
144                    documentCount += docIds.size();
145                    // next batch
146                    scroll = session.scroll(scroll.getScrollId());
147                    TransactionHelper.commitOrRollbackTransaction();
148                    TransactionHelper.startTransaction();
149                }
150                // send remaining document ids
151                // there's at most one record because we loop while scrolling
152                if (!documentIds.isEmpty()) {
153                    produceBucket(context, command.getAction(), commandId, bucketSize, bucketNumber++);
154                }
155                updateStatusAfterScroll(context, commandId, documentCount);
156            } catch (IllegalArgumentException | QueryParseException | DocumentNotFoundException e) {
157                log.error("Invalid query results in an empty document set: {}", command, e);
158                updateStatusAfterScroll(context, commandId, "Invalid query");
159            } finally {
160                loginContext.logout();
161            }
162        } catch (NuxeoException | LoginException e) {
163            if (command != null) {
164                log.error("Invalid command produces an empty document set: {}", command, e);
165                updateStatusAfterScroll(context, command.getId(), "Invalid command");
166            } else {
167                log.error("Discard invalid record: {}", record, e);
168            }
169
170        }
171        context.askForCheckpoint();
172    }
173
174    protected boolean isAbortedCommand(String commandId) {
175        BulkService bulkService = Framework.getService(BulkService.class);
176        BulkStatus status = bulkService.getStatus(commandId);
177        return ABORTED.equals(status.getState());
178    }
179
180    protected void updateStatusAsScrolling(ComputationContext context, String commandId) {
181        BulkStatus delta = BulkStatus.deltaOf(commandId);
182        delta.setState(SCROLLING_RUNNING);
183        delta.setScrollStartTime(Instant.now());
184        ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId,
185                BulkCodecs.getStatusCodec().encode(delta));
186    }
187
188    protected void updateStatusAfterScroll(ComputationContext context, String commandId, String errorMessage) {
189        updateStatusAfterScroll(context, commandId, 0, errorMessage);
190    }
191
192    protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount) {
193        updateStatusAfterScroll(context, commandId, documentCount, null);
194    }
195
196    protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount,
197            String errorMessage) {
198        BulkStatus delta = BulkStatus.deltaOf(commandId);
199        if (errorMessage != null) {
200            delta.inError(errorMessage);
201        }
202        if (documentCount == 0) {
203            delta.setState(COMPLETED);
204            delta.setCompletedTime(Instant.now());
205        } else {
206            delta.setState(RUNNING);
207        }
208        delta.setScrollEndTime(Instant.now());
209        delta.setTotal(documentCount);
210        ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId,
211                BulkCodecs.getStatusCodec().encode(delta));
212    }
213
214    /**
215     * Produces a bucket as a record to appropriate bulk action stream.
216     */
217    protected void produceBucket(ComputationContext context, String action, String commandId, int bucketSize,
218            long bucketNumber) {
219        List<String> ids = documentIds.subList(0, min(bucketSize, documentIds.size()));
220        BulkBucket bucket = new BulkBucket(commandId, ids);
221        String key = commandId + ":" + Long.toString(bucketNumber);
222        Record record = Record.of(key, BulkCodecs.getBucketCodec().encode(bucket));
223        if (produceImmediate) {
224            ((ComputationContextImpl) context).produceRecordImmediate(action, record);
225        } else {
226            context.produceRecord(action, record);
227        }
228        ids.clear(); // this clear the documentIds part that has been sent
229    }
230
231}