001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *       Kevin Leturc <kleturc@nuxeo.com>
018 */
019package org.nuxeo.ecm.core.bulk;
020
021import static org.apache.commons.lang3.StringUtils.isEmpty;
022import static org.nuxeo.ecm.core.bulk.BulkComponent.BULK_KV_STORE_NAME;
023import static org.nuxeo.ecm.core.bulk.BulkComponent.BULK_LOG_MANAGER_NAME;
024import static org.nuxeo.ecm.core.bulk.BulkStatus.State.COMPLETED;
025import static org.nuxeo.ecm.core.bulk.BulkStatus.State.SCHEDULED;
026
027import java.time.Duration;
028import java.time.Instant;
029import java.util.UUID;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.bulk.BulkStatus.State;
034import org.nuxeo.lib.stream.computation.Record;
035import org.nuxeo.lib.stream.log.LogAppender;
036import org.nuxeo.lib.stream.log.LogManager;
037import org.nuxeo.runtime.api.Framework;
038import org.nuxeo.runtime.kv.KeyValueService;
039import org.nuxeo.runtime.kv.KeyValueStore;
040import org.nuxeo.runtime.stream.StreamService;
041
042/**
043 * Basic implementation of {@link BulkService}.
044 *
045 * @since 10.2
046 */
047public class BulkServiceImpl implements BulkService {
048
049    private static final Log log = LogFactory.getLog(BulkServiceImpl.class);
050
051    protected static final String SET_STREAM_NAME = "documentSet";
052
053    protected static final String COMMAND = ":command";
054
055    protected static final String SUBMIT_TIME = ":submitTime";
056
057    protected static final String SCROLL_START_TIME = ":scrollStartTime";
058
059    protected static final String SCROLL_END_TIME = ":scrollEndTime";
060
061    protected static final String STATE = ":state";
062
063    protected static final String PROCESSED_DOCUMENTS = ":processedDocs";
064
065    protected static final String SCROLLED_DOCUMENT_COUNT = ":count";
066
067    @Override
068    public String submit(BulkCommand command) {
069        if (log.isDebugEnabled()) {
070            log.debug("Run action with command=" + command);
071        }
072        // check command
073        if (isEmpty(command.getRepository()) || isEmpty(command.getQuery()) || isEmpty(command.getAction())) {
074            throw new IllegalArgumentException("Missing mandatory values");
075        }
076        // create the command id and status
077        String commandId = UUID.randomUUID().toString();
078        byte[] commandAsBytes = BulkCommands.toBytes(command);
079
080        // store the bulk command and status in the key/value store
081        KeyValueStore keyValueStore = getKvStore();
082        keyValueStore.put(commandId + STATE, SCHEDULED.toString());
083        keyValueStore.put(commandId + SUBMIT_TIME, Instant.now().toEpochMilli());
084        keyValueStore.put(commandId + COMMAND, commandAsBytes);
085
086        // send it to nuxeo-stream
087        LogManager logManager = Framework.getService(StreamService.class).getLogManager(BULK_LOG_MANAGER_NAME);
088        LogAppender<Record> logAppender = logManager.getAppender(SET_STREAM_NAME);
089        logAppender.append(commandId, Record.of(commandId, commandAsBytes));
090
091        return commandId;
092    }
093
094    @Override
095    public BulkStatus getStatus(String commandId) {
096        BulkStatus status = new BulkStatus();
097        status.setId(commandId);
098
099        // retrieve values from KeyValueStore
100        KeyValueStore keyValueStore = getKvStore();
101        String state = keyValueStore.getString(commandId + STATE);
102        status.setState(State.valueOf(state));
103
104        Long submitTime = keyValueStore.getLong(commandId + SUBMIT_TIME);
105        status.setSubmitTime(Instant.ofEpochMilli(submitTime.longValue()));
106
107        BulkCommand command = BulkCommands.fromKVStore(keyValueStore, commandId);
108        status.setCommand(command);
109
110        Long scrollStartTime = keyValueStore.getLong(commandId + SCROLL_START_TIME);
111        if (scrollStartTime != null) {
112            status.setScrollStartTime(Instant.ofEpochMilli(scrollStartTime));
113        }
114        Long scrollEndTime = keyValueStore.getLong(commandId + SCROLL_END_TIME);
115        if (scrollEndTime != null) {
116            status.setScrollEndTime(Instant.ofEpochMilli(scrollEndTime));
117        }
118
119        Long processedDocuments = keyValueStore.getLong(commandId + PROCESSED_DOCUMENTS);
120        status.setProcessed(processedDocuments);
121
122        Long scrolledDocumentCount = keyValueStore.getLong(commandId + SCROLLED_DOCUMENT_COUNT);
123        status.setCount(scrolledDocumentCount);
124
125        return status;
126    }
127
128    @Override
129    public boolean await(String commandId, Duration duration) throws InterruptedException {
130        long deadline = System.currentTimeMillis() + duration.toMillis();
131        KeyValueStore kvStore = getKvStore();
132        do {
133            if (COMPLETED.toString().equals(kvStore.getString(commandId + STATE))) {
134                return true;
135            }
136            Thread.sleep(500);
137        } while (deadline > System.currentTimeMillis());
138        log.debug("await timeout for commandId(" + commandId + ") after " + duration.toMillis() + " ms");
139        return false;
140    }
141
142    public KeyValueStore getKvStore() {
143        return Framework.getService(KeyValueService.class).getKeyValueStore(BULK_KV_STORE_NAME);
144    }
145
146}