001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019package org.nuxeo.ecm.core.bulk.action.computation;
020
021import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
022
023import java.io.Serializable;
024import java.time.Instant;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.TimeUnit;
028
029import javax.security.auth.login.LoginContext;
030import javax.security.auth.login.LoginException;
031
032import org.apache.commons.collections4.map.PassiveExpiringMap;
033import org.apache.logging.log4j.LogManager;
034import org.apache.logging.log4j.Logger;
035import org.nuxeo.ecm.core.api.CloseableCoreSession;
036import org.nuxeo.ecm.core.api.CoreInstance;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModelList;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
041import org.nuxeo.ecm.core.bulk.BulkCodecs;
042import org.nuxeo.ecm.core.bulk.BulkService;
043import org.nuxeo.ecm.core.bulk.message.BulkBucket;
044import org.nuxeo.ecm.core.bulk.message.BulkCommand;
045import org.nuxeo.ecm.core.bulk.message.BulkStatus;
046import org.nuxeo.lib.stream.computation.AbstractComputation;
047import org.nuxeo.lib.stream.computation.ComputationContext;
048import org.nuxeo.lib.stream.computation.Record;
049import org.nuxeo.runtime.api.Framework;
050import org.nuxeo.runtime.transaction.TransactionHelper;
051
052import com.google.common.collect.Lists;
053
054/**
055 * Base class for bulk action computation.
056 * <p>
057 * Inputs:
058 * <ul>
059 * <li>i1: Reads {@link BulkBucket}</li>
060 * </ul>
061 * Outputs for the last computation of the processor
062 * <ul>
063 * <li>o1: Writes {@link BulkStatus} delta</li>
064 * </ul>
065 *
066 * @since 10.2
067 */
068public abstract class AbstractBulkComputation extends AbstractComputation {
069
070    private static final Logger log = LogManager.getLogger(AbstractBulkComputation.class);
071
072    protected static final String SELECT_DOCUMENTS_IN = "SELECT * FROM Document, Relation WHERE ecm:uuid IN ('%s')";
073
074    protected Map<String, BulkCommand> commands = new PassiveExpiringMap<>(60, TimeUnit.SECONDS);
075
076    protected BulkCommand command;
077
078    protected BulkStatus delta;
079
080    public AbstractBulkComputation(String name) {
081        this(name, 1);
082    }
083
084    public AbstractBulkComputation(String name, int nbOutputStreams) {
085        super(name, 1, nbOutputStreams);
086    }
087
088    @Override
089    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
090        BulkBucket bucket = BulkCodecs.getBucketCodec().decode(record.getData());
091        command = getCommand(bucket.getCommandId());
092        if (command != null) {
093            delta = BulkStatus.deltaOf(command.getId());
094            delta.setProcessingStartTime(Instant.now());
095            delta.setProcessed(bucket.getIds().size());
096            startBucket(record.getKey());
097            for (List<String> batch : Lists.partition(bucket.getIds(), command.getBatchSize())) {
098                processBatchOfDocuments(batch);
099            }
100            delta.setProcessingEndTime(Instant.now());
101            endBucket(context, delta);
102            context.askForCheckpoint();
103        } else {
104            if (isAbortedCommand(bucket.getCommandId())) {
105                log.debug("Skipping aborted command: {}", bucket.getCommandId());
106                context.askForCheckpoint();
107            } else {
108                // this requires a manual intervention, the kv store might have been lost
109                log.error("Stopping processing, unknown command: {}, offset: {}, record: {}.",
110                        bucket.getCommandId(), context.getLastOffset(), record);
111                context.askForTermination();
112            }
113        }
114    }
115
116    protected boolean isAbortedCommand(String commandId) {
117        BulkService bulkService = Framework.getService(BulkService.class);
118        BulkStatus status = bulkService.getStatus(commandId);
119        return ABORTED.equals(status.getState());
120    }
121
122    protected BulkCommand getCommand(String commandId) {
123        // This is to remove expired/completed commands from the cache map
124        commands.size();
125        return commands.computeIfAbsent(commandId, id -> Framework.getService(BulkService.class).getCommand(id));
126    }
127
128    public BulkCommand getCurrentCommand() {
129        return command;
130    }
131
132    protected void processBatchOfDocuments(List<String> batch) {
133        if (batch == null || batch.isEmpty()) {
134            return;
135        }
136        TransactionHelper.runInTransaction(() -> {
137            try {
138                LoginContext loginContext = Framework.loginAsUser(command.getUsername());
139                String repository = command.getRepository();
140                try (CloseableCoreSession session = CoreInstance.openCoreSession(repository)) {
141                    compute(session, batch, command.getParams());
142                } finally {
143                    loginContext.logout();
144                }
145            } catch (LoginException e) {
146                throw new NuxeoException(e);
147            }
148        });
149    }
150
151    /**
152     * Can be overridden to init stuff before processing the bucket
153     */
154    public void startBucket(String bucketKey) {
155        // nothing to do
156    }
157
158    /**
159     * Can be overridden to write to downstream computation or add results to status
160     */
161    public void endBucket(ComputationContext context, BulkStatus delta) {
162        updateStatus(context, delta);
163    }
164
165    @Override
166    public void processFailure(ComputationContext context, Throwable failure) {
167        log.error(String.format("Action: %s fails on record: %s after retries.", metadata.name(),
168                context.getLastOffset()), failure);
169        // The delta will be send only if the policy is set with continueOnFailure = true
170        delta.inError(metadata.name() + " fails on " + context.getLastOffset() + ": " + failure.getMessage());
171        endBucket(context, delta);
172    }
173
174    public static void updateStatus(ComputationContext context, BulkStatus delta) {
175        context.produceRecord(OUTPUT_1, delta.getId(), BulkCodecs.getStatusCodec().encode(delta));
176    }
177
178    protected abstract void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties);
179
180    /**
181     * Helper to load a list of documents. Documents without read access or that does not exists are not returned.
182     */
183    public DocumentModelList loadDocuments(CoreSession session, List<String> documentIds) {
184        if (documentIds == null || documentIds.isEmpty()) {
185            return new DocumentModelListImpl(0);
186        }
187        return session.query(String.format(SELECT_DOCUMENTS_IN, String.join("', '", documentIds)));
188    }
189}