001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019package org.nuxeo.ecm.core.bulk.action.computation; 020 021import static org.nuxeo.ecm.core.api.security.SecurityConstants.SYSTEM_USERNAME; 022import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED; 023 024import java.io.Serializable; 025import java.time.Instant; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import javax.security.auth.login.LoginException; 031 032import org.apache.commons.collections4.map.PassiveExpiringMap; 033import org.apache.logging.log4j.LogManager; 034import org.apache.logging.log4j.Logger; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentModelList; 038import org.nuxeo.ecm.core.api.NuxeoException; 039import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl; 040import org.nuxeo.ecm.core.bulk.BulkCodecs; 041import org.nuxeo.ecm.core.bulk.BulkService; 042import org.nuxeo.ecm.core.bulk.message.BulkBucket; 043import org.nuxeo.ecm.core.bulk.message.BulkCommand; 044import org.nuxeo.ecm.core.bulk.message.BulkStatus; 045import org.nuxeo.lib.stream.computation.AbstractComputation; 046import org.nuxeo.lib.stream.computation.ComputationContext; 047import org.nuxeo.lib.stream.computation.Record; 048import org.nuxeo.runtime.api.Framework; 049import org.nuxeo.runtime.api.login.NuxeoLoginContext; 050import org.nuxeo.runtime.transaction.TransactionHelper; 051 052import com.google.common.collect.Lists; 053 054/** 055 * Base class for bulk action computation. 056 * <p> 057 * Inputs: 058 * <ul> 059 * <li>i1: Reads {@link BulkBucket}</li> 060 * </ul> 061 * Outputs for the last computation of the processor 062 * <ul> 063 * <li>o1: Writes {@link BulkStatus} delta</li> 064 * </ul> 065 * 066 * @since 10.2 067 */ 068public abstract class AbstractBulkComputation extends AbstractComputation { 069 070 private static final Logger log = LogManager.getLogger(AbstractBulkComputation.class); 071 072 protected static final String SELECT_DOCUMENTS_IN = "SELECT * FROM Document, Relation WHERE ecm:uuid IN ('%s')"; 073 074 protected Map<String, BulkCommand> commands = new PassiveExpiringMap<>(60, TimeUnit.SECONDS); 075 076 protected BulkCommand command; 077 078 protected BulkStatus delta; 079 080 public AbstractBulkComputation(String name) { 081 this(name, 1); 082 } 083 084 public AbstractBulkComputation(String name, int nbOutputStreams) { 085 super(name, 1, nbOutputStreams); 086 } 087 088 @Override 089 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 090 BulkBucket bucket = BulkCodecs.getBucketCodec().decode(record.getData()); 091 command = getCommand(bucket.getCommandId()); 092 if (command != null) { 093 delta = BulkStatus.deltaOf(command.getId()); 094 delta.setProcessingStartTime(Instant.now()); 095 delta.setProcessed(bucket.getIds().size()); 096 startBucket(record.getKey()); 097 for (List<String> batch : Lists.partition(bucket.getIds(), command.getBatchSize())) { 098 processBatchOfDocuments(batch); 099 } 100 delta.setProcessingEndTime(Instant.now()); 101 endBucket(context, delta); 102 context.askForCheckpoint(); 103 } else { 104 if (isAbortedCommand(bucket.getCommandId())) { 105 log.debug("Skipping aborted command: {}", bucket.getCommandId()); 106 context.askForCheckpoint(); 107 } else { 108 // this requires a manual intervention, the kv store might have been lost 109 throw new IllegalStateException(String.format("Unknown command: %s, offset: %s, record: %s.", 110 bucket.getCommandId(), context.getLastOffset(), record)); 111 } 112 } 113 } 114 115 protected boolean isAbortedCommand(String commandId) { 116 BulkService bulkService = Framework.getService(BulkService.class); 117 BulkStatus status = bulkService.getStatus(commandId); 118 return ABORTED.equals(status.getState()); 119 } 120 121 protected BulkCommand getCommand(String commandId) { 122 // This is to remove expired/completed commands from the cache map 123 commands.size(); 124 return commands.computeIfAbsent(commandId, id -> Framework.getService(BulkService.class).getCommand(id)); 125 } 126 127 public BulkCommand getCurrentCommand() { 128 return command; 129 } 130 131 protected void processBatchOfDocuments(List<String> batch) { 132 if (batch == null || batch.isEmpty()) { 133 return; 134 } 135 TransactionHelper.runInTransaction(() -> { 136 try { 137 String username = command.getUsername(); 138 String repository = command.getRepository(); 139 try (NuxeoLoginContext ignored = loginSystemOrUser(username)) { 140 CoreSession session = repository == null ? null : CoreInstance.getCoreSession(repository); 141 compute(session, batch, command.getParams()); 142 } 143 } catch (LoginException e) { 144 throw new NuxeoException(e); 145 } 146 }); 147 } 148 149 protected NuxeoLoginContext loginSystemOrUser(String username) throws LoginException { 150 return SYSTEM_USERNAME.equals(username) ? Framework.loginSystem() : Framework.loginUser(username); 151 } 152 153 /** 154 * Can be overridden to init stuff before processing the bucket 155 */ 156 public void startBucket(String bucketKey) { 157 // nothing to do 158 } 159 160 /** 161 * Can be overridden to write to downstream computation or add results to status 162 */ 163 public void endBucket(ComputationContext context, BulkStatus delta) { 164 updateStatus(context, delta); 165 } 166 167 @Override 168 public void processFailure(ComputationContext context, Throwable failure) { 169 log.error(String.format("Action: %s fails on record: %s after retries.", metadata.name(), 170 context.getLastOffset()), failure); 171 // The delta will be send only if the policy is set with continueOnFailure = true 172 delta.inError(metadata.name() + " fails on " + context.getLastOffset() + ": " + failure.getMessage()); 173 endBucket(context, delta); 174 } 175 176 public static void updateStatus(ComputationContext context, BulkStatus delta) { 177 context.produceRecord(OUTPUT_1, delta.getId(), BulkCodecs.getStatusCodec().encode(delta)); 178 } 179 180 protected abstract void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties); 181 182 /** 183 * Helper to load a list of documents. Documents without read access or that does not exists are not returned. 184 */ 185 public DocumentModelList loadDocuments(CoreSession session, List<String> documentIds) { 186 if (documentIds == null || documentIds.isEmpty()) { 187 return new DocumentModelListImpl(0); 188 } 189 return session.query(String.format(SELECT_DOCUMENTS_IN, String.join("', '", documentIds))); 190 } 191}