001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019package org.nuxeo.ecm.core.bulk.action.computation;
020
021import static org.nuxeo.ecm.core.api.security.SecurityConstants.SYSTEM_USERNAME;
022import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
023
024import java.io.Serializable;
025import java.time.Instant;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import javax.security.auth.login.LoginException;
031
032import org.apache.commons.collections4.map.PassiveExpiringMap;
033import org.apache.logging.log4j.LogManager;
034import org.apache.logging.log4j.Logger;
035import org.nuxeo.ecm.core.api.CoreInstance;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentModelList;
038import org.nuxeo.ecm.core.api.NuxeoException;
039import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
040import org.nuxeo.ecm.core.bulk.BulkCodecs;
041import org.nuxeo.ecm.core.bulk.BulkService;
042import org.nuxeo.ecm.core.bulk.message.BulkBucket;
043import org.nuxeo.ecm.core.bulk.message.BulkCommand;
044import org.nuxeo.ecm.core.bulk.message.BulkStatus;
045import org.nuxeo.lib.stream.computation.AbstractComputation;
046import org.nuxeo.lib.stream.computation.ComputationContext;
047import org.nuxeo.lib.stream.computation.Record;
048import org.nuxeo.runtime.api.Framework;
049import org.nuxeo.runtime.api.login.NuxeoLoginContext;
050import org.nuxeo.runtime.transaction.TransactionHelper;
051
052import com.google.common.collect.Lists;
053
054/**
055 * Base class for bulk action computation.
056 * <p>
057 * Inputs:
058 * <ul>
059 * <li>i1: Reads {@link BulkBucket}</li>
060 * </ul>
061 * Outputs for the last computation of the processor
062 * <ul>
063 * <li>o1: Writes {@link BulkStatus} delta</li>
064 * </ul>
065 *
066 * @since 10.2
067 */
068public abstract class AbstractBulkComputation extends AbstractComputation {
069
070    private static final Logger log = LogManager.getLogger(AbstractBulkComputation.class);
071
072    protected static final String SELECT_DOCUMENTS_IN = "SELECT * FROM Document, Relation WHERE ecm:uuid IN ('%s')";
073
074    protected Map<String, BulkCommand> commands = new PassiveExpiringMap<>(60, TimeUnit.SECONDS);
075
076    protected BulkCommand command;
077
078    protected BulkStatus delta;
079
080    public AbstractBulkComputation(String name) {
081        this(name, 1);
082    }
083
084    public AbstractBulkComputation(String name, int nbOutputStreams) {
085        super(name, 1, nbOutputStreams);
086    }
087
088    @Override
089    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
090        BulkBucket bucket = BulkCodecs.getBucketCodec().decode(record.getData());
091        command = getCommand(bucket.getCommandId());
092        if (command != null) {
093            delta = BulkStatus.deltaOf(command.getId());
094            delta.setProcessingStartTime(Instant.now());
095            delta.setProcessed(bucket.getIds().size());
096            startBucket(record.getKey());
097            for (List<String> batch : Lists.partition(bucket.getIds(), command.getBatchSize())) {
098                processBatchOfDocuments(batch);
099            }
100            delta.setProcessingEndTime(Instant.now());
101            endBucket(context, delta);
102            context.askForCheckpoint();
103        } else {
104            if (isAbortedCommand(bucket.getCommandId())) {
105                log.debug("Skipping aborted command: {}", bucket.getCommandId());
106                context.askForCheckpoint();
107            } else {
108                // this requires a manual intervention, the kv store might have been lost
109                throw new IllegalStateException(String.format("Unknown command: %s, offset: %s, record: %s.",
110                        bucket.getCommandId(), context.getLastOffset(), record));
111            }
112        }
113    }
114
115    protected boolean isAbortedCommand(String commandId) {
116        BulkService bulkService = Framework.getService(BulkService.class);
117        BulkStatus status = bulkService.getStatus(commandId);
118        return ABORTED.equals(status.getState());
119    }
120
121    protected BulkCommand getCommand(String commandId) {
122        // This is to remove expired/completed commands from the cache map
123        commands.size();
124        return commands.computeIfAbsent(commandId, id -> Framework.getService(BulkService.class).getCommand(id));
125    }
126
127    public BulkCommand getCurrentCommand() {
128        return command;
129    }
130
131    protected void processBatchOfDocuments(List<String> batch) {
132        if (batch == null || batch.isEmpty()) {
133            return;
134        }
135        TransactionHelper.runInTransaction(() -> {
136            try {
137                String username = command.getUsername();
138                String repository = command.getRepository();
139                try (NuxeoLoginContext ignored = loginSystemOrUser(username)) {
140                    CoreSession session = repository == null ? null : CoreInstance.getCoreSession(repository);
141                    compute(session, batch, command.getParams());
142                }
143            } catch (LoginException e) {
144                throw new NuxeoException(e);
145            }
146        });
147    }
148
149    protected NuxeoLoginContext loginSystemOrUser(String username) throws LoginException {
150        return SYSTEM_USERNAME.equals(username) ? Framework.loginSystem() : Framework.loginUser(username);
151    }
152
153    /**
154     * Can be overridden to init stuff before processing the bucket
155     */
156    public void startBucket(String bucketKey) {
157        // nothing to do
158    }
159
160    /**
161     * Can be overridden to write to downstream computation or add results to status
162     */
163    public void endBucket(ComputationContext context, BulkStatus delta) {
164        updateStatus(context, delta);
165    }
166
167    @Override
168    public void processFailure(ComputationContext context, Throwable failure) {
169        log.error(String.format("Action: %s fails on record: %s after retries.", metadata.name(),
170                context.getLastOffset()), failure);
171        // The delta will be send only if the policy is set with continueOnFailure = true
172        delta.inError(metadata.name() + " fails on " + context.getLastOffset() + ": " + failure.getMessage());
173        endBucket(context, delta);
174    }
175
176    public static void updateStatus(ComputationContext context, BulkStatus delta) {
177        context.produceRecord(OUTPUT_1, delta.getId(), BulkCodecs.getStatusCodec().encode(delta));
178    }
179
180    protected abstract void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties);
181
182    /**
183     * Helper to load a list of documents. Documents without read access or that does not exists are not returned.
184     */
185    public DocumentModelList loadDocuments(CoreSession session, List<String> documentIds) {
186        if (documentIds == null || documentIds.isEmpty()) {
187            return new DocumentModelListImpl(0);
188        }
189        return session.query(String.format(SELECT_DOCUMENTS_IN, String.join("', '", documentIds)));
190    }
191}