001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019
020package org.nuxeo.ecm.core.bulk.actions;
021
022import static org.nuxeo.ecm.core.bulk.BulkRecords.commandIdFrom;
023import static org.nuxeo.ecm.core.bulk.BulkRecords.docIdsFrom;
024import static org.nuxeo.ecm.core.bulk.StreamBulkProcessor.AVRO_CODEC;
025import static org.nuxeo.ecm.core.bulk.StreamBulkProcessor.COUNTER_STREAM_NAME;
026
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Map;
031
032import javax.security.auth.login.LoginContext;
033import javax.security.auth.login.LoginException;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.core.api.CloseableCoreSession;
038import org.nuxeo.ecm.core.api.CoreInstance;
039import org.nuxeo.ecm.core.api.DocumentModel;
040import org.nuxeo.ecm.core.api.IdRef;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.bulk.BulkCommand;
043import org.nuxeo.ecm.core.bulk.BulkCommands;
044import org.nuxeo.ecm.core.bulk.BulkCounter;
045import org.nuxeo.lib.stream.codec.Codec;
046import org.nuxeo.lib.stream.computation.AbstractComputation;
047import org.nuxeo.lib.stream.computation.ComputationContext;
048import org.nuxeo.lib.stream.computation.Record;
049import org.nuxeo.lib.stream.computation.Topology;
050import org.nuxeo.runtime.api.Framework;
051import org.nuxeo.runtime.codec.CodecService;
052import org.nuxeo.runtime.stream.StreamProcessorTopology;
053import org.nuxeo.runtime.transaction.TransactionHelper;
054
055/**
056 * @since 10.2
057 */
058// TODO refactor this computation when the batch policy is introduced
059public class SetPropertiesAction implements StreamProcessorTopology {
060
061    private static final Log log = LogFactory.getLog(SetPropertiesAction.class);
062
063    public static final String COMPUTATION_NAME = "SetProperties";
064
065    public static final String STREAM_NAME = "setProperties";
066
067    public static final String BATCH_SIZE_OPT = "batchSize";
068
069    public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs";
070
071    public static final int DEFAULT_BATCH_SIZE = 10;
072
073    public static final int DEFAULT_BATCH_THRESHOLD_MS = 200;
074
075    @Override
076    public Topology getTopology(Map<String, String> options) {
077        int batchSize = getOptionAsInteger(options, BATCH_SIZE_OPT, DEFAULT_BATCH_SIZE);
078        int batchThresholdMs = getOptionAsInteger(options, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS);
079        return Topology.builder()
080                       .addComputation(() -> new SetPropertyComputation(COMPUTATION_NAME, batchSize, batchThresholdMs),
081                               Arrays.asList("i1:" + STREAM_NAME, "o1:" + COUNTER_STREAM_NAME))
082                       .build();
083    }
084
085    public static class SetPropertyComputation extends AbstractComputation {
086
087        protected final int batchSize;
088
089        protected final int batchThresholdMs;
090
091        protected final List<String> documentIds;
092
093        protected String currentCommandId;
094
095        protected BulkCommand currentCommand;
096
097        public SetPropertyComputation(String name, int batchSize, int batchThresholdMs) {
098            super(name, 1, 1);
099            this.batchSize = batchSize;
100            this.batchThresholdMs = batchThresholdMs;
101            documentIds = new ArrayList<>(batchSize);
102        }
103
104        @Override
105        public void init(ComputationContext context) {
106            log.debug(String.format("Starting computation: %s reading on: %s, batch size: %d, threshold: %dms",
107                    COMPUTATION_NAME, STREAM_NAME, batchSize, batchThresholdMs));
108            context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs);
109        }
110
111        @Override
112        public void processTimer(ComputationContext context, String key, long timestamp) {
113            processBatch(context);
114            context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs);
115        }
116
117        @Override
118        public void processRecord(ComputationContext context, String inputStreamName, Record record) {
119            String commandId = commandIdFrom(record);
120            if (currentCommandId == null) {
121                // first time we need to process something
122                loadCurrentBulkCommandContext(commandId);
123            } else if (!currentCommandId.equals(commandId)) {
124                // new bulk id computation - send remaining elements
125                processBatch(context);
126                documentIds.clear();
127                loadCurrentBulkCommandContext(commandId);
128            }
129            // process record
130            documentIds.addAll(docIdsFrom(record));
131            if (documentIds.size() >= batchSize) {
132                processBatch(context);
133            }
134        }
135
136        protected void loadCurrentBulkCommandContext(String commandId) {
137            currentCommandId = commandId;
138            currentCommand = BulkCommands.fromKVStore(commandId);
139        }
140
141        @Override
142        public void destroy() {
143            log.debug(String.format("Destroy computation: %s, pending entries: %d", COMPUTATION_NAME,
144                    documentIds.size()));
145        }
146
147        protected void processBatch(ComputationContext context) {
148            if (!documentIds.isEmpty()) {
149                TransactionHelper.runInTransaction(() -> {
150                    // for setProperties, parameters are properties to set
151                    Map<String, String> properties = currentCommand.getParams();
152                    LoginContext loginContext;
153                    try {
154                        loginContext = Framework.loginAsUser(currentCommand.getUsername());
155
156                        try (CloseableCoreSession session = CoreInstance.openCoreSession(
157                                currentCommand.getRepository())) {
158                            for (String docId : documentIds) {
159                                DocumentModel doc = session.getDocument(new IdRef(docId));
160                                properties.forEach(doc::setPropertyValue);
161                                session.saveDocument(doc);
162                            }
163                        } finally {
164                            loginContext.logout();
165                        }
166                    } catch (LoginException e) {
167                        throw new NuxeoException(e);
168                    }
169                });
170                BulkCounter counter = new BulkCounter(currentCommandId, (long) documentIds.size());
171                Codec<BulkCounter> counterCodec = Framework.getService(CodecService.class).getCodec(AVRO_CODEC,
172                        BulkCounter.class);
173                context.produceRecord("o1", currentCommandId, counterCodec.encode(counter));
174                documentIds.clear();
175                context.askForCheckpoint();
176            }
177        }
178    }
179
180    protected int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) {
181        String value = options.get(option);
182        return value == null ? defaultValue : Integer.parseInt(value);
183    }
184}