/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Funsho David
 */
package org.nuxeo.ecm.core.bulk.computation;

import static java.lang.Math.min;
import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.RUNNING;
import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCROLLING_RUNNING;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.CloseableCoreSession;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.bulk.BulkAdminService;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkBucket;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.query.QueryParseException;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Materializes the document set for a command.
 * <p>
 * Inputs:
 * <ul>
 * <li>i1: Reads a stream of {@link BulkCommand} sharded by action</li>
 * </ul>
 * <p>
 * Outputs:
 * <ul>
 * <li>- "actionName": Writes {@link BulkBucket} into the action stream</li>
 * <li>- "status": Writes {@link BulkStatus} into the action stream</li>
 * </ul>
 *
 * @since 10.2
 */
public class BulkScrollerComputation extends AbstractComputation {

    private static final Logger log = LogManager.getLogger(BulkScrollerComputation.class);

    /** Upper bound for the scroll batch size, whatever the action's bucket size asks for. */
    public static final int MAX_SCROLL_SIZE = 4_000;

    protected final int scrollBatchSize;

    protected final int scrollKeepAliveSeconds;

    // Reusable buffer of scrolled document ids; must be drained (or cleared) before the
    // next record is processed, otherwise ids leak into the next command's buckets.
    protected final List<String> documentIds;

    private final boolean produceImmediate;

    /**
     * @param name the computation name
     * @param nbOutputStreams the number of registered bulk action streams
     * @param scrollBatchSize the batch size to scroll
     * @param scrollKeepAliveSeconds the scroll lifetime
     * @param produceImmediate whether or not the record should be produced immediately while scrolling
     */
    public BulkScrollerComputation(String name, int nbOutputStreams, int scrollBatchSize, int scrollKeepAliveSeconds,
            boolean produceImmediate) {
        super(name, 1, nbOutputStreams);
        this.scrollBatchSize = scrollBatchSize;
        this.scrollKeepAliveSeconds = scrollKeepAliveSeconds;
        this.produceImmediate = produceImmediate;
        documentIds = new ArrayList<>(scrollBatchSize);
    }

    @Override
    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        // Each command is scrolled inside its own transaction (renewed per scroll batch below).
        TransactionHelper.runInTransaction(() -> processRecord(context, record));
    }

    /**
     * Decodes the {@link BulkCommand}, scrolls the repository for matching document ids and emits them as
     * {@link BulkBucket} records to the action stream, updating the command status along the way.
     */
    protected void processRecord(ComputationContext context, Record record) {
        BulkCommand command = null;
        try {
            command = BulkCodecs.getCommandCodec().decode(record.getData());
            String commandId = command.getId();
            int bucketSize = command.getBucketSize() > 0 ? command.getBucketSize()
                    : Framework.getService(BulkAdminService.class).getBucketSize(command.getAction());
            int scrollSize = scrollBatchSize;
            if (bucketSize > scrollSize) {
                if (bucketSize <= MAX_SCROLL_SIZE) {
                    scrollSize = bucketSize;
                } else {
                    // Log4j2 parameterized message: use {} placeholders, not printf-style %d/%s
                    log.warn("Bucket size: {} too big for command: {}, reduce to: {}", bucketSize, command,
                            MAX_SCROLL_SIZE);
                    scrollSize = bucketSize = MAX_SCROLL_SIZE;
                }
            }
            updateStatusAsScrolling(context, commandId);
            LoginContext loginContext = Framework.loginAsUser(command.getUsername());
            try (CloseableCoreSession session = CoreInstance.openCoreSession(command.getRepository())) {
                // scroll documents
                ScrollResult<String> scroll = session.scroll(command.getQuery(), scrollSize, scrollKeepAliveSeconds);
                long documentCount = 0;
                long bucketNumber = 1;
                while (scroll.hasResults()) {
                    if (isAbortedCommand(commandId)) {
                        log.debug("Skipping aborted command: {}", commandId);
                        // drop any buffered ids so they don't leak into the next command
                        documentIds.clear();
                        context.askForCheckpoint();
                        return;
                    }
                    List<String> docIds = scroll.getResults();
                    documentIds.addAll(docIds);
                    while (documentIds.size() >= bucketSize) {
                        produceBucket(context, command.getAction(), commandId, bucketSize, bucketNumber++);
                    }

                    documentCount += docIds.size();
                    // next batch
                    scroll = session.scroll(scroll.getScrollId());
                    // renew the transaction between batches to keep it short-lived
                    TransactionHelper.commitOrRollbackTransaction();
                    TransactionHelper.startTransaction();
                }
                // send remaining document ids
                // there's at most one record because we loop while scrolling
                if (!documentIds.isEmpty()) {
                    produceBucket(context, command.getAction(), commandId, bucketSize, bucketNumber++);
                }
                updateStatusAfterScroll(context, commandId, documentCount);
            } catch (IllegalArgumentException | QueryParseException | DocumentNotFoundException e) {
                log.error("Invalid query results in an empty document set: {}", command, e);
                // discard any partially buffered ids from the failed scroll
                documentIds.clear();
                updateStatusAfterScroll(context, commandId, "Invalid query");
            } finally {
                loginContext.logout();
            }
        } catch (NuxeoException | LoginException e) {
            // discard any partially buffered ids from the failed command
            documentIds.clear();
            if (command != null) {
                log.error("Invalid command produces an empty document set: {}", command, e);
                updateStatusAfterScroll(context, command.getId(), "Invalid command");
            } else {
                log.error("Discard invalid record: {}", record, e);
            }

        }
        context.askForCheckpoint();
    }

    /** Returns {@code true} if the command status has been set to {@link BulkStatus.State#ABORTED}. */
    protected boolean isAbortedCommand(String commandId) {
        BulkService bulkService = Framework.getService(BulkService.class);
        BulkStatus status = bulkService.getStatus(commandId);
        return ABORTED.equals(status.getState());
    }

    /** Publishes a status delta marking the command as {@code SCROLLING_RUNNING} with the scroll start time. */
    protected void updateStatusAsScrolling(ComputationContext context, String commandId) {
        BulkStatus delta = BulkStatus.deltaOf(commandId);
        delta.setState(SCROLLING_RUNNING);
        delta.setScrollStartTime(Instant.now());
        // produced immediately so the status is visible even before the next checkpoint
        ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId,
                BulkCodecs.getStatusCodec().encode(delta));
    }

    /** Publishes an in-error terminal status (document count 0). */
    protected void updateStatusAfterScroll(ComputationContext context, String commandId, String errorMessage) {
        updateStatusAfterScroll(context, commandId, 0, errorMessage);
    }

    /** Publishes the post-scroll status carrying the total document count. */
    protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount) {
        updateStatusAfterScroll(context, commandId, documentCount, null);
    }

    /**
     * Publishes the post-scroll status: {@code COMPLETED} when no document matched (nothing left to process),
     * {@code RUNNING} otherwise; sets the scroll end time, the total and the optional error message.
     */
    protected void updateStatusAfterScroll(ComputationContext context, String commandId, long documentCount,
            String errorMessage) {
        BulkStatus delta = BulkStatus.deltaOf(commandId);
        if (errorMessage != null) {
            delta.inError(errorMessage);
        }
        if (documentCount == 0) {
            delta.setState(COMPLETED);
            delta.setCompletedTime(Instant.now());
        } else {
            delta.setState(RUNNING);
        }
        delta.setScrollEndTime(Instant.now());
        delta.setTotal(documentCount);
        ((ComputationContextImpl) context).produceRecordImmediate(STATUS_STREAM, commandId,
                BulkCodecs.getStatusCodec().encode(delta));
    }

    /**
     * Produces a bucket as a record to appropriate bulk action stream.
     */
    protected void produceBucket(ComputationContext context, String action, String commandId, int bucketSize,
            long bucketNumber) {
        List<String> ids = documentIds.subList(0, min(bucketSize, documentIds.size()));
        BulkBucket bucket = new BulkBucket(commandId, ids);
        String key = commandId + ":" + Long.toString(bucketNumber);
        Record record = Record.of(key, BulkCodecs.getBucketCodec().encode(bucket));
        if (produceImmediate) {
            ((ComputationContextImpl) context).produceRecordImmediate(action, record);
        } else {
            context.produceRecord(action, record);
        }
        // clearing the subList view removes the sent ids from documentIds
        ids.clear();
    }

}