001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 * 020 */ 021package org.nuxeo.ecm.automation.server.jaxrs.batch; 022 023import static org.apache.commons.lang3.StringUtils.isEmpty; 024 025import java.io.IOException; 026import java.io.InputStream; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Objects; 032import java.util.Set; 033import java.util.concurrent.atomic.AtomicInteger; 034 035import org.apache.logging.log4j.LogManager; 036import org.apache.logging.log4j.Logger; 037import org.nuxeo.ecm.automation.AutomationService; 038import org.nuxeo.ecm.automation.OperationContext; 039import org.nuxeo.ecm.automation.OperationException; 040import org.nuxeo.ecm.automation.core.util.BlobList; 041import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 042import org.nuxeo.ecm.automation.server.AutomationServer; 043import org.nuxeo.ecm.automation.server.RestBinding; 044import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchHandlerDescriptor; 045import org.nuxeo.ecm.core.api.Blob; 046import org.nuxeo.ecm.core.api.Blobs; 047import org.nuxeo.ecm.core.api.CoreSession; 048import org.nuxeo.ecm.core.api.NuxeoException; 049import org.nuxeo.ecm.core.api.NuxeoPrincipal; 050import org.nuxeo.ecm.core.transientstore.api.TransientStore; 051import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException; 052import org.nuxeo.runtime.api.Framework; 053import org.nuxeo.runtime.model.ComponentContext; 054import org.nuxeo.runtime.model.DefaultComponent; 055 056/** 057 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}. 058 * 059 * @since 5.4.2 060 */ 061public class BatchManagerComponent extends DefaultComponent implements BatchManager { 062 063 private static final Logger log = LogManager.getLogger(BatchManagerComponent.class); 064 065 public static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId"; 066 067 /** 068 * The default batch handler name. 069 * 070 * @since 10.1 071 */ 072 public static final String DEFAULT_BATCH_HANDLER = "default"; 073 074 /** @since 10.1 */ 075 public static final String XP_BATCH_HANDLER = "handlers"; 076 077 protected Map<String, BatchHandler> handlers = new HashMap<>(); 078 079 protected final AtomicInteger uploadInProgress = new AtomicInteger(0); 080 081 static { 082 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder()); 083 } 084 085 @Override 086 public void start(ComponentContext context) { 087 super.start(context); 088 List<BatchHandlerDescriptor> descriptors = getDescriptors(XP_BATCH_HANDLER); 089 descriptors.forEach(d -> { 090 try { 091 BatchHandler handler = d.klass.getDeclaredConstructor().newInstance(); 092 handler.initialize(d.name, d.properties); 093 handlers.put(d.name, handler); 094 } catch (ReflectiveOperationException e) { 095 log.error("Unable to instantiate batch handler", e); 096 } 097 }); 098 } 099 100 @Override 101 public void stop(ComponentContext context) throws InterruptedException { 102 super.stop(context); 103 handlers.clear(); 104 } 105 106 @Override 107 @Deprecated 108 public TransientStore getTransientStore() { 109 return getHandler(DEFAULT_BATCH_HANDLER).getTransientStore(); 110 } 111 112 @Override 113 public Set<String> getSupportedHandlers() { 114 return Collections.unmodifiableSet(handlers.keySet()); 115 } 116 117 @Override 118 public BatchHandler getHandler(String handlerName) { 119 return handlers.get(handlerName); 120 } 121 122 @Override 123 public String initBatch() { 124 Batch batch = initBatchInternal(null); 125 return batch.getKey(); 126 } 127 128 @Override 129 @Deprecated 130 public String initBatch(String batchId, String contextName) { 131 Batch batch = initBatchInternal(batchId); 132 return batch.getKey(); 133 } 134 135 protected Batch initBatchInternal(String batchId) { 136 BatchHandler batchHandler = handlers.get(DEFAULT_BATCH_HANDLER); 137 return batchHandler.newBatch(batchId); 138 } 139 140 @Override 141 public Batch initBatch(String handlerName) { 142 if (isEmpty(handlerName)) { 143 handlerName = DEFAULT_BATCH_HANDLER; 144 } 145 BatchHandler batchHandler = handlers.get(handlerName); 146 if (batchHandler == null) { 147 throw new IllegalArgumentException("Batch handler does not exist: " + handlerName); 148 } 149 return batchHandler.newBatch(null); 150 } 151 152 @Override 153 public Batch getBatch(String batchId) { 154 return handlers.values() 155 .stream() 156 .map(batchHandler -> batchHandler.getBatch(batchId)) 157 .filter(Objects::nonNull) 158 .findFirst() 159 .orElse(null); 160 } 161 162 @Override 163 public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException { 164 Blob blob = Blobs.createBlob(is); 165 addBlob(batchId, index, blob, name, mime); 166 } 167 168 @Override 169 public void addBlob(String batchId, String index, Blob blob, String name, String mime) throws IOException { 170 uploadInProgress.incrementAndGet(); 171 try { 172 Batch batch = getBatch(batchId); 173 if (batch == null) { 174 batch = initBatchInternal(batchId); 175 } 176 batch.addFile(index, blob, name, mime); 177 log.debug("Added file {} [{}] to batch {}", index, name, batch.getKey()); 178 } finally { 179 uploadInProgress.decrementAndGet(); 180 } 181 } 182 183 @Override 184 public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name, 185 String mime, long fileSize) throws IOException { 186 Blob blob = Blobs.createBlob(is); 187 addBlob(batchId, index, blob, chunkCount, chunkIndex, name, mime, fileSize); 188 } 189 190 @Override 191 public void addBlob(String batchId, String index, Blob blob, int chunkCount, int chunkIndex, String name, 192 String mime, long fileSize) throws IOException { 193 uploadInProgress.incrementAndGet(); 194 try { 195 Batch batch = getBatch(batchId); 196 if (batch == null) { 197 batch = initBatchInternal(batchId); 198 } 199 batch.addChunk(index, blob, chunkCount, chunkIndex, name, mime, fileSize); 200 log.debug("Added chunk {} to file {} [{}] in batch {}", chunkIndex, index, name, batch.getKey()); 201 } finally { 202 uploadInProgress.decrementAndGet(); 203 } 204 } 205 206 @Override 207 public boolean hasBatch(String batchId) { 208 return handlers.values().stream().anyMatch(batchHandler -> batchHandler.getBatch(batchId) != null); 209 } 210 211 @Override 212 public List<Blob> getBlobs(String batchId) { 213 return getBlobs(batchId, 0); 214 } 215 216 @Override 217 public List<Blob> getBlobs(String batchId, int timeoutS) { 218 if (uploadInProgress.get() > 0 && timeoutS > 0) { 219 for (int i = 0; i < timeoutS * 5; i++) { 220 try { 221 Thread.sleep(200); 222 } catch (InterruptedException e) { 223 Thread.currentThread().interrupt(); 224 } 225 if (uploadInProgress.get() == 0) { 226 break; 227 } 228 } 229 } 230 Batch batch = getBatch(batchId); 231 if (batch == null) { 232 log.error("Unable to find batch with id {}", batchId); 233 return Collections.emptyList(); 234 } 235 return batch.getBlobs(); 236 } 237 238 @Override 239 public Blob getBlob(String batchId, String fileIndex) { 240 return getBlob(batchId, fileIndex, 0); 241 } 242 243 @Override 244 public Blob getBlob(String batchId, String fileIndex, int timeoutS) { 245 Blob blob = getBatchBlob(batchId, fileIndex); 246 if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) { 247 for (int i = 0; i < timeoutS * 5; i++) { 248 try { 249 Thread.sleep(200); 250 } catch (InterruptedException e) { 251 Thread.currentThread().interrupt(); 252 } 253 blob = getBatchBlob(batchId, fileIndex); 254 if (blob != null) { 255 break; 256 } 257 } 258 } 259 if (!hasBatch(batchId)) { 260 log.error("Unable to find batch with id {}", batchId); 261 return null; 262 } 263 return blob; 264 } 265 266 protected Blob getBatchBlob(String batchId, String fileIndex) { 267 Blob blob = null; 268 Batch batch = getBatch(batchId); 269 if (batch != null) { 270 blob = batch.getBlob(fileIndex); 271 } 272 return blob; 273 } 274 275 @Override 276 public List<BatchFileEntry> getFileEntries(String batchId) { 277 Batch batch = getBatch(batchId); 278 if (batch == null) { 279 return null; 280 } 281 return batch.getFileEntries(); 282 } 283 284 @Override 285 public BatchFileEntry getFileEntry(String batchId, String fileIndex) { 286 Batch batch = getBatch(batchId); 287 if (batch == null) { 288 return null; 289 } 290 return batch.getFileEntry(fileIndex); 291 } 292 293 @Override 294 public void clean(String batchId) { 295 Batch batch = getBatch(batchId); 296 if (batch != null) { 297 batch.clean(); 298 } 299 } 300 301 @Override 302 public Object execute(String batchId, String chainOrOperationId, CoreSession session, 303 Map<String, Object> contextParams, Map<String, Object> operationParams) { 304 List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout()); 305 if (blobs == null) { 306 String message = String.format("Unable to find batch associated with id '%s'", batchId); 307 log.error(message); 308 throw new NuxeoException(message); 309 } 310 return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams); 311 } 312 313 @Override 314 public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session, 315 Map<String, Object> contextParams, Map<String, Object> operationParams) { 316 Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout()); 317 if (blob == null) { 318 String message = String.format( 319 "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId, 320 fileIndex); 321 log.error(message); 322 throw new NuxeoException(message); 323 } 324 return execute(blob, chainOrOperationId, session, contextParams, operationParams); 325 } 326 327 protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session, 328 Map<String, Object> contextParams, Map<String, Object> operationParams) { 329 if (contextParams == null) { 330 contextParams = new HashMap<>(); 331 } 332 if (operationParams == null) { 333 operationParams = new HashMap<>(); 334 } 335 336 try (OperationContext ctx = new OperationContext(session)) { 337 338 AutomationServer server = Framework.getService(AutomationServer.class); 339 RestBinding binding = server.getOperationBinding(chainOrOperationId); 340 341 if (binding != null && binding.isAdministrator) { 342 NuxeoPrincipal principal = ctx.getPrincipal(); 343 if (!principal.isAdministrator()) { 344 String message = "Not allowed. You must be administrator to use this operation"; 345 log.error(message); 346 throw new WebSecurityException(message); 347 } 348 } 349 350 ctx.setInput(blobInput); 351 ctx.putAll(contextParams); 352 353 AutomationService as = Framework.getService(AutomationService.class); 354 // Drag and Drop action category is accessible from the chain sub context as chain parameters 355 return as.run(ctx, chainOrOperationId, operationParams); 356 } catch (OperationException e) { 357 log.error("Error while executing automation batch ", e); 358 throw new NuxeoException(e); 359 } 360 } 361 362 protected int getUploadWaitTimeout() { 363 String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5"); 364 try { 365 return Integer.parseInt(t); 366 } catch (NumberFormatException e) { 367 log.error("Wrong number format for upload wait timeout property", e); 368 return 5; 369 } 370 } 371 372 @Override 373 public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session, 374 Map<String, Object> contextParams, Map<String, Object> operationParams) { 375 try { 376 return execute(batchId, chainOrOperationId, session, contextParams, operationParams); 377 } finally { 378 clean(batchId); 379 } 380 } 381 382 @Override 383 public boolean removeFileEntry(String batchId, String filedIdx) { 384 Batch batch = getBatch(batchId); 385 return batch != null && batch.removeFileEntry(filedIdx); 386 } 387}