001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019 020package org.nuxeo.ecm.core.bulk.actions; 021 022import static org.nuxeo.ecm.core.bulk.BulkRecords.commandIdFrom; 023import static org.nuxeo.ecm.core.bulk.BulkRecords.docIdsFrom; 024import static org.nuxeo.ecm.core.bulk.StreamBulkProcessor.AVRO_CODEC; 025import static org.nuxeo.ecm.core.bulk.StreamBulkProcessor.COUNTER_STREAM_NAME; 026 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Map; 031 032import javax.security.auth.login.LoginContext; 033import javax.security.auth.login.LoginException; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.api.CloseableCoreSession; 038import org.nuxeo.ecm.core.api.CoreInstance; 039import org.nuxeo.ecm.core.api.DocumentModel; 040import org.nuxeo.ecm.core.api.IdRef; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.bulk.BulkCommand; 043import org.nuxeo.ecm.core.bulk.BulkCommands; 044import org.nuxeo.ecm.core.bulk.BulkCounter; 045import org.nuxeo.lib.stream.codec.Codec; 046import org.nuxeo.lib.stream.computation.AbstractComputation; 047import org.nuxeo.lib.stream.computation.ComputationContext; 048import org.nuxeo.lib.stream.computation.Record; 049import org.nuxeo.lib.stream.computation.Topology; 050import org.nuxeo.runtime.api.Framework; 051import org.nuxeo.runtime.codec.CodecService; 052import org.nuxeo.runtime.stream.StreamProcessorTopology; 053import org.nuxeo.runtime.transaction.TransactionHelper; 054 055/** 056 * @since 10.2 057 */ 058// TODO refactor this computation when the batch policy is introduced 059public class SetPropertiesAction implements StreamProcessorTopology { 060 061 private static final Log log = LogFactory.getLog(SetPropertiesAction.class); 062 063 public static final String COMPUTATION_NAME = "SetProperties"; 064 065 public static final String STREAM_NAME = "setProperties"; 066 067 public static final String BATCH_SIZE_OPT = "batchSize"; 068 069 public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs"; 070 071 public static final int DEFAULT_BATCH_SIZE = 10; 072 073 public static final int DEFAULT_BATCH_THRESHOLD_MS = 200; 074 075 @Override 076 public Topology getTopology(Map<String, String> options) { 077 int batchSize = getOptionAsInteger(options, BATCH_SIZE_OPT, DEFAULT_BATCH_SIZE); 078 int batchThresholdMs = getOptionAsInteger(options, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS); 079 return Topology.builder() 080 .addComputation(() -> new SetPropertyComputation(COMPUTATION_NAME, batchSize, batchThresholdMs), 081 Arrays.asList("i1:" + STREAM_NAME, "o1:" + COUNTER_STREAM_NAME)) 082 .build(); 083 } 084 085 public static class SetPropertyComputation extends AbstractComputation { 086 087 protected final int batchSize; 088 089 protected final int batchThresholdMs; 090 091 protected final List<String> documentIds; 092 093 protected String currentCommandId; 094 095 protected BulkCommand currentCommand; 096 097 public SetPropertyComputation(String name, int batchSize, int batchThresholdMs) { 098 super(name, 1, 1); 099 this.batchSize = batchSize; 100 this.batchThresholdMs = batchThresholdMs; 101 documentIds = new ArrayList<>(batchSize); 102 } 103 104 @Override 105 public void init(ComputationContext context) { 106 log.debug(String.format("Starting computation: %s reading on: %s, batch size: %d, threshold: %dms", 107 COMPUTATION_NAME, STREAM_NAME, batchSize, batchThresholdMs)); 108 context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs); 109 } 110 111 @Override 112 public void processTimer(ComputationContext context, String key, long timestamp) { 113 processBatch(context); 114 context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs); 115 } 116 117 @Override 118 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 119 String commandId = commandIdFrom(record); 120 if (currentCommandId == null) { 121 // first time we need to process something 122 loadCurrentBulkCommandContext(commandId); 123 } else if (!currentCommandId.equals(commandId)) { 124 // new bulk id computation - send remaining elements 125 processBatch(context); 126 documentIds.clear(); 127 loadCurrentBulkCommandContext(commandId); 128 } 129 // process record 130 documentIds.addAll(docIdsFrom(record)); 131 if (documentIds.size() >= batchSize) { 132 processBatch(context); 133 } 134 } 135 136 protected void loadCurrentBulkCommandContext(String commandId) { 137 currentCommandId = commandId; 138 currentCommand = BulkCommands.fromKVStore(commandId); 139 } 140 141 @Override 142 public void destroy() { 143 log.debug(String.format("Destroy computation: %s, pending entries: %d", COMPUTATION_NAME, 144 documentIds.size())); 145 } 146 147 protected void processBatch(ComputationContext context) { 148 if (!documentIds.isEmpty()) { 149 TransactionHelper.runInTransaction(() -> { 150 // for setProperties, parameters are properties to set 151 Map<String, String> properties = currentCommand.getParams(); 152 LoginContext loginContext; 153 try { 154 loginContext = Framework.loginAsUser(currentCommand.getUsername()); 155 156 try (CloseableCoreSession session = CoreInstance.openCoreSession( 157 currentCommand.getRepository())) { 158 for (String docId : documentIds) { 159 DocumentModel doc = session.getDocument(new IdRef(docId)); 160 properties.forEach(doc::setPropertyValue); 161 session.saveDocument(doc); 162 } 163 } finally { 164 loginContext.logout(); 165 } 166 } catch (LoginException e) { 167 throw new NuxeoException(e); 168 } 169 }); 170 BulkCounter counter = new BulkCounter(currentCommandId, (long) documentIds.size()); 171 Codec<BulkCounter> counterCodec = Framework.getService(CodecService.class).getCodec(AVRO_CODEC, 172 BulkCounter.class); 173 context.produceRecord("o1", currentCommandId, counterCodec.encode(counter)); 174 documentIds.clear(); 175 context.askForCheckpoint(); 176 } 177 } 178 } 179 180 protected int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) { 181 String value = options.get(option); 182 return value == null ? defaultValue : Integer.parseInt(value); 183 } 184}