001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 * 020 */ 021package org.nuxeo.ecm.automation.server.jaxrs.batch; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.Serializable; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.UUID; 031import java.util.concurrent.atomic.AtomicInteger; 032 033import org.apache.commons.lang.StringUtils; 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.automation.AutomationService; 037import org.nuxeo.ecm.automation.OperationContext; 038import org.nuxeo.ecm.automation.OperationException; 039import org.nuxeo.ecm.automation.core.util.BlobList; 040import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 041import org.nuxeo.ecm.core.api.Blob; 042import org.nuxeo.ecm.core.api.CoreSession; 043import org.nuxeo.ecm.core.api.NuxeoException; 044import org.nuxeo.ecm.core.transientstore.api.TransientStore; 045import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 046import org.nuxeo.runtime.api.Framework; 047import org.nuxeo.runtime.model.DefaultComponent; 048import org.nuxeo.runtime.services.config.ConfigurationService; 049 050/** 051 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}. 052 * 053 * @since 5.4.2 054 */ 055public class BatchManagerComponent extends DefaultComponent implements BatchManager { 056 057 protected static final Log log = LogFactory.getLog(BatchManagerComponent.class); 058 059 protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache"; 060 061 protected static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId"; 062 063 protected final AtomicInteger uploadInProgress = new AtomicInteger(0); 064 065 static { 066 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder()); 067 } 068 069 @Override 070 public TransientStore getTransientStore() { 071 TransientStoreService tss = Framework.getService(TransientStoreService.class); 072 return tss.getStore(TRANSIENT_STORE_NAME); 073 } 074 075 @Override 076 public String initBatch() { 077 Batch batch = initBatchInternal(null); 078 return batch.getKey(); 079 } 080 081 @Override 082 @Deprecated 083 public String initBatch(String batchId, String contextName) { 084 Batch batch = initBatchInternal(batchId); 085 return batch.getKey(); 086 } 087 088 protected Batch initBatchInternal(String batchId) { 089 if (StringUtils.isEmpty(batchId)) { 090 batchId = "batchId-" + UUID.randomUUID().toString(); 091 } else if (!Framework.getService(ConfigurationService.class).isBooleanPropertyTrue(CLIENT_BATCH_ID_FLAG)) { 092 throw new NuxeoException( 093 String.format( 094 "Cannot initialize upload batch with a given id since configuration property %s is not set to true", 095 CLIENT_BATCH_ID_FLAG)); 096 } 097 098 // That's the way of storing an empty entry 099 log.debug("Initializing batch with id " + batchId); 100 getTransientStore().setCompleted(batchId, false); 101 return new Batch(batchId); 102 } 103 104 public Batch getBatch(String batchId) { 105 Map<String, Serializable> batchEntryParams = getTransientStore().getParameters(batchId); 106 if (batchEntryParams == null) { 107 if (!hasBatch(batchId)) { 108 return null; 109 } 110 batchEntryParams = new HashMap<>(); 111 } 112 return new Batch(batchId, batchEntryParams); 113 } 114 115 @Override 116 public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException { 117 uploadInProgress.incrementAndGet(); 118 try { 119 Batch batch = getBatch(batchId); 120 if (batch == null) { 121 batch = initBatchInternal(batchId); 122 } 123 batch.addFile(index, is, name, mime); 124 log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey())); 125 } finally { 126 uploadInProgress.decrementAndGet(); 127 } 128 } 129 130 @Override 131 public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name, 132 String mime, long fileSize) throws IOException { 133 uploadInProgress.incrementAndGet(); 134 try { 135 Batch batch = getBatch(batchId); 136 if (batch == null) { 137 batch = initBatchInternal(batchId); 138 } 139 batch.addChunk(index, is, chunkCount, chunkIndex, name, mime, fileSize); 140 log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name, 141 batch.getKey())); 142 } finally { 143 uploadInProgress.decrementAndGet(); 144 } 145 } 146 147 @Override 148 public boolean hasBatch(String batchId) { 149 return batchId != null && getTransientStore().exists(batchId); 150 } 151 152 @Override 153 public List<Blob> getBlobs(String batchId) { 154 return getBlobs(batchId, 0); 155 } 156 157 @Override 158 public List<Blob> getBlobs(String batchId, int timeoutS) { 159 if (uploadInProgress.get() > 0 && timeoutS > 0) { 160 for (int i = 0; i < timeoutS * 5; i++) { 161 try { 162 Thread.sleep(200); 163 } catch (InterruptedException e) { 164 Thread.currentThread().interrupt(); 165 } 166 if (uploadInProgress.get() == 0) { 167 break; 168 } 169 } 170 } 171 Batch batch = getBatch(batchId); 172 if (batch == null) { 173 log.error("Unable to find batch with id " + batchId); 174 return Collections.emptyList(); 175 } 176 return batch.getBlobs(); 177 } 178 179 @Override 180 public Blob getBlob(String batchId, String fileIndex) { 181 return getBlob(batchId, fileIndex, 0); 182 } 183 184 @Override 185 public Blob getBlob(String batchId, String fileIndex, int timeoutS) { 186 Blob blob = getBatchBlob(batchId, fileIndex); 187 if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) { 188 for (int i = 0; i < timeoutS * 5; i++) { 189 try { 190 Thread.sleep(200); 191 } catch (InterruptedException e) { 192 Thread.currentThread().interrupt(); 193 } 194 blob = getBatchBlob(batchId, fileIndex); 195 if (blob != null) { 196 break; 197 } 198 } 199 } 200 if (!hasBatch(batchId)) { 201 log.error("Unable to find batch with id " + batchId); 202 return null; 203 } 204 return blob; 205 } 206 207 protected Blob getBatchBlob(String batchId, String fileIndex) { 208 Blob blob = null; 209 Batch batch = getBatch(batchId); 210 if (batch != null) { 211 blob = batch.getBlob(fileIndex); 212 } 213 return blob; 214 } 215 216 @Override 217 public List<BatchFileEntry> getFileEntries(String batchId) { 218 Batch batch = getBatch(batchId); 219 if (batch == null) { 220 return null; 221 } 222 return batch.getFileEntries(); 223 } 224 225 @Override 226 public BatchFileEntry getFileEntry(String batchId, String fileIndex) { 227 Batch batch = getBatch(batchId); 228 if (batch == null) { 229 return null; 230 } 231 return batch.getFileEntry(fileIndex); 232 } 233 234 @Override 235 public void clean(String batchId) { 236 Batch batch = getBatch(batchId); 237 if (batch != null) { 238 batch.clean(); 239 } 240 } 241 242 @Override 243 public Object execute(String batchId, String chainOrOperationId, CoreSession session, 244 Map<String, Object> contextParams, Map<String, Object> operationParams) { 245 List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout()); 246 if (blobs == null) { 247 String message = String.format("Unable to find batch associated with id '%s'", batchId); 248 log.error(message); 249 throw new NuxeoException(message); 250 } 251 return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams); 252 } 253 254 @Override 255 public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session, 256 Map<String, Object> contextParams, Map<String, Object> operationParams) { 257 Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout()); 258 if (blob == null) { 259 String message = String.format( 260 "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId, 261 fileIndex); 262 log.error(message); 263 throw new NuxeoException(message); 264 } 265 return execute(blob, chainOrOperationId, session, contextParams, operationParams); 266 } 267 268 protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session, 269 Map<String, Object> contextParams, Map<String, Object> operationParams) { 270 if (contextParams == null) { 271 contextParams = new HashMap<>(); 272 } 273 if (operationParams == null) { 274 operationParams = new HashMap<>(); 275 } 276 277 OperationContext ctx = new OperationContext(session); 278 ctx.setInput(blobInput); 279 ctx.putAll(contextParams); 280 281 try { 282 Object result = null; 283 AutomationService as = Framework.getLocalService(AutomationService.class); 284 // Drag and Drop action category is accessible from the chain sub context as chain parameters 285 result = as.run(ctx, chainOrOperationId, operationParams); 286 return result; 287 } catch (OperationException e) { 288 log.error("Error while executing automation batch ", e); 289 throw new NuxeoException(e); 290 } 291 } 292 293 protected int getUploadWaitTimeout() { 294 String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5"); 295 try { 296 return Integer.parseInt(t); 297 } catch (NumberFormatException e) { 298 log.error("Wrong number format for upload wait timeout property", e); 299 return 5; 300 } 301 } 302 303 @Override 304 public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session, 305 Map<String, Object> contextParams, Map<String, Object> operationParams) { 306 try { 307 return execute(batchId, chainOrOperationId, session, contextParams, operationParams); 308 } finally { 309 clean(batchId); 310 } 311 } 312}