001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 * 020 */ 021package org.nuxeo.ecm.automation.server.jaxrs.batch; 022 023import static org.apache.commons.lang3.StringUtils.isEmpty; 024 025import java.io.IOException; 026import java.io.InputStream; 027import java.security.Principal; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Objects; 033import java.util.Set; 034import java.util.concurrent.atomic.AtomicInteger; 035 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.nuxeo.ecm.automation.AutomationService; 039import org.nuxeo.ecm.automation.OperationContext; 040import org.nuxeo.ecm.automation.OperationException; 041import org.nuxeo.ecm.automation.core.util.BlobList; 042import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 043import org.nuxeo.ecm.automation.server.AutomationServer; 044import org.nuxeo.ecm.automation.server.RestBinding; 045import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchHandlerDescriptor; 046import org.nuxeo.ecm.core.api.Blob; 047import org.nuxeo.ecm.core.api.Blobs; 048import org.nuxeo.ecm.core.api.CoreSession; 049import org.nuxeo.ecm.core.api.NuxeoException; 050import org.nuxeo.ecm.core.api.NuxeoPrincipal; 051import org.nuxeo.ecm.core.transientstore.api.TransientStore; 052import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 053import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException; 054import org.nuxeo.runtime.api.Framework; 055import org.nuxeo.runtime.model.ComponentContext; 056import org.nuxeo.runtime.model.ComponentInstance; 057import org.nuxeo.runtime.model.DefaultComponent; 058 059/** 060 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}. 061 * 062 * @since 5.4.2 063 */ 064public class BatchManagerComponent extends DefaultComponent implements BatchManager { 065 066 protected static final Log log = LogFactory.getLog(BatchManagerComponent.class); 067 068 protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache"; 069 070 public static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId"; 071 072 /** 073 * The default batch handler name. 074 * 075 * @since 10.1 076 */ 077 public static final String DEFAULT_BATCH_HANDLER = "default"; 078 079 /** @since 10.1 */ 080 public static final String EP_BATCH_HANDLER = "handlers"; 081 082 protected Map<String, BatchHandler> handlers; 083 084 protected final AtomicInteger uploadInProgress = new AtomicInteger(0); 085 086 static { 087 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder()); 088 } 089 090 @Override 091 public void activate(ComponentContext context) { 092 super.activate(context); 093 handlers = new HashMap<>(); 094 } 095 096 @Override 097 public void deactivate(ComponentContext context) { 098 handlers = null; 099 super.deactivate(context); 100 } 101 102 @Override 103 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 104 if (EP_BATCH_HANDLER.equals(extensionPoint)) { 105 if (contribution instanceof BatchHandlerDescriptor) { 106 BatchHandlerDescriptor contributionDescriptor = (BatchHandlerDescriptor) contribution; 107 String name = contributionDescriptor.getName(); 108 BatchHandler batchHandler = contributionDescriptor.newInstance(); 109 batchHandler.initialize(name, contributionDescriptor.getProperties()); 110 handlers.put(name, batchHandler); 111 } else { 112 throw new NuxeoException("Invalid class: " + contribution.getClass().getName()); 113 } 114 } else { 115 throw new NuxeoException("Invalid extension point: " + extensionPoint); 116 } 117 } 118 119 @Override 120 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 121 if (EP_BATCH_HANDLER.equals(extensionPoint)) { 122 if (BatchHandlerDescriptor.class.isAssignableFrom(contribution.getClass())) { 123 BatchHandlerDescriptor contributionDescriptor = (BatchHandlerDescriptor) contribution; 124 String name = contributionDescriptor.getName(); 125 handlers.remove(name); 126 } 127 } 128 } 129 130 @Override 131 public TransientStore getTransientStore() { 132 TransientStoreService tss = Framework.getService(TransientStoreService.class); 133 return tss.getStore(TRANSIENT_STORE_NAME); 134 } 135 136 @Override 137 public Set<String> getSupportedHandlers() { 138 return Collections.unmodifiableSet(handlers.keySet()); 139 } 140 141 @Override 142 public BatchHandler getHandler(String handlerName) { 143 return handlers.get(handlerName); 144 } 145 146 @Override 147 public String initBatch() { 148 Batch batch = initBatchInternal(null); 149 return batch.getKey(); 150 } 151 152 @Override 153 @Deprecated 154 public String initBatch(String batchId, String contextName) { 155 Batch batch = initBatchInternal(batchId); 156 return batch.getKey(); 157 } 158 159 protected Batch initBatchInternal(String batchId) { 160 BatchHandler batchHandler = handlers.get(DEFAULT_BATCH_HANDLER); 161 return batchHandler.newBatch(batchId); 162 } 163 164 @Override 165 public Batch initBatch(String handlerName) { 166 if (isEmpty(handlerName)) { 167 handlerName = DEFAULT_BATCH_HANDLER; 168 } 169 BatchHandler batchHandler = handlers.get(handlerName); 170 if (batchHandler == null) { 171 throw new IllegalArgumentException("Batch handler does not exist: " + handlerName); 172 } 173 return batchHandler.newBatch(null); 174 } 175 176 @Override 177 public Batch getBatch(String batchId) { 178 return handlers.values() 179 .stream() 180 .map(batchHandler -> batchHandler.getBatch(batchId)) 181 .filter(Objects::nonNull) 182 .findFirst() 183 .orElse(null); 184 } 185 186 @Override 187 public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException { 188 Blob blob = Blobs.createBlob(is); 189 addBlob(batchId, index, blob, name, mime); 190 } 191 192 @Override 193 public void addBlob(String batchId, String index, Blob blob, String name, String mime) throws IOException { 194 uploadInProgress.incrementAndGet(); 195 try { 196 Batch batch = getBatch(batchId); 197 if (batch == null) { 198 batch = initBatchInternal(batchId); 199 } 200 batch.addFile(index, blob, name, mime); 201 log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey())); 202 } finally { 203 uploadInProgress.decrementAndGet(); 204 } 205 } 206 207 @Override 208 public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name, 209 String mime, long fileSize) throws IOException { 210 Blob blob = Blobs.createBlob(is); 211 addBlob(batchId, index, blob, chunkCount, chunkIndex, name, mime, fileSize); 212 } 213 214 @Override 215 public void addBlob(String batchId, String index, Blob blob, int chunkCount, int chunkIndex, String name, 216 String mime, long fileSize) throws IOException { 217 uploadInProgress.incrementAndGet(); 218 try { 219 Batch batch = getBatch(batchId); 220 if (batch == null) { 221 batch = initBatchInternal(batchId); 222 } 223 batch.addChunk(index, blob, chunkCount, chunkIndex, name, mime, fileSize); 224 log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name, 225 batch.getKey())); 226 } finally { 227 uploadInProgress.decrementAndGet(); 228 } 229 } 230 231 @Override 232 public boolean hasBatch(String batchId) { 233 return handlers.values().stream().anyMatch(batchHandler -> batchHandler.getBatch(batchId) != null); 234 } 235 236 @Override 237 public List<Blob> getBlobs(String batchId) { 238 return getBlobs(batchId, 0); 239 } 240 241 @Override 242 public List<Blob> getBlobs(String batchId, int timeoutS) { 243 if (uploadInProgress.get() > 0 && timeoutS > 0) { 244 for (int i = 0; i < timeoutS * 5; i++) { 245 try { 246 Thread.sleep(200); 247 } catch (InterruptedException e) { 248 Thread.currentThread().interrupt(); 249 } 250 if (uploadInProgress.get() == 0) { 251 break; 252 } 253 } 254 } 255 Batch batch = getBatch(batchId); 256 if (batch == null) { 257 log.error("Unable to find batch with id " + batchId); 258 return Collections.emptyList(); 259 } 260 return batch.getBlobs(); 261 } 262 263 @Override 264 public Blob getBlob(String batchId, String fileIndex) { 265 return getBlob(batchId, fileIndex, 0); 266 } 267 268 @Override 269 public Blob getBlob(String batchId, String fileIndex, int timeoutS) { 270 Blob blob = getBatchBlob(batchId, fileIndex); 271 if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) { 272 for (int i = 0; i < timeoutS * 5; i++) { 273 try { 274 Thread.sleep(200); 275 } catch (InterruptedException e) { 276 Thread.currentThread().interrupt(); 277 } 278 blob = getBatchBlob(batchId, fileIndex); 279 if (blob != null) { 280 break; 281 } 282 } 283 } 284 if (!hasBatch(batchId)) { 285 log.error("Unable to find batch with id " + batchId); 286 return null; 287 } 288 return blob; 289 } 290 291 protected Blob getBatchBlob(String batchId, String fileIndex) { 292 Blob blob = null; 293 Batch batch = getBatch(batchId); 294 if (batch != null) { 295 blob = batch.getBlob(fileIndex); 296 } 297 return blob; 298 } 299 300 @Override 301 public List<BatchFileEntry> getFileEntries(String batchId) { 302 Batch batch = getBatch(batchId); 303 if (batch == null) { 304 return null; 305 } 306 return batch.getFileEntries(); 307 } 308 309 @Override 310 public BatchFileEntry getFileEntry(String batchId, String fileIndex) { 311 Batch batch = getBatch(batchId); 312 if (batch == null) { 313 return null; 314 } 315 return batch.getFileEntry(fileIndex); 316 } 317 318 @Override 319 public void clean(String batchId) { 320 Batch batch = getBatch(batchId); 321 if (batch != null) { 322 batch.clean(); 323 } 324 } 325 326 @Override 327 public Object execute(String batchId, String chainOrOperationId, CoreSession session, 328 Map<String, Object> contextParams, Map<String, Object> operationParams) { 329 List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout()); 330 if (blobs == null) { 331 String message = String.format("Unable to find batch associated with id '%s'", batchId); 332 log.error(message); 333 throw new NuxeoException(message); 334 } 335 return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams); 336 } 337 338 @Override 339 public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session, 340 Map<String, Object> contextParams, Map<String, Object> operationParams) { 341 Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout()); 342 if (blob == null) { 343 String message = String.format( 344 "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId, 345 fileIndex); 346 log.error(message); 347 throw new NuxeoException(message); 348 } 349 return execute(blob, chainOrOperationId, session, contextParams, operationParams); 350 } 351 352 protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session, 353 Map<String, Object> contextParams, Map<String, Object> operationParams) { 354 if (contextParams == null) { 355 contextParams = new HashMap<>(); 356 } 357 if (operationParams == null) { 358 operationParams = new HashMap<>(); 359 } 360 361 try (OperationContext ctx = new OperationContext(session)) { 362 363 AutomationServer server = Framework.getService(AutomationServer.class); 364 RestBinding binding = server.getOperationBinding(chainOrOperationId); 365 366 if (binding != null && binding.isAdministrator()) { 367 Principal principal = ctx.getPrincipal(); 368 if (!(principal instanceof NuxeoPrincipal && ((NuxeoPrincipal) principal).isAdministrator())) { 369 String message = "Not allowed. You must be administrator to use this operation"; 370 log.error(message); 371 throw new WebSecurityException(message); 372 } 373 } 374 375 ctx.setInput(blobInput); 376 ctx.putAll(contextParams); 377 378 AutomationService as = Framework.getService(AutomationService.class); 379 // Drag and Drop action category is accessible from the chain sub context as chain parameters 380 return as.run(ctx, chainOrOperationId, operationParams); 381 } catch (OperationException e) { 382 log.error("Error while executing automation batch ", e); 383 throw new NuxeoException(e); 384 } 385 } 386 387 protected int getUploadWaitTimeout() { 388 String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5"); 389 try { 390 return Integer.parseInt(t); 391 } catch (NumberFormatException e) { 392 log.error("Wrong number format for upload wait timeout property", e); 393 return 5; 394 } 395 } 396 397 @Override 398 public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session, 399 Map<String, Object> contextParams, Map<String, Object> operationParams) { 400 try { 401 return execute(batchId, chainOrOperationId, session, contextParams, operationParams); 402 } finally { 403 clean(batchId); 404 } 405 } 406 407 @Override 408 public boolean removeFileEntry(String batchId, String filedIdx) { 409 Batch batch = getBatch(batchId); 410 return batch != null && batch.removeFileEntry(filedIdx); 411 } 412}