001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc <kleturc@nuxeo.com> 018 */ 019package org.nuxeo.ecm.core.bulk; 020 021import static org.apache.commons.lang3.StringUtils.isBlank; 022import static org.apache.commons.lang3.StringUtils.isEmpty; 023import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED; 024import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED; 025import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.RUNNING; 026import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCHEDULED; 027import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.UNKNOWN; 028 029import java.io.Serializable; 030import java.time.Duration; 031import java.time.Instant; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicLong; 037import java.util.stream.Collectors; 038 039import org.apache.commons.collections4.map.PassiveExpiringMap; 040import org.apache.logging.log4j.Logger; 041import org.nuxeo.ecm.core.api.repository.RepositoryManager; 042import org.nuxeo.ecm.core.api.scroll.ScrollService; 043import org.nuxeo.ecm.core.bulk.message.BulkBucket; 044import org.nuxeo.ecm.core.bulk.message.BulkCommand; 045import org.nuxeo.ecm.core.bulk.message.BulkStatus; 046import org.nuxeo.ecm.core.scroll.DocumentScrollRequest; 047import org.nuxeo.ecm.core.scroll.GenericScrollRequest; 048import org.nuxeo.lib.stream.computation.Record; 049import org.nuxeo.lib.stream.log.LogAppender; 050import org.nuxeo.lib.stream.log.LogManager; 051import org.nuxeo.lib.stream.log.Name; 052import org.nuxeo.runtime.api.Framework; 053import org.nuxeo.runtime.kv.KeyValueService; 054import org.nuxeo.runtime.kv.KeyValueStore; 055import org.nuxeo.runtime.kv.KeyValueStoreProvider; 056import org.nuxeo.runtime.stream.StreamService; 057 058/** 059 * Basic implementation of {@link BulkService}. 060 * 061 * @since 10.2 062 */ 063public class BulkServiceImpl implements BulkService { 064 065 private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(BulkServiceImpl.class); 066 067 // @deprecated since 11.1 log config is not needed anymore 068 @Deprecated 069 public static final String BULK_LOG_MANAGER_NAME = "bulk"; 070 071 public static final String BULK_KV_STORE_NAME = "bulk"; 072 073 public static final String COMMAND_STREAM = "bulk/command"; 074 075 // @since 11.1 076 public static final Name COMMAND_STREAM_NAME = Name.ofUrn(COMMAND_STREAM); 077 078 public static final String STATUS_STREAM = "bulk/status"; 079 080 // @since 11.1 081 public static final Name STATUS_STREAM_NAME = Name.ofUrn(STATUS_STREAM); 082 083 public static final String DONE_STREAM = "bulk/done"; 084 085 // @since 11.1 086 public static final Name DONE_STREAM_NAME = Name.ofUrn(DONE_STREAM); 087 088 public static final String COMMAND_PREFIX = "command:"; 089 090 // @deprecated since 11.4 not needed anymore 091 @Deprecated(since = "11.4") 092 public static final String RECORD_CODEC = "avro"; 093 094 public static final String STATUS_PREFIX = "status:"; 095 096 public static final String PRODUCE_IMMEDIATE_OPTION = "produceImmediate"; 097 098 // How long we keep the command and its status in the kv store once completed 099 public static final long COMPLETED_TTL_SECONDS = 3_600; 100 101 // How long we keep the command and its status in the kv store once aborted 102 public static final long ABORTED_TTL_SECONDS = 7_200; 103 104 // @since 11.3 105 protected final AtomicLong externalScrollerCounter = new AtomicLong(); 106 107 // @since 11.3 108 protected final Map<String, BulkCommand> externalCommands = new PassiveExpiringMap<>(60, TimeUnit.SECONDS); 109 110 @Override 111 public String submit(BulkCommand command) { 112 log.debug("Run action with command={}", command); 113 // check command 114 BulkAdminService adminService = Framework.getService(BulkAdminService.class); 115 if (!adminService.getActions().contains(command.getAction())) { 116 throw new IllegalArgumentException("Unknown action for command: " + command); 117 } 118 BulkActionValidation actionValidation = adminService.getActionValidation(command.getAction()); 119 120 // Try to validate the action if a validation class is provided 121 if (actionValidation != null) { 122 actionValidation.validate(command); 123 } 124 RepositoryManager repoManager = Framework.getService(RepositoryManager.class); 125 if (repoManager != null) { 126 if (isEmpty(command.getRepository())) { 127 command.setRepository(repoManager.getDefaultRepositoryName()); 128 } else if (repoManager.getRepository(command.getRepository()) == null) { 129 throw new IllegalArgumentException("Unknown repository: " + command); 130 } 131 } 132 if (command.getBucketSize() == 0 || command.getBatchSize() == 0) { 133 if (command.getBucketSize() == 0) { 134 command.setBucketSize(adminService.getBucketSize(command.getAction())); 135 } 136 if (command.getBatchSize() == 0) { 137 command.setBatchSize(adminService.getBatchSize(command.getAction())); 138 } 139 } 140 if (command.getQueryLimit() == null) { 141 command.setQueryLimit(adminService.getQueryLimit(command.getAction())); 142 } 143 if (command.getScroller() == null && !command.useExternalScroller()) { 144 String actionScroller = adminService.getDefaultScroller(command.getAction()); 145 if (!isBlank(actionScroller)) { 146 command.setScroller(actionScroller); 147 } 148 } 149 checkIfScrollerExists(command); 150 151 // store the bulk command and status in the key/value store 152 BulkStatus status = new BulkStatus(command.getId()); 153 status.setState(SCHEDULED); 154 status.setAction(command.getAction()); 155 status.setUsername(command.getUsername()); 156 status.setSubmitTime(Instant.now()); 157 setStatus(status); 158 byte[] commandAsBytes = setCommand(command); 159 160 String shardKey; 161 if (adminService.isSequentialCommands(command.getAction())) { 162 // no concurrency all commands for this action goes to the same partition 163 shardKey = command.getAction(); 164 } else { 165 // use a random value 166 shardKey = command.getId(); 167 } 168 // send command to bulk processor 169 log.debug("Submit action with command: {}", command); 170 return submit(shardKey, command.getId(), commandAsBytes); 171 } 172 173 protected void checkIfScrollerExists(BulkCommand command) { 174 ScrollService scrollService = Framework.getService(ScrollService.class); 175 if (command.useExternalScroller()) { 176 // nothing to do 177 } else if (command.useGenericScroller()) { 178 if (!scrollService.exists( 179 GenericScrollRequest.builder(command.getScroller(), command.getQuery()).build())) { 180 throw new IllegalArgumentException("Unknown Generic Scroller for command: " + command); 181 } 182 } else if (!scrollService.exists( 183 DocumentScrollRequest.builder(command.getQuery()).name(command.getScroller()).build())) { 184 throw new IllegalArgumentException("Unknown Document Scroller for command: " + command); 185 } 186 } 187 188 @SuppressWarnings("resource") // LogManager not ours to close 189 protected String submit(String shardKey, String key, byte[] bytes) { 190 LogManager logManager = Framework.getService(StreamService.class).getLogManager(); 191 LogAppender<Record> logAppender = logManager.getAppender(COMMAND_STREAM_NAME); 192 Record record = Record.of(key, bytes); 193 log.debug("Append shardKey: {}, record: {}", shardKey, record); 194 logAppender.append(shardKey, record); 195 return key; 196 } 197 198 @Override 199 public BulkStatus getStatus(String commandId) { 200 KeyValueStore keyValueStore = getKvStore(); 201 byte[] statusAsBytes = keyValueStore.get(STATUS_PREFIX + commandId); 202 if (statusAsBytes == null) { 203 log.debug("Request status of unknown command: {}", commandId); 204 return BulkStatus.unknownOf(commandId); 205 } 206 return BulkCodecs.getStatusCodec().decode(statusAsBytes); 207 } 208 209 /** 210 * Stores the status in the kv store returns the encoded status 211 */ 212 public byte[] setStatus(BulkStatus status) { 213 KeyValueStore kvStore = getKvStore(); 214 byte[] statusAsBytes = BulkCodecs.getStatusCodec().encode(status); 215 switch (status.getState()) { 216 case ABORTED: 217 kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, ABORTED_TTL_SECONDS); 218 // we remove the command from the kv store, so computation have to handle abort 219 kvStore.put(COMMAND_PREFIX + status.getId(), (String) null); 220 break; 221 case COMPLETED: 222 kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, COMPLETED_TTL_SECONDS); 223 kvStore.setTTL(COMMAND_PREFIX + status.getId(), COMPLETED_TTL_SECONDS); 224 break; 225 default: 226 kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes); 227 } 228 return statusAsBytes; 229 } 230 231 @Override 232 public BulkCommand getCommand(String commandId) { 233 KeyValueStore keyValueStore = getKvStore(); 234 byte[] statusAsBytes = keyValueStore.get(COMMAND_PREFIX + commandId); 235 if (statusAsBytes == null) { 236 return null; 237 } 238 return BulkCodecs.getCommandCodec().decode(statusAsBytes); 239 } 240 241 @Override 242 public BulkStatus abort(String commandId) { 243 BulkStatus status = getStatus(commandId); 244 if (COMPLETED.equals(status.getState())) { 245 log.debug("Cannot abort a completed command: {}", commandId); 246 return status; 247 } 248 status.setState(ABORTED); 249 // set the status in the KV store 250 setStatus(status); 251 // Send a delta to the status computation 252 BulkStatus delta = BulkStatus.deltaOf(commandId); 253 delta.setCompletedTime(Instant.now()); 254 delta.setState(ABORTED); 255 Record record = Record.of(commandId, BulkCodecs.getStatusCodec().encode(delta)); 256 Framework.getService(StreamService.class).getStreamManager().append(STATUS_STREAM, record); 257 return status; 258 } 259 260 @Override 261 public Map<String, Serializable> getResult(String commandId) { 262 return getStatus(commandId).getResult(); 263 } 264 265 /** 266 * Stores the command in the kv store, returns the encoded command. 267 */ 268 public byte[] setCommand(BulkCommand command) { 269 KeyValueStore kvStore = getKvStore(); 270 byte[] commandAsBytes = BulkCodecs.getCommandCodec().encode(command); 271 kvStore.put(COMMAND_PREFIX + command.getId(), commandAsBytes); 272 return commandAsBytes; 273 } 274 275 @Override 276 public boolean await(String commandId, Duration duration) throws InterruptedException { 277 long deadline = System.currentTimeMillis() + duration.toMillis(); 278 BulkStatus status; 279 do { 280 status = getStatus(commandId); 281 switch (status.getState()) { 282 case COMPLETED: 283 case ABORTED: 284 return true; 285 case UNKNOWN: 286 log.error("Unknown status for command: {}", commandId); 287 return false; 288 default: 289 // continue 290 } 291 Thread.sleep(100); 292 } while (deadline > System.currentTimeMillis()); 293 log.debug("await timeout on {} after {} ms", () -> getStatus(commandId), duration::toMillis); 294 return false; 295 } 296 297 public KeyValueStore getKvStore() { 298 return Framework.getService(KeyValueService.class).getKeyValueStore(BULK_KV_STORE_NAME); 299 } 300 301 @Override 302 public boolean await(Duration duration) throws InterruptedException { 303 KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore(); 304 Set<String> commandIds = kv.keyStream(STATUS_PREFIX) 305 .map(k -> k.replaceFirst(STATUS_PREFIX, "")) 306 .collect(Collectors.toSet()); 307 log.debug("Wait for command ids: {}", commandIds); 308 // nanoTime is always monotonous 309 long deadline = System.nanoTime() + duration.toNanos(); 310 for (String commandId : commandIds) { 311 log.debug("Wait for command id: {}", commandId); 312 for (;;) { 313 BulkStatus status = getStatus(commandId); 314 log.debug("Status of command: {} = {}", commandId, status); 315 BulkStatus.State state = status.getState(); 316 log.debug("State of command: {} = {}", commandId, state); 317 if (state == COMPLETED || state == ABORTED || state == UNKNOWN) { 318 break; 319 } 320 Thread.sleep(200); 321 if (deadline < System.nanoTime()) { 322 log.debug("await timeout, at least one uncompleted command: {}", status); 323 return false; 324 } 325 } 326 } 327 return true; 328 } 329 330 @Override 331 public List<BulkStatus> getStatuses(String username) { 332 KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore(); 333 return kv.keyStream(STATUS_PREFIX) 334 .map(kv::get) 335 .map(BulkCodecs.getStatusCodec()::decode) 336 .filter(status -> username.equals(status.getUsername())) 337 .collect(Collectors.toList()); 338 } 339 340 @Override 341 public void appendExternalBucket(BulkBucket bucket) { 342 String commandId = bucket.getCommandId(); 343 344 BulkCommand command = externalCommands.computeIfAbsent(commandId, this::getCommand); 345 String stream = Framework.getService(BulkAdminService.class).getInputStream(command.getAction()); 346 347 String key = commandId + ":" + externalScrollerCounter.incrementAndGet(); 348 Record record = Record.of(key, BulkCodecs.getBucketCodec().encode(bucket)); 349 350 log.debug("Append key: {}, record: {}", key, record); 351 Framework.getService(StreamService.class).getStreamManager().append(stream, record); 352 } 353 354 @Override 355 public void completeExternalScroll(String commandId, long count) { 356 BulkStatus delta = BulkStatus.deltaOf(commandId); 357 delta.setState(RUNNING); 358 delta.setScrollEndTime(Instant.now()); 359 delta.setTotal(count); 360 361 Record record = Record.of(commandId, BulkCodecs.getStatusCodec().encode(delta)); 362 363 log.debug("Complete external scroll with key: {}, count: {}, record: {}", commandId, count, record); 364 Framework.getService(StreamService.class).getStreamManager().append(STATUS_STREAM, record); 365 } 366}