001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 * 020 */ 021package org.nuxeo.ecm.automation.server.jaxrs.batch; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.Serializable; 026import java.security.Principal; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.UUID; 032import java.util.concurrent.atomic.AtomicInteger; 033 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.automation.AutomationService; 038import org.nuxeo.ecm.automation.OperationContext; 039import org.nuxeo.ecm.automation.OperationException; 040import org.nuxeo.ecm.automation.core.util.BlobList; 041import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 042import org.nuxeo.ecm.automation.server.AutomationServer; 043import org.nuxeo.ecm.automation.server.RestBinding; 044import org.nuxeo.ecm.core.api.Blob; 045import org.nuxeo.ecm.core.api.CoreSession; 046import org.nuxeo.ecm.core.api.NuxeoException; 047import org.nuxeo.ecm.core.api.NuxeoPrincipal; 048import org.nuxeo.ecm.core.transientstore.api.TransientStore; 049import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 050import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException; 051import org.nuxeo.runtime.api.Framework; 052import org.nuxeo.runtime.model.DefaultComponent; 053import org.nuxeo.runtime.services.config.ConfigurationService; 054 055/** 056 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}. 057 * 058 * @since 5.4.2 059 */ 060public class BatchManagerComponent extends DefaultComponent implements BatchManager { 061 062 protected static final Log log = LogFactory.getLog(BatchManagerComponent.class); 063 064 protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache"; 065 066 protected static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId"; 067 068 protected final AtomicInteger uploadInProgress = new AtomicInteger(0); 069 070 static { 071 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder()); 072 } 073 074 @Override 075 public TransientStore getTransientStore() { 076 TransientStoreService tss = Framework.getService(TransientStoreService.class); 077 return tss.getStore(TRANSIENT_STORE_NAME); 078 } 079 080 @Override 081 public String initBatch() { 082 Batch batch = initBatchInternal(null); 083 return batch.getKey(); 084 } 085 086 @Override 087 @Deprecated 088 public String initBatch(String batchId, String contextName) { 089 Batch batch = initBatchInternal(batchId); 090 return batch.getKey(); 091 } 092 093 protected Batch initBatchInternal(String batchId) { 094 if (StringUtils.isEmpty(batchId)) { 095 batchId = "batchId-" + UUID.randomUUID().toString(); 096 } else if (!Framework.getService(ConfigurationService.class).isBooleanPropertyTrue(CLIENT_BATCH_ID_FLAG)) { 097 throw new NuxeoException(String.format( 098 "Cannot initialize upload batch with a given id since configuration property %s is not set to true", 099 CLIENT_BATCH_ID_FLAG)); 100 } 101 102 // That's the way of storing an empty entry 103 log.debug("Initializing batch with id " + batchId); 104 getTransientStore().setCompleted(batchId, false); 105 return new Batch(batchId); 106 } 107 108 public Batch getBatch(String batchId) { 109 Map<String, Serializable> batchEntryParams = getTransientStore().getParameters(batchId); 110 if (batchEntryParams == null) { 111 if (!hasBatch(batchId)) { 112 return null; 113 } 114 batchEntryParams = new HashMap<>(); 115 } 116 return new Batch(batchId, batchEntryParams); 117 } 118 119 @Override 120 public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException { 121 uploadInProgress.incrementAndGet(); 122 try { 123 Batch batch = getBatch(batchId); 124 if (batch == null) { 125 batch = initBatchInternal(batchId); 126 } 127 batch.addFile(index, is, name, mime); 128 log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey())); 129 } finally { 130 uploadInProgress.decrementAndGet(); 131 } 132 } 133 134 @Override 135 public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name, 136 String mime, long fileSize) throws IOException { 137 uploadInProgress.incrementAndGet(); 138 try { 139 Batch batch = getBatch(batchId); 140 if (batch == null) { 141 batch = initBatchInternal(batchId); 142 } 143 batch.addChunk(index, is, chunkCount, chunkIndex, name, mime, fileSize); 144 log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name, 145 batch.getKey())); 146 } finally { 147 uploadInProgress.decrementAndGet(); 148 } 149 } 150 151 @Override 152 public boolean hasBatch(String batchId) { 153 return batchId != null && getTransientStore().exists(batchId); 154 } 155 156 @Override 157 public List<Blob> getBlobs(String batchId) { 158 return getBlobs(batchId, 0); 159 } 160 161 @Override 162 public List<Blob> getBlobs(String batchId, int timeoutS) { 163 if (uploadInProgress.get() > 0 && timeoutS > 0) { 164 for (int i = 0; i < timeoutS * 5; i++) { 165 try { 166 Thread.sleep(200); 167 } catch (InterruptedException e) { 168 Thread.currentThread().interrupt(); 169 } 170 if (uploadInProgress.get() == 0) { 171 break; 172 } 173 } 174 } 175 Batch batch = getBatch(batchId); 176 if (batch == null) { 177 log.error("Unable to find batch with id " + batchId); 178 return Collections.emptyList(); 179 } 180 return batch.getBlobs(); 181 } 182 183 @Override 184 public Blob getBlob(String batchId, String fileIndex) { 185 return getBlob(batchId, fileIndex, 0); 186 } 187 188 @Override 189 public Blob getBlob(String batchId, String fileIndex, int timeoutS) { 190 Blob blob = getBatchBlob(batchId, fileIndex); 191 if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) { 192 for (int i = 0; i < timeoutS * 5; i++) { 193 try { 194 Thread.sleep(200); 195 } catch (InterruptedException e) { 196 Thread.currentThread().interrupt(); 197 } 198 blob = getBatchBlob(batchId, fileIndex); 199 if (blob != null) { 200 break; 201 } 202 } 203 } 204 if (!hasBatch(batchId)) { 205 log.error("Unable to find batch with id " + batchId); 206 return null; 207 } 208 return blob; 209 } 210 211 protected Blob getBatchBlob(String batchId, String fileIndex) { 212 Blob blob = null; 213 Batch batch = getBatch(batchId); 214 if (batch != null) { 215 blob = batch.getBlob(fileIndex); 216 } 217 return blob; 218 } 219 220 @Override 221 public List<BatchFileEntry> getFileEntries(String batchId) { 222 Batch batch = getBatch(batchId); 223 if (batch == null) { 224 return null; 225 } 226 return batch.getFileEntries(); 227 } 228 229 @Override 230 public BatchFileEntry getFileEntry(String batchId, String fileIndex) { 231 Batch batch = getBatch(batchId); 232 if (batch == null) { 233 return null; 234 } 235 return batch.getFileEntry(fileIndex); 236 } 237 238 @Override 239 public void clean(String batchId) { 240 Batch batch = getBatch(batchId); 241 if (batch != null) { 242 batch.clean(); 243 } 244 } 245 246 @Override 247 public Object execute(String batchId, String chainOrOperationId, CoreSession session, 248 Map<String, Object> contextParams, Map<String, Object> operationParams) { 249 List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout()); 250 if (blobs == null) { 251 String message = String.format("Unable to find batch associated with id '%s'", batchId); 252 log.error(message); 253 throw new NuxeoException(message); 254 } 255 return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams); 256 } 257 258 @Override 259 public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session, 260 Map<String, Object> contextParams, Map<String, Object> operationParams) { 261 Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout()); 262 if (blob == null) { 263 String message = String.format( 264 "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId, 265 fileIndex); 266 log.error(message); 267 throw new NuxeoException(message); 268 } 269 return execute(blob, chainOrOperationId, session, contextParams, operationParams); 270 } 271 272 protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session, 273 Map<String, Object> contextParams, Map<String, Object> operationParams) { 274 if (contextParams == null) { 275 contextParams = new HashMap<>(); 276 } 277 if (operationParams == null) { 278 operationParams = new HashMap<>(); 279 } 280 281 try (OperationContext ctx = new OperationContext(session)) { 282 283 AutomationServer server = Framework.getService(AutomationServer.class); 284 RestBinding binding = server.getOperationBinding(chainOrOperationId); 285 286 if (binding != null && binding.isAdministrator()) { 287 Principal principal = ctx.getPrincipal(); 288 if (!(principal instanceof NuxeoPrincipal && ((NuxeoPrincipal) principal).isAdministrator())) { 289 String message = "Not allowed. You must be administrator to use this operation"; 290 log.error(message); 291 throw new WebSecurityException(message); 292 } 293 } 294 295 ctx.setInput(blobInput); 296 ctx.putAll(contextParams); 297 298 Object result = null; 299 AutomationService as = Framework.getLocalService(AutomationService.class); 300 // Drag and Drop action category is accessible from the chain sub context as chain parameters 301 result = as.run(ctx, chainOrOperationId, operationParams); 302 return result; 303 } catch (OperationException e) { 304 log.error("Error while executing automation batch ", e); 305 throw new NuxeoException(e); 306 } 307 } 308 309 protected int getUploadWaitTimeout() { 310 String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5"); 311 try { 312 return Integer.parseInt(t); 313 } catch (NumberFormatException e) { 314 log.error("Wrong number format for upload wait timeout property", e); 315 return 5; 316 } 317 } 318 319 @Override 320 public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session, 321 Map<String, Object> contextParams, Map<String, Object> operationParams) { 322 try { 323 return execute(batchId, chainOrOperationId, session, contextParams, operationParams); 324 } finally { 325 clean(batchId); 326 } 327 } 328 329 @Override 330 public boolean removeFileEntry(String batchId, String filedIdx) { 331 Batch batch = getBatch(batchId); 332 if (batch == null) { 333 return false; 334 } 335 return batch.removeFileEntry(filedIdx, getTransientStore()); 336 } 337}