001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *       Kevin Leturc <kleturc@nuxeo.com>
018 */
019package org.nuxeo.ecm.core.bulk;
020
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.RUNNING;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCHEDULED;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.UNKNOWN;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.repository.RepositoryManager;
import org.nuxeo.ecm.core.api.scroll.ScrollService;
import org.nuxeo.ecm.core.bulk.message.BulkBucket;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.scroll.DocumentScrollRequest;
import org.nuxeo.ecm.core.scroll.GenericScrollRequest;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.kv.KeyValueStoreProvider;
import org.nuxeo.runtime.stream.StreamService;
057
058/**
059 * Basic implementation of {@link BulkService}.
060 *
061 * @since 10.2
062 */
063public class BulkServiceImpl implements BulkService {
064
065    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(BulkServiceImpl.class);
066
067    // @deprecated since 11.1 log config is not needed anymore
068    @Deprecated
069    public static final String BULK_LOG_MANAGER_NAME = "bulk";
070
071    public static final String BULK_KV_STORE_NAME = "bulk";
072
073    public static final String COMMAND_STREAM = "bulk/command";
074
075    // @since 11.1
076    public static final Name COMMAND_STREAM_NAME = Name.ofUrn(COMMAND_STREAM);
077
078    public static final String STATUS_STREAM = "bulk/status";
079
080    // @since 11.1
081    public static final Name STATUS_STREAM_NAME = Name.ofUrn(STATUS_STREAM);
082
083    public static final String DONE_STREAM = "bulk/done";
084
085    // @since 11.1
086    public static final Name DONE_STREAM_NAME = Name.ofUrn(DONE_STREAM);
087
088    public static final String COMMAND_PREFIX = "command:";
089
090    // @deprecated since 11.4 not needed anymore
091    @Deprecated(since = "11.4")
092    public static final String RECORD_CODEC = "avro";
093
094    public static final String STATUS_PREFIX = "status:";
095
096    public static final String PRODUCE_IMMEDIATE_OPTION = "produceImmediate";
097
098    // How long we keep the command and its status in the kv store once completed
099    public static final long COMPLETED_TTL_SECONDS = 3_600;
100
101    // How long we keep the command and its status in the kv store once aborted
102    public static final long ABORTED_TTL_SECONDS = 7_200;
103
104    // @since 11.3
105    protected final AtomicLong externalScrollerCounter = new AtomicLong();
106
107    // @since 11.3
108    protected final Map<String, BulkCommand> externalCommands = new PassiveExpiringMap<>(60, TimeUnit.SECONDS);
109
110    @Override
111    public String submit(BulkCommand command) {
112        log.debug("Run action with command={}", command);
113        // check command
114        BulkAdminService adminService = Framework.getService(BulkAdminService.class);
115        if (!adminService.getActions().contains(command.getAction())) {
116            throw new IllegalArgumentException("Unknown action for command: " + command);
117        }
118        BulkActionValidation actionValidation = adminService.getActionValidation(command.getAction());
119
120        // Try to validate the action if a validation class is provided
121        if (actionValidation != null) {
122            actionValidation.validate(command);
123        }
124        RepositoryManager repoManager = Framework.getService(RepositoryManager.class);
125        if (repoManager != null) {
126            if (isEmpty(command.getRepository())) {
127                command.setRepository(repoManager.getDefaultRepositoryName());
128            } else if (repoManager.getRepository(command.getRepository()) == null) {
129                throw new IllegalArgumentException("Unknown repository: " + command);
130            }
131        }
132        if (command.getBucketSize() == 0 || command.getBatchSize() == 0) {
133            if (command.getBucketSize() == 0) {
134                command.setBucketSize(adminService.getBucketSize(command.getAction()));
135            }
136            if (command.getBatchSize() == 0) {
137                command.setBatchSize(adminService.getBatchSize(command.getAction()));
138            }
139        }
140        if (command.getQueryLimit() == null) {
141            command.setQueryLimit(adminService.getQueryLimit(command.getAction()));
142        }
143        if (command.getScroller() == null && !command.useExternalScroller()) {
144            String actionScroller = adminService.getDefaultScroller(command.getAction());
145            if (!isBlank(actionScroller)) {
146                command.setScroller(actionScroller);
147            }
148        }
149        checkIfScrollerExists(command);
150
151        // store the bulk command and status in the key/value store
152        BulkStatus status = new BulkStatus(command.getId());
153        status.setState(SCHEDULED);
154        status.setAction(command.getAction());
155        status.setUsername(command.getUsername());
156        status.setSubmitTime(Instant.now());
157        setStatus(status);
158        byte[] commandAsBytes = setCommand(command);
159
160        String shardKey;
161        if (adminService.isSequentialCommands(command.getAction())) {
162            // no concurrency all commands for this action goes to the same partition
163            shardKey = command.getAction();
164        } else {
165            // use a random value
166            shardKey = command.getId();
167        }
168        // send command to bulk processor
169        log.debug("Submit action with command: {}", command);
170        return submit(shardKey, command.getId(), commandAsBytes);
171    }
172
173    protected void checkIfScrollerExists(BulkCommand command) {
174        ScrollService scrollService = Framework.getService(ScrollService.class);
175        if (command.useExternalScroller()) {
176            // nothing to do
177        } else if (command.useGenericScroller()) {
178            if (!scrollService.exists(
179                    GenericScrollRequest.builder(command.getScroller(), command.getQuery()).build())) {
180                throw new IllegalArgumentException("Unknown Generic Scroller for command: " + command);
181            }
182        } else if (!scrollService.exists(
183                DocumentScrollRequest.builder(command.getQuery()).name(command.getScroller()).build())) {
184            throw new IllegalArgumentException("Unknown Document Scroller for command: " + command);
185        }
186    }
187
188    @SuppressWarnings("resource") // LogManager not ours to close
189    protected String submit(String shardKey, String key, byte[] bytes) {
190        LogManager logManager = Framework.getService(StreamService.class).getLogManager();
191        LogAppender<Record> logAppender = logManager.getAppender(COMMAND_STREAM_NAME);
192        Record record = Record.of(key, bytes);
193        log.debug("Append shardKey: {}, record: {}", shardKey, record);
194        logAppender.append(shardKey, record);
195        return key;
196    }
197
198    @Override
199    public BulkStatus getStatus(String commandId) {
200        KeyValueStore keyValueStore = getKvStore();
201        byte[] statusAsBytes = keyValueStore.get(STATUS_PREFIX + commandId);
202        if (statusAsBytes == null) {
203            log.debug("Request status of unknown command: {}", commandId);
204            return BulkStatus.unknownOf(commandId);
205        }
206        return BulkCodecs.getStatusCodec().decode(statusAsBytes);
207    }
208
209    /**
210     * Stores the status in the kv store returns the encoded status
211     */
212    public byte[] setStatus(BulkStatus status) {
213        KeyValueStore kvStore = getKvStore();
214        byte[] statusAsBytes = BulkCodecs.getStatusCodec().encode(status);
215        switch (status.getState()) {
216        case ABORTED:
217            kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, ABORTED_TTL_SECONDS);
218            // we remove the command from the kv store, so computation have to handle abort
219            kvStore.put(COMMAND_PREFIX + status.getId(), (String) null);
220            break;
221        case COMPLETED:
222            kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, COMPLETED_TTL_SECONDS);
223            kvStore.setTTL(COMMAND_PREFIX + status.getId(), COMPLETED_TTL_SECONDS);
224            break;
225        default:
226            kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes);
227        }
228        return statusAsBytes;
229    }
230
231    @Override
232    public BulkCommand getCommand(String commandId) {
233        KeyValueStore keyValueStore = getKvStore();
234        byte[] statusAsBytes = keyValueStore.get(COMMAND_PREFIX + commandId);
235        if (statusAsBytes == null) {
236            return null;
237        }
238        return BulkCodecs.getCommandCodec().decode(statusAsBytes);
239    }
240
241    @Override
242    public BulkStatus abort(String commandId) {
243        BulkStatus status = getStatus(commandId);
244        if (COMPLETED.equals(status.getState())) {
245            log.debug("Cannot abort a completed command: {}", commandId);
246            return status;
247        }
248        status.setState(ABORTED);
249        // set the status in the KV store
250        setStatus(status);
251        // Send a delta to the status computation
252        BulkStatus delta = BulkStatus.deltaOf(commandId);
253        delta.setCompletedTime(Instant.now());
254        delta.setState(ABORTED);
255        Record record = Record.of(commandId, BulkCodecs.getStatusCodec().encode(delta));
256        Framework.getService(StreamService.class).getStreamManager().append(STATUS_STREAM, record);
257        return status;
258    }
259
260    @Override
261    public Map<String, Serializable> getResult(String commandId) {
262        return getStatus(commandId).getResult();
263    }
264
265    /**
266     * Stores the command in the kv store, returns the encoded command.
267     */
268    public byte[] setCommand(BulkCommand command) {
269        KeyValueStore kvStore = getKvStore();
270        byte[] commandAsBytes = BulkCodecs.getCommandCodec().encode(command);
271        kvStore.put(COMMAND_PREFIX + command.getId(), commandAsBytes);
272        return commandAsBytes;
273    }
274
275    @Override
276    public boolean await(String commandId, Duration duration) throws InterruptedException {
277        long deadline = System.currentTimeMillis() + duration.toMillis();
278        BulkStatus status;
279        do {
280            status = getStatus(commandId);
281            switch (status.getState()) {
282            case COMPLETED:
283            case ABORTED:
284                return true;
285            case UNKNOWN:
286                log.error("Unknown status for command: {}", commandId);
287                return false;
288            default:
289                // continue
290            }
291            Thread.sleep(100);
292        } while (deadline > System.currentTimeMillis());
293        log.debug("await timeout on {} after {} ms", () -> getStatus(commandId), duration::toMillis);
294        return false;
295    }
296
297    public KeyValueStore getKvStore() {
298        return Framework.getService(KeyValueService.class).getKeyValueStore(BULK_KV_STORE_NAME);
299    }
300
301    @Override
302    public boolean await(Duration duration) throws InterruptedException {
303        KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore();
304        Set<String> commandIds = kv.keyStream(STATUS_PREFIX)
305                                   .map(k -> k.replaceFirst(STATUS_PREFIX, ""))
306                                   .collect(Collectors.toSet());
307        log.debug("Wait for command ids: {}", commandIds);
308        // nanoTime is always monotonous
309        long deadline = System.nanoTime() + duration.toNanos();
310        for (String commandId : commandIds) {
311            log.debug("Wait for command id: {}", commandId);
312            for (;;) {
313                BulkStatus status = getStatus(commandId);
314                log.debug("Status of command: {} = {}", commandId, status);
315                BulkStatus.State state = status.getState();
316                log.debug("State of command: {} = {}", commandId, state);
317                if (state == COMPLETED || state == ABORTED || state == UNKNOWN) {
318                    break;
319                }
320                Thread.sleep(200);
321                if (deadline < System.nanoTime()) {
322                    log.debug("await timeout, at least one uncompleted command: {}", status);
323                    return false;
324                }
325            }
326        }
327        return true;
328    }
329
330    @Override
331    public List<BulkStatus> getStatuses(String username) {
332        KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore();
333        return kv.keyStream(STATUS_PREFIX)
334                 .map(kv::get)
335                 .map(BulkCodecs.getStatusCodec()::decode)
336                 .filter(status -> username.equals(status.getUsername()))
337                 .collect(Collectors.toList());
338    }
339
340    @Override
341    public void appendExternalBucket(BulkBucket bucket) {
342        String commandId = bucket.getCommandId();
343
344        BulkCommand command = externalCommands.computeIfAbsent(commandId, this::getCommand);
345        String stream = Framework.getService(BulkAdminService.class).getInputStream(command.getAction());
346
347        String key = commandId + ":" + externalScrollerCounter.incrementAndGet();
348        Record record = Record.of(key, BulkCodecs.getBucketCodec().encode(bucket));
349
350        log.debug("Append key: {}, record: {}", key, record);
351        Framework.getService(StreamService.class).getStreamManager().append(stream, record);
352    }
353
354    @Override
355    public void completeExternalScroll(String commandId, long count) {
356        BulkStatus delta = BulkStatus.deltaOf(commandId);
357        delta.setState(RUNNING);
358        delta.setScrollEndTime(Instant.now());
359        delta.setTotal(count);
360
361        Record record = Record.of(commandId, BulkCodecs.getStatusCodec().encode(delta));
362
363        log.debug("Complete external scroll with key: {}, count: {}, record: {}", commandId, count, record);
364        Framework.getService(StreamService.class).getStreamManager().append(STATUS_STREAM, record);
365    }
366}