/*
 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Thierry Delprat <tdelprat@nuxeo.com>
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 *
 */
package org.nuxeo.ecm.automation.server.jaxrs.batch;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.AutomationService;
import org.nuxeo.ecm.automation.OperationContext;
import org.nuxeo.ecm.automation.OperationException;
import org.nuxeo.ecm.automation.core.util.BlobList;
import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.transientstore.api.StorageEntry;
import org.nuxeo.ecm.core.transientstore.api.TransientStore;
import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.DefaultComponent;

/**
 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
 * <p>
 * Batches are kept as entries of the transient store named {@link #TRANSIENT_STORE_NAME}. The component counts the
 * uploads currently running (see {@link #uploadInProgress}) so that readers can optionally wait for them to finish.
 *
 * @since 5.4.2
 */
public class BatchManagerComponent extends DefaultComponent implements BatchManager {

    protected static final String DEFAULT_CONTEXT = "None";

    protected static final Log log = LogFactory.getLog(BatchManagerComponent.class);

    protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache";

    /** Framework property holding the number of seconds to wait for running uploads before executing. */
    private static final String UPLOAD_WAIT_TIMEOUT_PROPERTY = "org.nuxeo.batch.upload.wait.timeout";

    /** Upload wait timeout, in seconds, used when the property is missing or not a valid integer. */
    private static final int DEFAULT_UPLOAD_WAIT_TIMEOUT = 5;

    /** Pause between two checks while waiting for uploads, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 200;

    /** Number of polls per second of timeout ({@code 1000 / POLL_INTERVAL_MS}). */
    private static final int POLLS_PER_SECOND = 5;

    /** Number of {@code addStream} calls currently running on this component. */
    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);

    static {
        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
    }

    /**
     * Returns the transient store named {@link #TRANSIENT_STORE_NAME}, looked up through the
     * {@link TransientStoreService}.
     */
    @Override
    public TransientStore getTransientStore() {
        TransientStoreService tss = Framework.getService(TransientStoreService.class);
        return tss.getStore(TRANSIENT_STORE_NAME);
    }

    /**
     * Initializes a batch with a generated id.
     *
     * @return the id of the batch
     */
    @Override
    public String initBatch() {
        return initBatch(null, null);
    }

    /**
     * Returns the id of the batch stored under {@code batchId}, creating the batch if needed.
     *
     * @param batchId the requested batch id; a new id is generated when {@code null} or empty
     * @param contextName passed through to {@link #initBatchInternal(String, String)}
     * @return the id of the existing or newly created batch
     */
    @Override
    public String initBatch(String batchId, String contextName) {
        Batch batch = initBatchInternal(batchId, contextName);
        return batch.getId();
    }

    /**
     * Returns the batch already stored under {@code batchId} or creates, stores and returns a new one.
     *
     * @param batchId the requested batch id; when {@code null} or empty an id of the form
     *            {@code batchId-<random UUID>} is generated
     * @param contextName currently not used by this implementation
     * @return the existing or newly created batch, never {@code null}
     */
    protected Batch initBatchInternal(String batchId, String contextName) {
        boolean hasId = batchId != null && !batchId.isEmpty();
        if (hasId) {
            StorageEntry entry = getTransientStore().get(batchId);
            if (entry != null) {
                return (Batch) entry;
            }
        }

        String id = hasId ? batchId : "batchId-" + UUID.randomUUID().toString();
        Batch newBatch = new Batch(id);
        getTransientStore().put(newBatch);
        return newBatch;
    }

    /**
     * Adds a whole file to a batch, creating the batch if it does not exist yet.
     *
     * @param batchId the batch id
     * @param idx the index of the file within the batch
     * @param is the file content
     * @param name the file name
     * @param mime the file mime type
     * @throws IOException if adding the file to the batch fails
     */
    @Override
    public void addStream(String batchId, String idx, InputStream is, String name, String mime) throws IOException {
        uploadInProgress.incrementAndGet();
        try {
            Batch batch = initBatchInternal(batchId, null);
            String fileEntryId = batch.addFile(idx, is, name, mime);
            registerFileEntry(batch.getId(), idx, fileEntryId);
            if (log.isDebugEnabled()) {
                log.debug(String.format("Added file %s [%s] to batch %s", idx, name, batch.getId()));
            }
        } finally {
            // Always release the "upload in progress" slot, even when the upload failed
            uploadInProgress.decrementAndGet();
        }
    }

    /**
     * Adds one chunk of a file to a batch, creating the batch if it does not exist yet.
     *
     * @param batchId the batch id
     * @param idx the index of the file within the batch
     * @param is the chunk content
     * @param chunkCount the total number of chunks of the file
     * @param chunkIdx the index of this chunk
     * @param name the file name
     * @param mime the file mime type
     * @param fileSize the size of the whole file
     * @throws IOException if adding the chunk to the batch fails
     */
    @Override
    public void addStream(String batchId, String idx, InputStream is, int chunkCount, int chunkIdx, String name,
            String mime, long fileSize) throws IOException {
        uploadInProgress.incrementAndGet();
        try {
            Batch batch = initBatchInternal(batchId, null);
            String fileEntryId = batch.addChunk(idx, is, chunkCount, chunkIdx, name, mime, fileSize);
            registerFileEntry(batch.getId(), idx, fileEntryId);
            if (log.isDebugEnabled()) {
                log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIdx, idx, name,
                        batch.getId()));
            }
        } finally {
            // Always release the "upload in progress" slot, even when the upload failed
            uploadInProgress.decrementAndGet();
        }
    }

    /**
     * Records {@code fileEntryId} under {@code idx} in the stored batch entry and writes the entry back to the store.
     */
    private void registerFileEntry(String batchId, String idx, String fileEntryId) {
        // Need to synchronize manipulation of the batch TransientStore entry params:
        // the entry is re-read, modified and written back as one unit
        synchronized (this) {
            Batch stored = (Batch) getTransientStore().get(batchId);
            stored.put(idx, fileEntryId);
            getTransientStore().put(stored);
        }
    }

    /**
     * Tells whether a batch is stored under the given id.
     *
     * @param batchId the batch id, may be {@code null}
     * @return {@code true} if {@code batchId} is not {@code null} and the store holds an entry for it
     */
    @Override
    public boolean hasBatch(String batchId) {
        return batchId != null && getTransientStore().get(batchId) != null;
    }

    /**
     * Returns the blobs of a batch without waiting for uploads in progress.
     *
     * @see #getBlobs(String, int)
     */
    @Override
    public List<Blob> getBlobs(String batchId) {
        return getBlobs(batchId, 0);
    }

    /**
     * Returns the blobs of a batch, optionally waiting for the uploads in progress to finish.
     * <p>
     * The wait stops as soon as no upload is in progress, the timeout expires or the calling thread is interrupted
     * (the interrupt flag is then restored).
     *
     * @param batchId the batch id
     * @param timeoutS maximum time to wait for uploads in progress, in seconds; no wait if not positive
     * @return the blobs of the batch, or an empty list if no batch is stored under {@code batchId}
     */
    @Override
    public List<Blob> getBlobs(String batchId, int timeoutS) {
        if (timeoutS > 0 && uploadInProgress.get() > 0) {
            long maxPolls = (long) timeoutS * POLLS_PER_SECOND;
            for (long i = 0; i < maxPolls; i++) {
                if (!pause() || uploadInProgress.get() == 0) {
                    break;
                }
            }
        }
        Batch batch = (Batch) getTransientStore().get(batchId);
        if (batch == null) {
            log.error("Unable to find batch with id " + batchId);
            return Collections.emptyList();
        }
        return batch.getBlobs();
    }

    /**
     * Returns a blob of a batch without waiting for uploads in progress.
     *
     * @see #getBlob(String, String, int)
     */
    @Override
    public Blob getBlob(String batchId, String fileId) {
        return getBlob(batchId, fileId, 0);
    }

    /**
     * Returns a blob of a batch, optionally waiting for it to show up while uploads are in progress.
     * <p>
     * The wait stops as soon as the blob is found, the timeout expires or the calling thread is interrupted (the
     * interrupt flag is then restored).
     *
     * @param batchId the batch id
     * @param fileId the id of the file within the batch
     * @param timeoutS maximum time to wait for the blob, in seconds; no wait if not positive
     * @return the blob, or {@code null} if the batch or the blob cannot be found
     */
    @Override
    public Blob getBlob(String batchId, String fileId, int timeoutS) {
        Blob blob = getBatchBlob(batchId, fileId);
        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
            long maxPolls = (long) timeoutS * POLLS_PER_SECOND;
            for (long i = 0; i < maxPolls; i++) {
                boolean interrupted = !pause();
                // Look once more even when interrupted, then give up waiting
                blob = getBatchBlob(batchId, fileId);
                if (blob != null || interrupted) {
                    break;
                }
            }
        }
        if (!hasBatch(batchId)) {
            log.error("Unable to find batch with id " + batchId);
            return null;
        }
        return blob;
    }

    /**
     * Sleeps for one polling interval.
     *
     * @return {@code false} if the thread was interrupted while sleeping (its interrupt flag is restored),
     *         {@code true} otherwise
     */
    private static boolean pause() {
        try {
            Thread.sleep(POLL_INTERVAL_MS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Looks up a blob in a batch.
     *
     * @return the blob returned by the batch for {@code fileId}, or {@code null} if no batch is stored under
     *         {@code batchId}
     */
    protected Blob getBatchBlob(String batchId, String fileId) {
        Batch batch = (Batch) getTransientStore().get(batchId);
        return batch == null ? null : batch.getBlob(fileId);
    }

    /**
     * Returns the file entries of a batch.
     *
     * @return the file entries, or {@code null} if no batch is stored under {@code batchId}
     */
    @Override
    public List<BatchFileEntry> getFileEntries(String batchId) {
        Batch batch = (Batch) getTransientStore().get(batchId);
        if (batch == null) {
            return null;
        }
        return batch.getFileEntries();
    }

    /**
     * Returns a file entry of a batch.
     *
     * @return the file entry returned by the batch for {@code fileId}, or {@code null} if no batch is stored under
     *         {@code batchId}
     */
    @Override
    public BatchFileEntry getFileEntry(String batchId, String fileId) {
        Batch batch = (Batch) getTransientStore().get(batchId);
        if (batch == null) {
            return null;
        }
        return batch.getFileEntry(fileId);
    }

    /**
     * Calls {@code clean()} on the batch stored under {@code batchId}; does nothing if there is no such batch.
     */
    @Override
    public void clean(String batchId) {
        Batch batch = (Batch) getTransientStore().get(batchId);
        if (batch != null) {
            batch.clean();
        }
    }

    /**
     * Runs an automation chain or operation with all the blobs of a batch as input, after waiting up to
     * {@link #getUploadWaitTimeout()} seconds for uploads in progress.
     *
     * @param batchId the batch id
     * @param chainOrOperationId the id of the chain or operation to run
     * @param session the session used to build the operation context
     * @param contextParams parameters put in the operation context, may be {@code null}
     * @param operationParams parameters passed to the chain or operation, may be {@code null}
     * @return the result of the chain or operation
     * @throws NuxeoException if the blob list is {@code null} or if the execution fails
     */
    @Override
    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
            Map<String, Object> contextParams, Map<String, Object> operationParams) {
        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
        // NOTE: getBlobs as implemented in this class returns an empty list, not null, for an unknown batch,
        // so this guard only triggers if an overriding implementation returns null.
        if (blobs == null) {
            String message = String.format("Unable to find batch associated with id '%s'", batchId);
            log.error(message);
            throw new NuxeoException(message);
        }
        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
    }

    /**
     * Runs an automation chain or operation with a single blob of a batch as input, after waiting up to
     * {@link #getUploadWaitTimeout()} seconds for the blob.
     *
     * @param batchId the batch id
     * @param fileIdx the index of the file within the batch
     * @param chainOrOperationId the id of the chain or operation to run
     * @param session the session used to build the operation context
     * @param contextParams parameters put in the operation context, may be {@code null}
     * @param operationParams parameters passed to the chain or operation, may be {@code null}
     * @return the result of the chain or operation
     * @throws NuxeoException if the batch or the file cannot be found or if the execution fails
     */
    @Override
    public Object execute(String batchId, String fileIdx, String chainOrOperationId, CoreSession session,
            Map<String, Object> contextParams, Map<String, Object> operationParams) {
        Blob blob = getBlob(batchId, fileIdx, getUploadWaitTimeout());
        if (blob == null) {
            String message = String.format(
                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId,
                    fileIdx);
            log.error(message);
            throw new NuxeoException(message);
        }
        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
    }

    /**
     * Runs an automation chain or operation on the given input.
     *
     * @param blobInput the input set on the operation context
     * @param chainOrOperationId the id of the chain or operation to run
     * @param session the session used to build the operation context
     * @param contextParams parameters put in the operation context; treated as empty when {@code null}
     * @param operationParams parameters passed to the chain or operation; treated as empty when {@code null}
     * @return the result of the chain or operation
     * @throws NuxeoException wrapping the {@link OperationException} if the execution fails
     */
    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
            Map<String, Object> contextParams, Map<String, Object> operationParams) {
        if (contextParams == null) {
            contextParams = new HashMap<>();
        }
        if (operationParams == null) {
            operationParams = new HashMap<>();
        }

        OperationContext ctx = new OperationContext(session);
        ctx.setInput(blobInput);
        ctx.putAll(contextParams);

        try {
            AutomationService as = Framework.getLocalService(AutomationService.class);
            // Drag and Drop action category is accessible from the chain sub context as chain parameters
            return as.run(ctx, chainOrOperationId, operationParams);
        } catch (OperationException e) {
            log.error("Error while executing automation batch ", e);
            throw new NuxeoException(e);
        }
    }

    /**
     * Returns the time to wait for uploads in progress before executing, in seconds.
     * <p>
     * The value is read from the {@code org.nuxeo.batch.upload.wait.timeout} framework property and falls back to 5
     * when the property is not set or is not a valid integer.
     */
    protected int getUploadWaitTimeout() {
        String t = Framework.getProperty(UPLOAD_WAIT_TIMEOUT_PROPERTY, String.valueOf(DEFAULT_UPLOAD_WAIT_TIMEOUT));
        try {
            return Integer.parseInt(t);
        } catch (NumberFormatException e) {
            log.error("Wrong number format for upload wait timeout property", e);
            return DEFAULT_UPLOAD_WAIT_TIMEOUT;
        }
    }

    /**
     * Same as {@link #execute(String, String, CoreSession, Map, Map)} but always calls {@link #clean(String)} on the
     * batch afterwards, whether the execution succeeded or not.
     */
    @Override
    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
            Map<String, Object> contextParams, Map<String, Object> operationParams) {
        try {
            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
        } finally {
            clean(batchId);
        }
    }
}