/*
 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Kevin Leturc <kleturc@nuxeo.com>
 */
package org.nuxeo.ecm.core.bulk;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.nuxeo.ecm.core.bulk.BulkComponent.BULK_KV_STORE_NAME;
import static org.nuxeo.ecm.core.bulk.BulkComponent.BULK_LOG_MANAGER_NAME;
import static org.nuxeo.ecm.core.bulk.BulkStatus.State.COMPLETED;
import static org.nuxeo.ecm.core.bulk.BulkStatus.State.SCHEDULED;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.bulk.BulkStatus.State;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.stream.StreamService;

/**
 * Basic implementation of {@link BulkService}.
 * <p>
 * Every piece of information about a command is kept in the bulk key/value store under a key made of the command id
 * followed by one of the suffix constants declared below.
 *
 * @since 10.2
 */
public class BulkServiceImpl implements BulkService {

    private static final Log log = LogFactory.getLog(BulkServiceImpl.class);

    /** Name of the log (stream) submitted commands are appended to. */
    protected static final String SET_STREAM_NAME = "documentSet";

    /** Key suffix of the serialized command. */
    protected static final String COMMAND = ":command";

    /** Key suffix of the submission time, stored as epoch milliseconds. */
    protected static final String SUBMIT_TIME = ":submitTime";

    /** Key suffix of the scroll start time, read as epoch milliseconds. */
    protected static final String SCROLL_START_TIME = ":scrollStartTime";

    /** Key suffix of the scroll end time, read as epoch milliseconds. */
    protected static final String SCROLL_END_TIME = ":scrollEndTime";

    /** Key suffix of the command state, stored as the {@link State} constant name. */
    protected static final String STATE = ":state";

    /** Key suffix of the processed documents counter. */
    protected static final String PROCESSED_DOCUMENTS = ":processedDocs";

    /** Key suffix of the scrolled documents counter. */
    protected static final String SCROLLED_DOCUMENT_COUNT = ":count";

    /** Upper bound, in milliseconds, of the pause between two state lookups in {@link #await(String, Duration)}. */
    private static final long AWAIT_POLL_INTERVAL_MS = 500;

    /**
     * Registers the given command and hands it over to the stream.
     * <p>
     * A random UUID is generated as command id. The {@code SCHEDULED} state, the submission time and the serialized
     * command are written to the key/value store, then a record keyed by the command id is appended to the
     * {@value #SET_STREAM_NAME} log.
     *
     * @param command the command to run; its repository, query and action must all be non-empty
     * @return the generated command id
     * @throws IllegalArgumentException if the repository, the query or the action of the command is empty
     */
    @Override
    public String submit(BulkCommand command) {
        if (log.isDebugEnabled()) {
            log.debug("Run action with command=" + command);
        }
        // check command
        if (isEmpty(command.getRepository()) || isEmpty(command.getQuery()) || isEmpty(command.getAction())) {
            throw new IllegalArgumentException("Missing mandatory values");
        }
        // create the command id and status
        String commandId = UUID.randomUUID().toString();
        byte[] commandAsBytes = BulkCommands.toBytes(command);

        // store the bulk command and status in the key/value store
        KeyValueStore keyValueStore = getKvStore();
        keyValueStore.put(commandId + STATE, SCHEDULED.toString());
        keyValueStore.put(commandId + SUBMIT_TIME, Instant.now().toEpochMilli());
        keyValueStore.put(commandId + COMMAND, commandAsBytes);

        // send it to nuxeo-stream
        LogManager logManager = Framework.getService(StreamService.class).getLogManager(BULK_LOG_MANAGER_NAME);
        LogAppender<Record> logAppender = logManager.getAppender(SET_STREAM_NAME);
        logAppender.append(commandId, Record.of(commandId, commandAsBytes));

        return commandId;
    }

    /**
     * Builds the status of a command from the values held in the key/value store.
     * <p>
     * The scroll start and end times are only set on the returned status when a value is stored for them. The state
     * and the submission time are not null-checked.
     *
     * @param commandId the id returned by {@link #submit(BulkCommand)}
     * @return a new status filled with the stored state, submission time, command, scroll times and counters
     * @throws NullPointerException if no state or no submission time is stored for {@code commandId}
     */
    @Override
    public BulkStatus getStatus(String commandId) {
        BulkStatus status = new BulkStatus();
        status.setId(commandId);

        // retrieve values from KeyValueStore
        KeyValueStore keyValueStore = getKvStore();
        String state = keyValueStore.getString(commandId + STATE);
        status.setState(State.valueOf(state));

        Long submitTime = keyValueStore.getLong(commandId + SUBMIT_TIME);
        status.setSubmitTime(Instant.ofEpochMilli(submitTime.longValue()));

        BulkCommand command = BulkCommands.fromKVStore(keyValueStore, commandId);
        status.setCommand(command);

        // scroll times are optional: leave them unset on the status when nothing is stored
        Long scrollStartTime = keyValueStore.getLong(commandId + SCROLL_START_TIME);
        if (scrollStartTime != null) {
            status.setScrollStartTime(Instant.ofEpochMilli(scrollStartTime));
        }
        Long scrollEndTime = keyValueStore.getLong(commandId + SCROLL_END_TIME);
        if (scrollEndTime != null) {
            status.setScrollEndTime(Instant.ofEpochMilli(scrollEndTime));
        }

        Long processedDocuments = keyValueStore.getLong(commandId + PROCESSED_DOCUMENTS);
        status.setProcessed(processedDocuments);

        Long scrolledDocumentCount = keyValueStore.getLong(commandId + SCROLLED_DOCUMENT_COUNT);
        status.setCount(scrolledDocumentCount);

        return status;
    }

    /**
     * Blocks until the stored state of the command is {@code COMPLETED} or the given duration has elapsed.
     * <p>
     * The state is looked up immediately, then again after each pause. A pause lasts at most
     * {@value #AWAIT_POLL_INTERVAL_MS} ms and never extends past the requested duration, and the state is always
     * looked up one last time once the duration is over, so a completion happening during the final pause is not
     * reported as a timeout.
     *
     * @param commandId the id of the command to wait for
     * @param duration the maximum time to wait
     * @return {@code true} if the command was seen completed, {@code false} if the duration elapsed first
     * @throws InterruptedException if the calling thread is interrupted while pausing
     */
    @Override
    public boolean await(String commandId, Duration duration) throws InterruptedException {
        long timeoutMillis = duration.toMillis();
        // measure elapsed time on the monotonic clock: immune to wall-clock adjustments and to deadline overflow
        long startNanos = System.nanoTime();
        KeyValueStore kvStore = getKvStore();
        String stateKey = commandId + STATE;
        String completed = COMPLETED.toString();
        while (true) {
            if (completed.equals(kvStore.getString(stateKey))) {
                return true;
            }
            long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
            long remainingMillis = timeoutMillis - elapsedMillis;
            if (remainingMillis <= 0) {
                break;
            }
            // never sleep past the deadline
            Thread.sleep(Math.min(AWAIT_POLL_INTERVAL_MS, remainingMillis));
        }
        if (log.isDebugEnabled()) {
            log.debug("await timeout for commandId(" + commandId + ") after " + duration.toMillis() + " ms");
        }
        return false;
    }

    /**
     * Looks up the key/value store used by the bulk service.
     *
     * @return the store registered under {@code BULK_KV_STORE_NAME} in the {@link KeyValueService}
     */
    public KeyValueStore getKvStore() {
        return Framework.getService(KeyValueService.class).getKeyValueStore(BULK_KV_STORE_NAME);
    }

}