001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Thierry Delprat <tdelprat@nuxeo.com> 016 * Antoine Taillefer <ataillefer@nuxeo.com> 017 * 018 */ 019package org.nuxeo.ecm.automation.server.jaxrs.batch; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.Serializable; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.UUID; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import org.apache.commons.lang.StringUtils; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.automation.AutomationService; 035import org.nuxeo.ecm.automation.OperationContext; 036import org.nuxeo.ecm.automation.OperationException; 037import org.nuxeo.ecm.automation.core.util.BlobList; 038import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 039import org.nuxeo.ecm.core.api.Blob; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.transientstore.api.TransientStore; 043import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.model.DefaultComponent; 046import org.nuxeo.runtime.services.config.ConfigurationService; 047 048/** 049 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}. 050 * 051 * @since 5.4.2 052 */ 053public class BatchManagerComponent extends DefaultComponent implements BatchManager { 054 055 protected static final Log log = LogFactory.getLog(BatchManagerComponent.class); 056 057 protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache"; 058 059 protected static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId"; 060 061 protected final AtomicInteger uploadInProgress = new AtomicInteger(0); 062 063 static { 064 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder()); 065 } 066 067 @Override 068 public TransientStore getTransientStore() { 069 TransientStoreService tss = Framework.getService(TransientStoreService.class); 070 return tss.getStore(TRANSIENT_STORE_NAME); 071 } 072 073 @Override 074 public String initBatch() { 075 Batch batch = initBatchInternal(null); 076 return batch.getKey(); 077 } 078 079 @Override 080 @Deprecated 081 public String initBatch(String batchId, String contextName) { 082 Batch batch = initBatchInternal(batchId); 083 return batch.getKey(); 084 } 085 086 protected Batch initBatchInternal(String batchId) { 087 if (StringUtils.isEmpty(batchId)) { 088 batchId = "batchId-" + UUID.randomUUID().toString(); 089 } else if (!Framework.getService(ConfigurationService.class).isBooleanPropertyTrue(CLIENT_BATCH_ID_FLAG)) { 090 throw new NuxeoException( 091 String.format( 092 "Cannot initialize upload batch with a given id since configuration property %s is not set to true", 093 CLIENT_BATCH_ID_FLAG)); 094 } 095 096 // That's the way of storing an empty entry 097 log.debug("Initializing batch with id " + batchId); 098 getTransientStore().setCompleted(batchId, false); 099 return new Batch(batchId); 100 } 101 102 public Batch getBatch(String batchId) { 103 Map<String, Serializable> batchEntryParams = getTransientStore().getParameters(batchId); 104 if (batchEntryParams == null) { 105 if (!hasBatch(batchId)) { 106 return null; 107 } 108 batchEntryParams = new HashMap<>(); 109 } 110 return new Batch(batchId, batchEntryParams); 111 } 112 113 @Override 114 public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException { 115 uploadInProgress.incrementAndGet(); 116 try { 117 Batch batch = getBatch(batchId); 118 if (batch == null) { 119 batch = initBatchInternal(batchId); 120 } 121 batch.addFile(index, is, name, mime); 122 log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey())); 123 } finally { 124 uploadInProgress.decrementAndGet(); 125 } 126 } 127 128 @Override 129 public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name, 130 String mime, long fileSize) throws IOException { 131 uploadInProgress.incrementAndGet(); 132 try { 133 Batch batch = getBatch(batchId); 134 if (batch == null) { 135 batch = initBatchInternal(batchId); 136 } 137 batch.addChunk(index, is, chunkCount, chunkIndex, name, mime, fileSize); 138 log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name, 139 batch.getKey())); 140 } finally { 141 uploadInProgress.decrementAndGet(); 142 } 143 } 144 145 @Override 146 public boolean hasBatch(String batchId) { 147 return batchId != null && getTransientStore().exists(batchId); 148 } 149 150 @Override 151 public List<Blob> getBlobs(String batchId) { 152 return getBlobs(batchId, 0); 153 } 154 155 @Override 156 public List<Blob> getBlobs(String batchId, int timeoutS) { 157 if (uploadInProgress.get() > 0 && timeoutS > 0) { 158 for (int i = 0; i < timeoutS * 5; i++) { 159 try { 160 Thread.sleep(200); 161 } catch (InterruptedException e) { 162 Thread.currentThread().interrupt(); 163 } 164 if (uploadInProgress.get() == 0) { 165 break; 166 } 167 } 168 } 169 Batch batch = getBatch(batchId); 170 if (batch == null) { 171 log.error("Unable to find batch with id " + batchId); 172 return Collections.emptyList(); 173 } 174 return batch.getBlobs(); 175 } 176 177 @Override 178 public Blob getBlob(String batchId, String fileIndex) { 179 return getBlob(batchId, fileIndex, 0); 180 } 181 182 @Override 183 public Blob getBlob(String batchId, String fileIndex, int timeoutS) { 184 Blob blob = getBatchBlob(batchId, fileIndex); 185 if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) { 186 for (int i = 0; i < timeoutS * 5; i++) { 187 try { 188 Thread.sleep(200); 189 } catch (InterruptedException e) { 190 Thread.currentThread().interrupt(); 191 } 192 blob = getBatchBlob(batchId, fileIndex); 193 if (blob != null) { 194 break; 195 } 196 } 197 } 198 if (!hasBatch(batchId)) { 199 log.error("Unable to find batch with id " + batchId); 200 return null; 201 } 202 return blob; 203 } 204 205 protected Blob getBatchBlob(String batchId, String fileIndex) { 206 Blob blob = null; 207 Batch batch = getBatch(batchId); 208 if (batch != null) { 209 blob = batch.getBlob(fileIndex); 210 } 211 return blob; 212 } 213 214 @Override 215 public List<BatchFileEntry> getFileEntries(String batchId) { 216 Batch batch = getBatch(batchId); 217 if (batch == null) { 218 return null; 219 } 220 return batch.getFileEntries(); 221 } 222 223 @Override 224 public BatchFileEntry getFileEntry(String batchId, String fileIndex) { 225 Batch batch = getBatch(batchId); 226 if (batch == null) { 227 return null; 228 } 229 return batch.getFileEntry(fileIndex); 230 } 231 232 @Override 233 public void clean(String batchId) { 234 Batch batch = getBatch(batchId); 235 if (batch != null) { 236 batch.clean(); 237 } 238 } 239 240 @Override 241 public Object execute(String batchId, String chainOrOperationId, CoreSession session, 242 Map<String, Object> contextParams, Map<String, Object> operationParams) { 243 List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout()); 244 if (blobs == null) { 245 String message = String.format("Unable to find batch associated with id '%s'", batchId); 246 log.error(message); 247 throw new NuxeoException(message); 248 } 249 return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams); 250 } 251 252 @Override 253 public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session, 254 Map<String, Object> contextParams, Map<String, Object> operationParams) { 255 Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout()); 256 if (blob == null) { 257 String message = String.format( 258 "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId, 259 fileIndex); 260 log.error(message); 261 throw new NuxeoException(message); 262 } 263 return execute(blob, chainOrOperationId, session, contextParams, operationParams); 264 } 265 266 protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session, 267 Map<String, Object> contextParams, Map<String, Object> operationParams) { 268 if (contextParams == null) { 269 contextParams = new HashMap<>(); 270 } 271 if (operationParams == null) { 272 operationParams = new HashMap<>(); 273 } 274 275 OperationContext ctx = new OperationContext(session); 276 ctx.setInput(blobInput); 277 ctx.putAll(contextParams); 278 279 try { 280 Object result = null; 281 AutomationService as = Framework.getLocalService(AutomationService.class); 282 // Drag and Drop action category is accessible from the chain sub context as chain parameters 283 result = as.run(ctx, chainOrOperationId, operationParams); 284 return result; 285 } catch (OperationException e) { 286 log.error("Error while executing automation batch ", e); 287 throw new NuxeoException(e); 288 } 289 } 290 291 protected int getUploadWaitTimeout() { 292 String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5"); 293 try { 294 return Integer.parseInt(t); 295 } catch (NumberFormatException e) { 296 log.error("Wrong number format for upload wait timeout property", e); 297 return 5; 298 } 299 } 300 301 @Override 302 public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session, 303 Map<String, Object> contextParams, Map<String, Object> operationParams) { 304 try { 305 return execute(batchId, chainOrOperationId, session, contextParams, operationParams); 306 } finally { 307 clean(batchId); 308 } 309 } 310}