001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Thierry Delprat <tdelprat@nuxeo.com>
016 *     Antoine Taillefer <ataillefer@nuxeo.com>
017 *
018 */
019package org.nuxeo.ecm.automation.server.jaxrs.batch;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.UUID;
028import java.util.concurrent.atomic.AtomicInteger;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.ecm.automation.AutomationService;
033import org.nuxeo.ecm.automation.OperationContext;
034import org.nuxeo.ecm.automation.OperationException;
035import org.nuxeo.ecm.automation.core.util.BlobList;
036import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.transientstore.api.StorageEntry;
041import org.nuxeo.ecm.core.transientstore.api.TransientStore;
042import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
043import org.nuxeo.runtime.api.Framework;
044import org.nuxeo.runtime.model.DefaultComponent;
045
046/**
047 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
048 *
049 * @since 5.4.2
050 */
051public class BatchManagerComponent extends DefaultComponent implements BatchManager {
052
053    protected static final String DEFAULT_CONTEXT = "None";
054
055    protected static final Log log = LogFactory.getLog(BatchManagerComponent.class);
056
057    protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache";
058
059    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);
060
061    static {
062        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
063    }
064
065    @Override
066    public TransientStore getTransientStore() {
067        TransientStoreService tss = Framework.getService(TransientStoreService.class);
068        return tss.getStore(TRANSIENT_STORE_NAME);
069    }
070
071    @Override
072    public String initBatch() {
073        return initBatch(null, null);
074    }
075
076    @Override
077    public String initBatch(String batchId, String contextName) {
078        Batch batch = initBatchInternal(batchId, contextName);
079        return batch.getId();
080    }
081
082    protected Batch initBatchInternal(String batchId, String contextName) {
083        if (batchId != null) {
084            StorageEntry entry = getTransientStore().get(batchId);
085            if (entry != null) {
086                return (Batch) entry;
087            }
088        }
089
090        if (batchId == null || batchId.isEmpty()) {
091            batchId = "batchId-" + UUID.randomUUID().toString();
092        }
093        if (contextName == null || contextName.isEmpty()) {
094            contextName = DEFAULT_CONTEXT;
095        }
096
097        Batch newBatch = new Batch(batchId);
098        getTransientStore().put(newBatch);
099        return newBatch;
100    }
101
102    @Override
103    public void addStream(String batchId, String idx, InputStream is, String name, String mime) throws IOException {
104        uploadInProgress.incrementAndGet();
105        try {
106            Batch batch = (Batch) getTransientStore().get(batchId);
107            if (batch == null) {
108                batch = initBatchInternal(batchId, null);
109            }
110            String fileEntryId = batch.addFile(idx, is, name, mime);
111
112            // Need to synchronize manipulation of the batch TransientStore entry params
113            synchronized (this) {
114                batch = (Batch) getTransientStore().get(batch.getId());
115                batch.put(idx, fileEntryId);
116                getTransientStore().put(batch);
117            }
118            log.debug(String.format("Added file %s [%s] to batch %s", idx, name, batch.getId()));
119        } finally {
120            uploadInProgress.decrementAndGet();
121        }
122    }
123
124    @Override
125    public void addStream(String batchId, String idx, InputStream is, int chunkCount, int chunkIdx, String name,
126            String mime, long fileSize) throws IOException {
127        uploadInProgress.incrementAndGet();
128        try {
129            Batch batch = (Batch) getTransientStore().get(batchId);
130            if (batch == null) {
131                batch = initBatchInternal(batchId, null);
132            }
133            String fileEntryId = batch.addChunk(idx, is, chunkCount, chunkIdx, name, mime, fileSize);
134
135            // Need to synchronize manipulation of the batch TransientStore entry params
136            synchronized (this) {
137                batch = (Batch) getTransientStore().get(batch.getId());
138                batch.put(idx, fileEntryId);
139                getTransientStore().put(batch);
140            }
141            log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIdx, idx, name, batch.getId()));
142        } finally {
143            uploadInProgress.decrementAndGet();
144        }
145    }
146
147    @Override
148    public boolean hasBatch(String batchId) {
149        return batchId != null && getTransientStore().get(batchId) != null;
150    }
151
152    @Override
153    public List<Blob> getBlobs(String batchId) {
154        return getBlobs(batchId, 0);
155    }
156
157    @Override
158    public List<Blob> getBlobs(String batchId, int timeoutS) {
159        if (uploadInProgress.get() > 0 && timeoutS > 0) {
160            for (int i = 0; i < timeoutS * 5; i++) {
161                try {
162                    Thread.sleep(200);
163                } catch (InterruptedException e) {
164                    Thread.currentThread().interrupt();
165                }
166                if (uploadInProgress.get() == 0) {
167                    break;
168                }
169            }
170        }
171        Batch batch = (Batch) getTransientStore().get(batchId);
172        if (batch == null) {
173            log.error("Unable to find batch with id " + batchId);
174            return Collections.emptyList();
175        }
176        return batch.getBlobs();
177    }
178
179    @Override
180    public Blob getBlob(String batchId, String fileId) {
181        return getBlob(batchId, fileId, 0);
182    }
183
184    @Override
185    public Blob getBlob(String batchId, String fileId, int timeoutS) {
186        Blob blob = getBatchBlob(batchId, fileId);
187        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
188            for (int i = 0; i < timeoutS * 5; i++) {
189                try {
190                    Thread.sleep(200);
191                } catch (InterruptedException e) {
192                    Thread.currentThread().interrupt();
193                }
194                blob = getBatchBlob(batchId, fileId);
195                if (blob != null) {
196                    break;
197                }
198            }
199        }
200        if (!hasBatch(batchId)) {
201            log.error("Unable to find batch with id " + batchId);
202            return null;
203        }
204        return blob;
205    }
206
207    protected Blob getBatchBlob(String batchId, String fileId) {
208        Blob blob = null;
209        Batch batch = (Batch) getTransientStore().get(batchId);
210        if (batch != null) {
211            blob = batch.getBlob(fileId);
212        }
213        return blob;
214    }
215
216    @Override
217    public List<BatchFileEntry> getFileEntries(String batchId) {
218        Batch batch = (Batch) getTransientStore().get(batchId);
219        if (batch == null) {
220            return null;
221        }
222        return batch.getFileEntries();
223    }
224
225    @Override
226    public BatchFileEntry getFileEntry(String batchId, String fileId) {
227        Batch batch = (Batch) getTransientStore().get(batchId);
228        if (batch == null) {
229            return null;
230        }
231        return batch.getFileEntry(fileId);
232    }
233
234    @Override
235    public void clean(String batchId) {
236        Batch batch = (Batch) getTransientStore().get(batchId);
237        if (batch != null) {
238            batch.clean();
239        }
240    }
241
242    @Override
243    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
244            Map<String, Object> contextParams, Map<String, Object> operationParams) {
245        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
246        if (blobs == null) {
247            String message = String.format("Unable to find batch associated with id '%s'", batchId);
248            log.error(message);
249            throw new NuxeoException(message);
250        }
251        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
252    }
253
254    @Override
255    public Object execute(String batchId, String fileIdx, String chainOrOperationId, CoreSession session,
256            Map<String, Object> contextParams, Map<String, Object> operationParams) {
257        Blob blob = getBlob(batchId, fileIdx, getUploadWaitTimeout());
258        if (blob == null) {
259            String message = String.format(
260                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId, fileIdx);
261            log.error(message);
262            throw new NuxeoException(message);
263        }
264        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
265    }
266
267    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
268            Map<String, Object> contextParams, Map<String, Object> operationParams) {
269        if (contextParams == null) {
270            contextParams = new HashMap<>();
271        }
272        if (operationParams == null) {
273            operationParams = new HashMap<>();
274        }
275
276        OperationContext ctx = new OperationContext(session);
277        ctx.setInput(blobInput);
278        ctx.putAll(contextParams);
279
280        try {
281            Object result = null;
282            AutomationService as = Framework.getLocalService(AutomationService.class);
283            // Drag and Drop action category is accessible from the chain sub context as chain parameters
284            result = as.run(ctx, chainOrOperationId, operationParams);
285            return result;
286        } catch (OperationException e) {
287            log.error("Error while executing automation batch ", e);
288            throw new NuxeoException(e);
289        }
290    }
291
292    protected int getUploadWaitTimeout() {
293        String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5");
294        try {
295            return Integer.parseInt(t);
296        } catch (NumberFormatException e) {
297            log.error("Wrong number format for upload wait timeout property", e);
298            return 5;
299        }
300    }
301
302    @Override
303    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
304            Map<String, Object> contextParams, Map<String, Object> operationParams) {
305        try {
306            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
307        } finally {
308            clean(batchId);
309        }
310    }
311}