001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Thierry Delprat <tdelprat@nuxeo.com>
016 *     Antoine Taillefer <ataillefer@nuxeo.com>
017 *
018 */
019package org.nuxeo.ecm.automation.server.jaxrs.batch;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.Serializable;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.UUID;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import org.apache.commons.lang.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.automation.AutomationService;
035import org.nuxeo.ecm.automation.OperationContext;
036import org.nuxeo.ecm.automation.OperationException;
037import org.nuxeo.ecm.automation.core.util.BlobList;
038import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
039import org.nuxeo.ecm.core.api.Blob;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.transientstore.api.TransientStore;
043import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.model.DefaultComponent;
046import org.nuxeo.runtime.services.config.ConfigurationService;
047
048/**
049 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
050 *
051 * @since 5.4.2
052 */
053public class BatchManagerComponent extends DefaultComponent implements BatchManager {
054
055    protected static final Log log = LogFactory.getLog(BatchManagerComponent.class);
056
057    protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache";
058
059    protected static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId";
060
061    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);
062
063    static {
064        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
065    }
066
067    @Override
068    public TransientStore getTransientStore() {
069        TransientStoreService tss = Framework.getService(TransientStoreService.class);
070        return tss.getStore(TRANSIENT_STORE_NAME);
071    }
072
073    @Override
074    public String initBatch() {
075        Batch batch = initBatchInternal(null);
076        return batch.getKey();
077    }
078
079    @Override
080    @Deprecated
081    public String initBatch(String batchId, String contextName) {
082        Batch batch = initBatchInternal(batchId);
083        return batch.getKey();
084    }
085
086    protected Batch initBatchInternal(String batchId) {
087        if (StringUtils.isEmpty(batchId)) {
088            batchId = "batchId-" + UUID.randomUUID().toString();
089        } else if (!Framework.getService(ConfigurationService.class).isBooleanPropertyTrue(CLIENT_BATCH_ID_FLAG)) {
090            throw new NuxeoException(
091                    String.format(
092                            "Cannot initialize upload batch with a given id since configuration property %s is not set to true",
093                            CLIENT_BATCH_ID_FLAG));
094        }
095
096        // That's the way of storing an empty entry
097        log.debug("Initializing batch with id " + batchId);
098        getTransientStore().setCompleted(batchId, false);
099        return new Batch(batchId);
100    }
101
102    public Batch getBatch(String batchId) {
103        Map<String, Serializable> batchEntryParams = getTransientStore().getParameters(batchId);
104        if (batchEntryParams == null) {
105            if (!hasBatch(batchId)) {
106                return null;
107            }
108            batchEntryParams = new HashMap<>();
109        }
110        return new Batch(batchId, batchEntryParams);
111    }
112
113    @Override
114    public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException {
115        uploadInProgress.incrementAndGet();
116        try {
117            Batch batch = getBatch(batchId);
118            if (batch == null) {
119                batch = initBatchInternal(batchId);
120            }
121            batch.addFile(index, is, name, mime);
122            log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey()));
123        } finally {
124            uploadInProgress.decrementAndGet();
125        }
126    }
127
128    @Override
129    public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name,
130            String mime, long fileSize) throws IOException {
131        uploadInProgress.incrementAndGet();
132        try {
133            Batch batch = getBatch(batchId);
134            if (batch == null) {
135                batch = initBatchInternal(batchId);
136            }
137            batch.addChunk(index, is, chunkCount, chunkIndex, name, mime, fileSize);
138            log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name,
139                    batch.getKey()));
140        } finally {
141            uploadInProgress.decrementAndGet();
142        }
143    }
144
145    @Override
146    public boolean hasBatch(String batchId) {
147        return batchId != null && getTransientStore().exists(batchId);
148    }
149
150    @Override
151    public List<Blob> getBlobs(String batchId) {
152        return getBlobs(batchId, 0);
153    }
154
155    @Override
156    public List<Blob> getBlobs(String batchId, int timeoutS) {
157        if (uploadInProgress.get() > 0 && timeoutS > 0) {
158            for (int i = 0; i < timeoutS * 5; i++) {
159                try {
160                    Thread.sleep(200);
161                } catch (InterruptedException e) {
162                    Thread.currentThread().interrupt();
163                }
164                if (uploadInProgress.get() == 0) {
165                    break;
166                }
167            }
168        }
169        Batch batch = getBatch(batchId);
170        if (batch == null) {
171            log.error("Unable to find batch with id " + batchId);
172            return Collections.emptyList();
173        }
174        return batch.getBlobs();
175    }
176
177    @Override
178    public Blob getBlob(String batchId, String fileIndex) {
179        return getBlob(batchId, fileIndex, 0);
180    }
181
182    @Override
183    public Blob getBlob(String batchId, String fileIndex, int timeoutS) {
184        Blob blob = getBatchBlob(batchId, fileIndex);
185        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
186            for (int i = 0; i < timeoutS * 5; i++) {
187                try {
188                    Thread.sleep(200);
189                } catch (InterruptedException e) {
190                    Thread.currentThread().interrupt();
191                }
192                blob = getBatchBlob(batchId, fileIndex);
193                if (blob != null) {
194                    break;
195                }
196            }
197        }
198        if (!hasBatch(batchId)) {
199            log.error("Unable to find batch with id " + batchId);
200            return null;
201        }
202        return blob;
203    }
204
205    protected Blob getBatchBlob(String batchId, String fileIndex) {
206        Blob blob = null;
207        Batch batch = getBatch(batchId);
208        if (batch != null) {
209            blob = batch.getBlob(fileIndex);
210        }
211        return blob;
212    }
213
214    @Override
215    public List<BatchFileEntry> getFileEntries(String batchId) {
216        Batch batch = getBatch(batchId);
217        if (batch == null) {
218            return null;
219        }
220        return batch.getFileEntries();
221    }
222
223    @Override
224    public BatchFileEntry getFileEntry(String batchId, String fileIndex) {
225        Batch batch = getBatch(batchId);
226        if (batch == null) {
227            return null;
228        }
229        return batch.getFileEntry(fileIndex);
230    }
231
232    @Override
233    public void clean(String batchId) {
234        Batch batch = getBatch(batchId);
235        if (batch != null) {
236            batch.clean();
237        }
238    }
239
240    @Override
241    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
242            Map<String, Object> contextParams, Map<String, Object> operationParams) {
243        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
244        if (blobs == null) {
245            String message = String.format("Unable to find batch associated with id '%s'", batchId);
246            log.error(message);
247            throw new NuxeoException(message);
248        }
249        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
250    }
251
252    @Override
253    public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session,
254            Map<String, Object> contextParams, Map<String, Object> operationParams) {
255        Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout());
256        if (blob == null) {
257            String message = String.format(
258                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId,
259                    fileIndex);
260            log.error(message);
261            throw new NuxeoException(message);
262        }
263        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
264    }
265
266    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
267            Map<String, Object> contextParams, Map<String, Object> operationParams) {
268        if (contextParams == null) {
269            contextParams = new HashMap<>();
270        }
271        if (operationParams == null) {
272            operationParams = new HashMap<>();
273        }
274
275        OperationContext ctx = new OperationContext(session);
276        ctx.setInput(blobInput);
277        ctx.putAll(contextParams);
278
279        try {
280            Object result = null;
281            AutomationService as = Framework.getLocalService(AutomationService.class);
282            // Drag and Drop action category is accessible from the chain sub context as chain parameters
283            result = as.run(ctx, chainOrOperationId, operationParams);
284            return result;
285        } catch (OperationException e) {
286            log.error("Error while executing automation batch ", e);
287            throw new NuxeoException(e);
288        }
289    }
290
291    protected int getUploadWaitTimeout() {
292        String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5");
293        try {
294            return Integer.parseInt(t);
295        } catch (NumberFormatException e) {
296            log.error("Wrong number format for upload wait timeout property", e);
297            return 5;
298        }
299    }
300
301    @Override
302    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
303            Map<String, Object> contextParams, Map<String, Object> operationParams) {
304        try {
305            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
306        } finally {
307            clean(batchId);
308        }
309    }
310}