001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *       Kevin Leturc <kleturc@nuxeo.com>
018 */
019package org.nuxeo.ecm.core.bulk;
020
021import static org.apache.commons.lang3.StringUtils.isEmpty;
022import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
023import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED;
024import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCHEDULED;
025import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.UNKNOWN;
026
027import java.io.Serializable;
028import java.time.Duration;
029import java.time.Instant;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.stream.Collectors;
034
035import org.apache.logging.log4j.Logger;
036import org.nuxeo.ecm.core.api.repository.RepositoryManager;
037import org.nuxeo.ecm.core.bulk.message.BulkCommand;
038import org.nuxeo.ecm.core.bulk.message.BulkStatus;
039import org.nuxeo.lib.stream.computation.Record;
040import org.nuxeo.lib.stream.log.LogAppender;
041import org.nuxeo.lib.stream.log.LogManager;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.codec.CodecService;
044import org.nuxeo.runtime.kv.KeyValueService;
045import org.nuxeo.runtime.kv.KeyValueStore;
046import org.nuxeo.runtime.kv.KeyValueStoreProvider;
047import org.nuxeo.runtime.stream.StreamService;
048
049/**
050 * Basic implementation of {@link BulkService}.
051 *
052 * @since 10.2
053 */
054public class BulkServiceImpl implements BulkService {
055
056    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(BulkServiceImpl.class);
057
058    public static final String BULK_LOG_MANAGER_NAME = "bulk";
059
060    public static final String BULK_KV_STORE_NAME = "bulk";
061
062    public static final String COMMAND_STREAM = "command";
063
064    public static final String STATUS_STREAM = "status";
065
066    public static final String DONE_STREAM = "done";
067
068    public static final String RECORD_CODEC = "avro";
069
070    public static final String COMMAND_PREFIX = "command:";
071
072    public static final String STATUS_PREFIX = "status:";
073
074    public static final String PRODUCE_IMMEDIATE_OPTION = "produceImmediate";
075
076    // How long we keep the command and its status in the kv store once completed
077    public static final long COMPLETED_TTL_SECONDS = 3_600;
078
079    // How long we keep the command and its status in the kv store once aborted
080    public static final long ABORTED_TTL_SECONDS = 7_200;
081
082    @Override
083    public String submit(BulkCommand command) {
084        log.debug("Run action with command={}", command);
085        // check command
086        BulkAdminService adminService = Framework.getService(BulkAdminService.class);
087        if (!adminService.getActions().contains(command.getAction())) {
088            throw new IllegalArgumentException("Unknown action for command: " + command);
089        }
090        BulkActionValidation actionValidation = adminService.getActionValidation(command.getAction());
091
092        // Try to validate the action if a validation class is provided
093        if (actionValidation != null) {
094            actionValidation.validate(command);
095        }
096
097        RepositoryManager repoManager = Framework.getService(RepositoryManager.class);
098        if (isEmpty(command.getRepository())) {
099            command.setRepository(repoManager.getDefaultRepositoryName());
100        } else {
101            if (repoManager.getRepository(command.getRepository()) == null) {
102                throw new IllegalArgumentException("Unknown repository: " + command);
103            }
104        }
105        if (command.getBucketSize() == 0 || command.getBatchSize() == 0) {
106
107            if (command.getBucketSize() == 0) {
108                command.setBucketSize(adminService.getBucketSize(command.getAction()));
109            }
110            if (command.getBatchSize() == 0) {
111                command.setBatchSize(adminService.getBatchSize(command.getAction()));
112            }
113        }
114
115        // store the bulk command and status in the key/value store
116        BulkStatus status = new BulkStatus(command.getId());
117        status.setState(SCHEDULED);
118        status.setAction(command.getAction());
119        status.setUsername(command.getUsername());
120        status.setSubmitTime(Instant.now());
121        setStatus(status);
122        byte[] commandAsBytes = setCommand(command);
123
124        String shardKey;
125        if (adminService.isSequentialCommands(command.getAction())) {
126            // no concurrency all commands for this action goes to the same partition
127            shardKey = command.getAction();
128        } else {
129            // use a random value
130            shardKey = command.getId();
131        }
132        // send command to bulk processor
133        LogManager logManager = Framework.getService(StreamService.class).getLogManager(BULK_LOG_MANAGER_NAME);
134        LogAppender<Record> logAppender = logManager.getAppender(COMMAND_STREAM,
135                Framework.getService(CodecService.class).getCodec(RECORD_CODEC, Record.class));
136        logAppender.append(shardKey, Record.of(command.getId(), commandAsBytes));
137        return command.getId();
138    }
139
140    @Override
141    public BulkStatus getStatus(String commandId) {
142        KeyValueStore keyValueStore = getKvStore();
143        byte[] statusAsBytes = keyValueStore.get(STATUS_PREFIX + commandId);
144        if (statusAsBytes == null) {
145            log.debug("Request status of unknown command: {}", commandId);
146            return BulkStatus.unknownOf(commandId);
147        }
148        return BulkCodecs.getStatusCodec().decode(statusAsBytes);
149    }
150
151    /**
152     * Stores the status in the kv store returns the encoded status
153     */
154    public byte[] setStatus(BulkStatus status) {
155        KeyValueStore kvStore = getKvStore();
156        byte[] statusAsBytes = BulkCodecs.getStatusCodec().encode(status);
157        switch (status.getState()) {
158        case ABORTED:
159            kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, ABORTED_TTL_SECONDS);
160            // we remove the command from the kv store, so computation have to handle abort
161            kvStore.put(COMMAND_PREFIX + status.getId(), (String) null);
162            break;
163        case COMPLETED:
164            kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, COMPLETED_TTL_SECONDS);
165            kvStore.setTTL(COMMAND_PREFIX + status.getId(), COMPLETED_TTL_SECONDS);
166            break;
167        default:
168            kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes);
169        }
170        return statusAsBytes;
171    }
172
173    @Override
174    public BulkCommand getCommand(String commandId) {
175        KeyValueStore keyValueStore = getKvStore();
176        byte[] statusAsBytes = keyValueStore.get(COMMAND_PREFIX + commandId);
177        if (statusAsBytes == null) {
178            return null;
179        }
180        return BulkCodecs.getCommandCodec().decode(statusAsBytes);
181    }
182
183    @Override
184    public BulkStatus abort(String commandId) {
185        BulkStatus status = getStatus(commandId);
186        if (COMPLETED.equals(status.getState())) {
187            log.debug("Cannot abort a completed command: {}", commandId);
188            return status;
189        }
190        status.setState(ABORTED);
191        // set the status in the KV store
192        setStatus(status);
193        // Send a delta to the status computation
194        BulkStatus delta = BulkStatus.deltaOf(commandId);
195        delta.setCompletedTime(Instant.now());
196        delta.setState(ABORTED);
197        byte[] statusAsBytes = BulkCodecs.getStatusCodec().encode(delta);
198        LogManager logManager = Framework.getService(StreamService.class).getLogManager(BULK_LOG_MANAGER_NAME);
199        LogAppender<Record> logAppender = logManager.getAppender(STATUS_STREAM);
200        logAppender.append(commandId, Record.of(commandId, statusAsBytes));
201        return status;
202    }
203
204    @Override
205    public Map<String, Serializable> getResult(String commandId) {
206        return getStatus(commandId).getResult();
207    }
208
209    /**
210     * Stores the command in the kv store, returns the encoded command.
211     */
212    public byte[] setCommand(BulkCommand command) {
213        KeyValueStore kvStore = getKvStore();
214        byte[] commandAsBytes = BulkCodecs.getCommandCodec().encode(command);
215        kvStore.put(COMMAND_PREFIX + command.getId(), commandAsBytes);
216        return commandAsBytes;
217    }
218
219    @Override
220    public boolean await(String commandId, Duration duration) throws InterruptedException {
221        long deadline = System.currentTimeMillis() + duration.toMillis();
222        BulkStatus status;
223        do {
224            status = getStatus(commandId);
225            switch (status.getState()) {
226            case COMPLETED:
227            case ABORTED:
228                return true;
229            case UNKNOWN:
230                log.error("Unknown status for command: {}", commandId);
231                return false;
232            default:
233                // continue
234            }
235            Thread.sleep(100);
236        } while (deadline > System.currentTimeMillis());
237        log.debug("await timeout on {} after {} ms", () -> getStatus(commandId), duration::toMillis);
238        return false;
239    }
240
241    public KeyValueStore getKvStore() {
242        return Framework.getService(KeyValueService.class).getKeyValueStore(BULK_KV_STORE_NAME);
243    }
244
245    @Override
246    public boolean await(Duration duration) throws InterruptedException {
247        KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore();
248        Set<String> commandIds = kv.keyStream(STATUS_PREFIX)
249                                   .map(k -> k.replaceFirst(STATUS_PREFIX, ""))
250                                   .collect(Collectors.toSet());
251        // nanoTime is always monotonous
252        long deadline = System.nanoTime() + duration.toNanos();
253        for (String commandId : commandIds) {
254            for (;;) {
255                BulkStatus status = getStatus(commandId);
256                BulkStatus.State state = status.getState();
257                if (state == COMPLETED || state == ABORTED || state == UNKNOWN) {
258                    break;
259                }
260                Thread.sleep(200);
261                if (deadline < System.nanoTime()) {
262                    log.debug("await timeout, at least one uncompleted command: {}", status);
263                    return false;
264                }
265            }
266        }
267        return true;
268    }
269
270    @Override
271    public List<BulkStatus> getStatuses(String username) {
272        KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore();
273        return kv.keyStream(STATUS_PREFIX)
274                 .map(kv::get)
275                 .map(BulkCodecs.getStatusCodec()::decode)
276                 .filter(status -> username.equals(status.getUsername()))
277                 .collect(Collectors.toList());
278    }
279
280}