001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019package org.nuxeo.ecm.core.bulk.action.computation; 020 021import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED; 022 023import java.io.Serializable; 024import java.time.Instant; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.TimeUnit; 028 029import javax.security.auth.login.LoginContext; 030import javax.security.auth.login.LoginException; 031 032import org.apache.commons.collections4.map.PassiveExpiringMap; 033import org.apache.logging.log4j.LogManager; 034import org.apache.logging.log4j.Logger; 035import org.nuxeo.ecm.core.api.CloseableCoreSession; 036import org.nuxeo.ecm.core.api.CoreInstance; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModelList; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl; 041import org.nuxeo.ecm.core.bulk.BulkCodecs; 042import org.nuxeo.ecm.core.bulk.BulkService; 043import org.nuxeo.ecm.core.bulk.message.BulkBucket; 044import org.nuxeo.ecm.core.bulk.message.BulkCommand; 045import org.nuxeo.ecm.core.bulk.message.BulkStatus; 046import org.nuxeo.lib.stream.computation.AbstractComputation; 047import org.nuxeo.lib.stream.computation.ComputationContext; 048import org.nuxeo.lib.stream.computation.Record; 049import org.nuxeo.runtime.api.Framework; 050import org.nuxeo.runtime.transaction.TransactionHelper; 051 052import com.google.common.collect.Lists; 053 054/** 055 * Base class for bulk action computation. 056 * <p> 057 * Inputs: 058 * <ul> 059 * <li>i1: Reads {@link BulkBucket}</li> 060 * </ul> 061 * Outputs for the last computation of the processor 062 * <ul> 063 * <li>o1: Writes {@link BulkStatus} delta</li> 064 * </ul> 065 * 066 * @since 10.2 067 */ 068public abstract class AbstractBulkComputation extends AbstractComputation { 069 070 private static final Logger log = LogManager.getLogger(AbstractBulkComputation.class); 071 072 protected static final String SELECT_DOCUMENTS_IN = "SELECT * FROM Document, Relation WHERE ecm:uuid IN ('%s')"; 073 074 protected Map<String, BulkCommand> commands = new PassiveExpiringMap<>(60, TimeUnit.SECONDS); 075 076 protected BulkCommand command; 077 078 protected BulkStatus delta; 079 080 public AbstractBulkComputation(String name) { 081 this(name, 1); 082 } 083 084 public AbstractBulkComputation(String name, int nbOutputStreams) { 085 super(name, 1, nbOutputStreams); 086 } 087 088 @Override 089 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 090 BulkBucket bucket = BulkCodecs.getBucketCodec().decode(record.getData()); 091 command = getCommand(bucket.getCommandId()); 092 if (command != null) { 093 delta = BulkStatus.deltaOf(command.getId()); 094 delta.setProcessingStartTime(Instant.now()); 095 delta.setProcessed(bucket.getIds().size()); 096 startBucket(record.getKey()); 097 for (List<String> batch : Lists.partition(bucket.getIds(), command.getBatchSize())) { 098 processBatchOfDocuments(batch); 099 } 100 delta.setProcessingEndTime(Instant.now()); 101 endBucket(context, delta); 102 context.askForCheckpoint(); 103 } else { 104 if (isAbortedCommand(bucket.getCommandId())) { 105 log.debug("Skipping aborted command: {}", bucket.getCommandId()); 106 context.askForCheckpoint(); 107 } else { 108 // this requires a manual intervention, the kv store might have been lost 109 log.error("Stopping processing, unknown command: {}, offset: {}, record: {}.", 110 bucket.getCommandId(), context.getLastOffset(), record); 111 context.askForTermination(); 112 } 113 } 114 } 115 116 protected boolean isAbortedCommand(String commandId) { 117 BulkService bulkService = Framework.getService(BulkService.class); 118 BulkStatus status = bulkService.getStatus(commandId); 119 return ABORTED.equals(status.getState()); 120 } 121 122 protected BulkCommand getCommand(String commandId) { 123 // This is to remove expired/completed commands from the cache map 124 commands.size(); 125 return commands.computeIfAbsent(commandId, id -> Framework.getService(BulkService.class).getCommand(id)); 126 } 127 128 public BulkCommand getCurrentCommand() { 129 return command; 130 } 131 132 protected void processBatchOfDocuments(List<String> batch) { 133 if (batch == null || batch.isEmpty()) { 134 return; 135 } 136 TransactionHelper.runInTransaction(() -> { 137 try { 138 LoginContext loginContext = Framework.loginAsUser(command.getUsername()); 139 String repository = command.getRepository(); 140 try (CloseableCoreSession session = CoreInstance.openCoreSession(repository)) { 141 compute(session, batch, command.getParams()); 142 } finally { 143 loginContext.logout(); 144 } 145 } catch (LoginException e) { 146 throw new NuxeoException(e); 147 } 148 }); 149 } 150 151 /** 152 * Can be overridden to init stuff before processing the bucket 153 */ 154 public void startBucket(String bucketKey) { 155 // nothing to do 156 } 157 158 /** 159 * Can be overridden to write to downstream computation or add results to status 160 */ 161 public void endBucket(ComputationContext context, BulkStatus delta) { 162 updateStatus(context, delta); 163 } 164 165 @Override 166 public void processFailure(ComputationContext context, Throwable failure) { 167 log.error(String.format("Action: %s fails on record: %s after retries.", metadata.name(), 168 context.getLastOffset()), failure); 169 // The delta will be send only if the policy is set with continueOnFailure = true 170 delta.inError(metadata.name() + " fails on " + context.getLastOffset() + ": " + failure.getMessage()); 171 endBucket(context, delta); 172 } 173 174 public static void updateStatus(ComputationContext context, BulkStatus delta) { 175 context.produceRecord(OUTPUT_1, delta.getId(), BulkCodecs.getStatusCodec().encode(delta)); 176 } 177 178 protected abstract void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties); 179 180 /** 181 * Helper to load a list of documents. Documents without read access or that does not exists are not returned. 182 */ 183 public DocumentModelList loadDocuments(CoreSession session, List<String> documentIds) { 184 if (documentIds == null || documentIds.isEmpty()) { 185 return new DocumentModelListImpl(0); 186 } 187 return session.query(String.format(SELECT_DOCUMENTS_IN, String.join("', '", documentIds))); 188 } 189}