001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019 020package org.nuxeo.ecm.core.bulk.computation; 021 022import static java.lang.Math.min; 023import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM; 024import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED; 025import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED; 026import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.RUNNING; 027import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCROLLING_RUNNING; 028 029import java.time.Duration; 030import java.time.Instant; 031import java.util.ArrayList; 032import java.util.List; 033 034import org.apache.logging.log4j.LogManager; 035import org.apache.logging.log4j.Logger; 036import org.nuxeo.ecm.core.api.DocumentNotFoundException; 037import org.nuxeo.ecm.core.api.NuxeoException; 038import org.nuxeo.ecm.core.api.scroll.Scroll; 039import org.nuxeo.ecm.core.api.scroll.ScrollRequest; 040import org.nuxeo.ecm.core.api.scroll.ScrollService; 041import org.nuxeo.ecm.core.bulk.BulkAdminService; 042import org.nuxeo.ecm.core.bulk.BulkCodecs; 043import org.nuxeo.ecm.core.bulk.BulkService; 044import org.nuxeo.ecm.core.bulk.message.BulkBucket; 045import org.nuxeo.ecm.core.bulk.message.BulkCommand; 046import org.nuxeo.ecm.core.bulk.message.BulkStatus; 047import org.nuxeo.ecm.core.query.QueryParseException; 048import org.nuxeo.ecm.core.scroll.DocumentScrollRequest; 049import org.nuxeo.ecm.core.scroll.EmptyScrollRequest; 050import org.nuxeo.ecm.core.scroll.GenericScrollRequest; 051import org.nuxeo.lib.stream.computation.AbstractComputation; 052import org.nuxeo.lib.stream.computation.ComputationContext; 053import org.nuxeo.lib.stream.computation.Record; 054import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl; 055import org.nuxeo.runtime.api.Framework; 056import org.nuxeo.runtime.transaction.TransactionHelper; 057import org.nuxeo.runtime.transaction.TransactionRuntimeException; 058 059/** 060 * Materializes the document set for a command if scroller is not external. 061 * <p> 062 * Inputs: 063 * <ul> 064 * <li>i1: Reads a stream of {@link BulkCommand} sharded by action</li> 065 * </ul> 066 * <p> 067 * Outputs: 068 * <ul> 069 * <li>- "actionName": Writes {@link BulkBucket} into the action stream</li> 070 * <li>- "status": Writes {@link BulkStatus} into the action stream</li> 071 * </ul> 072 * 073 * @since 10.2 074 */ 075public class BulkScrollerComputation extends AbstractComputation { 076 077 private static final Logger log = LogManager.getLogger(BulkScrollerComputation.class); 078 079 public static final int MAX_SCROLL_SIZE = 4_000; 080 081 protected final int scrollBatchSize; 082 083 protected final int scrollKeepAliveSeconds; 084 085 protected final List<String> documentIds; 086 087 protected final boolean produceImmediate; 088 089 // @since 11.4 090 protected final long produceImmediateThreshold; 091 092 protected final int transactionTimeoutSeconds; 093 094 protected int scrollSize; 095 096 protected int bucketSize; 097 098 protected String actionStream; 099 100 public static Builder builder(String name, int nbOutputStreams) { 101 return new Builder(name, nbOutputStreams); 102 } 103 104 protected BulkScrollerComputation(Builder builder) { 105 super(builder.name, 1, builder.nbOutputStreams); 106 this.scrollBatchSize = builder.scrollBatchSize; 107 this.scrollKeepAliveSeconds = builder.scrollKeepAliveSeconds; 108 this.produceImmediate = builder.produceImmediate; 109 this.produceImmediateThreshold = builder.produceImmediateThreshold; 110 this.transactionTimeoutSeconds = Math.toIntExact(builder.transactionTimeout.toSeconds()); 111 documentIds = new ArrayList<>(scrollBatchSize); 112 } 113 114 /** 115 * @param name the computation name 116 * @param nbOutputStreams the number of registered bulk action streams 117 * @param scrollBatchSize the batch size to scroll 118 * @param scrollKeepAliveSeconds the scroll lifetime 119 * @param produceImmediate whether or not the record should be produced immedialitely while scrolling 120 */ 121 public BulkScrollerComputation(String name, int nbOutputStreams, int scrollBatchSize, int scrollKeepAliveSeconds, 122 boolean produceImmediate) { 123 this(builder(name, nbOutputStreams).setScrollBatchSize(scrollBatchSize) 124 .setScrollKeepAliveSeconds(scrollKeepAliveSeconds) 125 .setProduceImmediate(produceImmediate)); 126 } 127 128 // @since 11.2 129 public BulkScrollerComputation(String name, int nbOutputStreams, int scrollBatchSize, int scrollKeepAliveSeconds, 130 Duration transactionTimeout, boolean produceImmediate) { 131 this(builder(name, nbOutputStreams).setScrollBatchSize(scrollBatchSize) 132 .setScrollKeepAliveSeconds(scrollKeepAliveSeconds) 133 .setProduceImmediate(produceImmediate) 134 .setTransactionTimeout(transactionTimeout)); 135 } 136 137 @Override 138 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 139 boolean newTransaction = true; 140 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 141 newTransaction = false; 142 log.warn("Already inside a transaction, timeout cannot be applied, record: " + record, 143 new Throwable("stack")); 144 } else if (!TransactionHelper.startTransaction(transactionTimeoutSeconds)) { 145 throw new TransactionRuntimeException("Cannot start transaction"); 146 } 147 try { 148 processRecord(context, record); 149 } finally { 150 if (newTransaction) { 151 // Always rollback because we don't write anything 152 TransactionHelper.setTransactionRollbackOnly(); 153 TransactionHelper.commitOrRollbackTransaction(); 154 } 155 } 156 } 157 158 protected void processRecord(ComputationContext context, Record record) { 159 BulkCommand command = null; 160 String commandId = null; 161 try { 162 command = BulkCodecs.getCommandCodec().decode(record.getData()); 163 commandId = command.getId(); 164 getCommandConfiguration(command); 165 updateStatusAsScrolling(context, commandId); 166 167 long documentCount = 0; 168 long bucketNumber = 1; 169 final long queryLimit = getQueryLimit(command); 170 boolean limitReached = false; 171 scrollLoop: try (Scroll scroll = buildScroll(command)) { 172 while (scroll.hasNext()) { 173 if (isAbortedCommand(commandId)) { 174 log.debug("Skipping aborted command: {}", commandId); 175 context.askForCheckpoint(); 176 return; 177 } 178 List<String> docIds = scroll.next(); 179 log.debug("docIds: {}", docIds); 180 int scrollCount = docIds.size(); 181 if (documentCount + scrollCount < queryLimit) { 182 documentIds.addAll(docIds); 183 } else { 184 scrollCount = Math.toIntExact(queryLimit - documentCount); 185 documentIds.addAll(docIds.subList(0, scrollCount)); 186 limitReached = true; 187 } 188 while (documentIds.size() >= bucketSize) { 189 produceBucket(context, commandId, bucketSize, bucketNumber++, documentCount); 190 } 191 documentCount += scrollCount; 192 if (limitReached) { 193 log.warn("Scroll limit {} reached for command {}", queryLimit, commandId); 194 break scrollLoop; 195 } 196 } 197 } 198 199 // send remaining document ids 200 // there's at most one record because we loop while scrolling 201 if (!documentIds.isEmpty()) { 202 produceBucket(context, commandId, bucketSize, bucketNumber++, documentCount); 203 } 204 // update status after scroll when we handle the scroller 205 if (!command.useExternalScroller()) { 206 updateStatusAfterScroll(context, commandId, documentCount, limitReached); 207 } 208 } catch (IllegalArgumentException | QueryParseException | DocumentNotFoundException e) { 209 log.error("Invalid query results in an empty document set: {}", command, e); 210 updateStatusAfterScroll(context, commandId, "Invalid query"); 211 } catch (NuxeoException e) { 212 if (command != null) { 213 log.error("Invalid command produces an empty document set: {}", command, e); 214 updateStatusAfterScroll(context, command.getId(), "Invalid command"); 215 } else { 216 log.error("Discard invalid record: {}", record, e); 217 } 218 } 219 context.askForCheckpoint(); 220 } 221 222 private long getQueryLimit(BulkCommand command) { 223 Long limit = command.getQueryLimit(); 224 if (limit == null || limit <= 0) { 225 return Long.MAX_VALUE; 226 } 227 return limit; 228 } 229 230 protected Scroll buildScroll(BulkCommand command) { 231 ScrollRequest request; 232 String query = command.getQuery(); 233 log.debug("Build scroll with query: {}", query); 234 if (command.useExternalScroller()) { 235 request = EmptyScrollRequest.of(); 236 237 } else if (command.useGenericScroller()) { 238 request = GenericScrollRequest.builder(command.getScroller(), query) 239 .options(command.getParams()) 240 .size(scrollSize) 241 .build(); 242 243 } else { 244 request = DocumentScrollRequest.builder(query) 245 .username(command.getUsername()) 246 .repository(command.getRepository()) 247 .size(scrollSize) 248 .timeout(Duration.ofSeconds(scrollKeepAliveSeconds)) 249 .name(command.getScroller()) 250 .build(); 251 } 252 ScrollService service = Framework.getService(ScrollService.class); 253 return service.scroll(request); 254 } 255 256 protected void getCommandConfiguration(BulkCommand command) { 257 BulkAdminService actionService = Framework.getService(BulkAdminService.class); 258 bucketSize = command.getBucketSize() > 0 ? command.getBucketSize() 259 : actionService.getBucketSize(command.getAction()); 260 scrollSize = scrollBatchSize; 261 if (bucketSize > scrollSize) { 262 if (bucketSize <= MAX_SCROLL_SIZE) { 263 scrollSize = bucketSize; 264 } else { 265 log.warn("Bucket size: {} too big for command: {}, reduce to: {}", bucketSize, command, 266 MAX_SCROLL_SIZE); 267 scrollSize = bucketSize = MAX_SCROLL_SIZE; 268 } 269 } 270 actionStream = actionService.getInputStream(command.getAction()); 271 } 272 273 protected boolean isAbortedCommand(String commandId) { 274 BulkService bulkService = Framework.getService(BulkService.class); 275 BulkStatus status = bulkService.getStatus(commandId); 276 return ABORTED.equals(status.getState()); 277 } 278 279 protected void updateStatusAsScrolling(ComputationContext context, String commandId) { 280 BulkStatus delta = BulkStatus.deltaOf(commandId); 281 delta.setState(SCROLLING_RUNNING); 282 delta.setScrollStartTime(Instant.now()); 283 ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId, 284 BulkCodecs.getStatusCodec().encode(delta)); 285 } 286 287 protected void updateStatusAfterScroll(ComputationContext context, String commandId, String errorMessage) { 288 updateStatusAfterScroll(context, commandId, 0, errorMessage, false); 289 } 290 291 protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount, 292 boolean limited) { 293 updateStatusAfterScroll(context, commandId, documentCount, null, limited); 294 } 295 296 protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount, 297 String errorMessage, boolean limited) { 298 BulkStatus delta = BulkStatus.deltaOf(commandId); 299 if (errorMessage != null) { 300 delta.inError(errorMessage); 301 } 302 if (documentCount == 0) { 303 delta.setState(COMPLETED); 304 delta.setCompletedTime(Instant.now()); 305 } else { 306 delta.setState(RUNNING); 307 } 308 delta.setScrollEndTime(Instant.now()); 309 delta.setTotal(documentCount); 310 delta.setQueryLimitReached(limited); 311 ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId, 312 BulkCodecs.getStatusCodec().encode(delta)); 313 } 314 315 /** 316 * Produces a bucket as a record to appropriate bulk action stream. 317 */ 318 protected void produceBucket(ComputationContext context, String commandId, int bucketSize, long bucketNumber, 319 long documentCount) { 320 List<String> ids = documentIds.subList(0, min(bucketSize, documentIds.size())); 321 BulkBucket bucket = new BulkBucket(commandId, ids); 322 String key = commandId + ":" + Long.toString(bucketNumber); 323 Record record = Record.of(key, BulkCodecs.getBucketCodec().encode(bucket)); 324 if (produceImmediate || (produceImmediateThreshold > 0 && documentCount > produceImmediateThreshold)) { 325 ComputationContextImpl contextImpl = (ComputationContextImpl) context; 326 if (!contextImpl.getRecords(actionStream).isEmpty()) { 327 flushRecords(contextImpl, commandId); 328 } 329 contextImpl.produceRecordImmediate(actionStream, record); 330 } else { 331 context.produceRecord(actionStream, record); 332 } 333 ids.clear(); // this clear the documentIds part that has been sent 334 } 335 336 protected void flushRecords(ComputationContextImpl contextImpl, String commandId) { 337 log.warn("Scroller records threshold reached ({}) for action: {} on command: {}, flushing records downstream", 338 produceImmediateThreshold, actionStream, commandId); 339 contextImpl.getRecords(actionStream) 340 .forEach(record -> contextImpl.produceRecordImmediate(actionStream, record)); 341 contextImpl.getRecords(actionStream).clear(); 342 } 343 344 /** 345 * @since 11.4 346 */ 347 public static class Builder { 348 protected String name; 349 350 protected int nbOutputStreams; 351 352 protected int scrollBatchSize; 353 354 protected int scrollKeepAliveSeconds; 355 356 protected boolean produceImmediate; 357 358 protected int produceImmediateThreshold; 359 360 protected Duration transactionTimeout; 361 362 protected long queryLimit; 363 364 /** 365 * @param name the computation name 366 * @param nbOutputStream the number of registered bulk action streams 367 */ 368 public Builder(String name, int nbOutputStream) { 369 this.name = name; 370 this.nbOutputStreams = nbOutputStream; 371 } 372 373 /** 374 * @param scrollBatchSize the batch size to scroll 375 */ 376 public Builder setScrollBatchSize(int scrollBatchSize) { 377 this.scrollBatchSize = scrollBatchSize; 378 return this; 379 } 380 381 /** 382 * @param scrollKeepAliveSeconds the scroll lifetime between fetch 383 */ 384 public Builder setScrollKeepAliveSeconds(int scrollKeepAliveSeconds) { 385 this.scrollKeepAliveSeconds = scrollKeepAliveSeconds; 386 return this; 387 } 388 389 /** 390 * @param produceImmediate whether or not the record should be produced immediately while scrolling 391 */ 392 public Builder setProduceImmediate(boolean produceImmediate) { 393 this.produceImmediate = produceImmediate; 394 return this; 395 } 396 397 /** 398 * @param produceImmediateThreshold produce record immediately after the threshold to prevent OOM 399 */ 400 public Builder setProduceImmediateThreshold(int produceImmediateThreshold) { 401 this.produceImmediateThreshold = produceImmediateThreshold; 402 return this; 403 } 404 405 /** 406 * @param transactionTimeout set an explicit transaction timeout for the scroll 407 */ 408 public Builder setTransactionTimeout(Duration transactionTimeout) { 409 this.transactionTimeout = transactionTimeout; 410 return this; 411 } 412 413 public BulkScrollerComputation build() { 414 return new BulkScrollerComputation(this); 415 } 416 } 417}