001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *
020 */
021package org.nuxeo.ecm.automation.server.jaxrs.batch;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.Serializable;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.UUID;
031import java.util.concurrent.atomic.AtomicInteger;
032
033import org.apache.commons.lang.StringUtils;
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.automation.AutomationService;
037import org.nuxeo.ecm.automation.OperationContext;
038import org.nuxeo.ecm.automation.OperationException;
039import org.nuxeo.ecm.automation.core.util.BlobList;
040import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
041import org.nuxeo.ecm.core.api.Blob;
042import org.nuxeo.ecm.core.api.CoreSession;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.transientstore.api.TransientStore;
045import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.model.DefaultComponent;
048import org.nuxeo.runtime.services.config.ConfigurationService;
049
050/**
051 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
052 *
053 * @since 5.4.2
054 */
055public class BatchManagerComponent extends DefaultComponent implements BatchManager {
056
057    protected static final Log log = LogFactory.getLog(BatchManagerComponent.class);
058
059    protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache";
060
061    protected static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId";
062
063    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);
064
065    static {
066        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
067    }
068
069    @Override
070    public TransientStore getTransientStore() {
071        TransientStoreService tss = Framework.getService(TransientStoreService.class);
072        return tss.getStore(TRANSIENT_STORE_NAME);
073    }
074
075    @Override
076    public String initBatch() {
077        Batch batch = initBatchInternal(null);
078        return batch.getKey();
079    }
080
081    @Override
082    @Deprecated
083    public String initBatch(String batchId, String contextName) {
084        Batch batch = initBatchInternal(batchId);
085        return batch.getKey();
086    }
087
088    protected Batch initBatchInternal(String batchId) {
089        if (StringUtils.isEmpty(batchId)) {
090            batchId = "batchId-" + UUID.randomUUID().toString();
091        } else if (!Framework.getService(ConfigurationService.class).isBooleanPropertyTrue(CLIENT_BATCH_ID_FLAG)) {
092            throw new NuxeoException(
093                    String.format(
094                            "Cannot initialize upload batch with a given id since configuration property %s is not set to true",
095                            CLIENT_BATCH_ID_FLAG));
096        }
097
098        // That's the way of storing an empty entry
099        log.debug("Initializing batch with id " + batchId);
100        getTransientStore().setCompleted(batchId, false);
101        return new Batch(batchId);
102    }
103
104    public Batch getBatch(String batchId) {
105        Map<String, Serializable> batchEntryParams = getTransientStore().getParameters(batchId);
106        if (batchEntryParams == null) {
107            if (!hasBatch(batchId)) {
108                return null;
109            }
110            batchEntryParams = new HashMap<>();
111        }
112        return new Batch(batchId, batchEntryParams);
113    }
114
115    @Override
116    public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException {
117        uploadInProgress.incrementAndGet();
118        try {
119            Batch batch = getBatch(batchId);
120            if (batch == null) {
121                batch = initBatchInternal(batchId);
122            }
123            batch.addFile(index, is, name, mime);
124            log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey()));
125        } finally {
126            uploadInProgress.decrementAndGet();
127        }
128    }
129
130    @Override
131    public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name,
132            String mime, long fileSize) throws IOException {
133        uploadInProgress.incrementAndGet();
134        try {
135            Batch batch = getBatch(batchId);
136            if (batch == null) {
137                batch = initBatchInternal(batchId);
138            }
139            batch.addChunk(index, is, chunkCount, chunkIndex, name, mime, fileSize);
140            log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name,
141                    batch.getKey()));
142        } finally {
143            uploadInProgress.decrementAndGet();
144        }
145    }
146
147    @Override
148    public boolean hasBatch(String batchId) {
149        return batchId != null && getTransientStore().exists(batchId);
150    }
151
152    @Override
153    public List<Blob> getBlobs(String batchId) {
154        return getBlobs(batchId, 0);
155    }
156
157    @Override
158    public List<Blob> getBlobs(String batchId, int timeoutS) {
159        if (uploadInProgress.get() > 0 && timeoutS > 0) {
160            for (int i = 0; i < timeoutS * 5; i++) {
161                try {
162                    Thread.sleep(200);
163                } catch (InterruptedException e) {
164                    Thread.currentThread().interrupt();
165                }
166                if (uploadInProgress.get() == 0) {
167                    break;
168                }
169            }
170        }
171        Batch batch = getBatch(batchId);
172        if (batch == null) {
173            log.error("Unable to find batch with id " + batchId);
174            return Collections.emptyList();
175        }
176        return batch.getBlobs();
177    }
178
179    @Override
180    public Blob getBlob(String batchId, String fileIndex) {
181        return getBlob(batchId, fileIndex, 0);
182    }
183
184    @Override
185    public Blob getBlob(String batchId, String fileIndex, int timeoutS) {
186        Blob blob = getBatchBlob(batchId, fileIndex);
187        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
188            for (int i = 0; i < timeoutS * 5; i++) {
189                try {
190                    Thread.sleep(200);
191                } catch (InterruptedException e) {
192                    Thread.currentThread().interrupt();
193                }
194                blob = getBatchBlob(batchId, fileIndex);
195                if (blob != null) {
196                    break;
197                }
198            }
199        }
200        if (!hasBatch(batchId)) {
201            log.error("Unable to find batch with id " + batchId);
202            return null;
203        }
204        return blob;
205    }
206
207    protected Blob getBatchBlob(String batchId, String fileIndex) {
208        Blob blob = null;
209        Batch batch = getBatch(batchId);
210        if (batch != null) {
211            blob = batch.getBlob(fileIndex);
212        }
213        return blob;
214    }
215
216    @Override
217    public List<BatchFileEntry> getFileEntries(String batchId) {
218        Batch batch = getBatch(batchId);
219        if (batch == null) {
220            return null;
221        }
222        return batch.getFileEntries();
223    }
224
225    @Override
226    public BatchFileEntry getFileEntry(String batchId, String fileIndex) {
227        Batch batch = getBatch(batchId);
228        if (batch == null) {
229            return null;
230        }
231        return batch.getFileEntry(fileIndex);
232    }
233
234    @Override
235    public void clean(String batchId) {
236        Batch batch = getBatch(batchId);
237        if (batch != null) {
238            batch.clean();
239        }
240    }
241
242    @Override
243    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
244            Map<String, Object> contextParams, Map<String, Object> operationParams) {
245        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
246        if (blobs == null) {
247            String message = String.format("Unable to find batch associated with id '%s'", batchId);
248            log.error(message);
249            throw new NuxeoException(message);
250        }
251        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
252    }
253
254    @Override
255    public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session,
256            Map<String, Object> contextParams, Map<String, Object> operationParams) {
257        Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout());
258        if (blob == null) {
259            String message = String.format(
260                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId,
261                    fileIndex);
262            log.error(message);
263            throw new NuxeoException(message);
264        }
265        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
266    }
267
268    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
269            Map<String, Object> contextParams, Map<String, Object> operationParams) {
270        if (contextParams == null) {
271            contextParams = new HashMap<>();
272        }
273        if (operationParams == null) {
274            operationParams = new HashMap<>();
275        }
276
277        OperationContext ctx = new OperationContext(session);
278        ctx.setInput(blobInput);
279        ctx.putAll(contextParams);
280
281        try {
282            Object result = null;
283            AutomationService as = Framework.getLocalService(AutomationService.class);
284            // Drag and Drop action category is accessible from the chain sub context as chain parameters
285            result = as.run(ctx, chainOrOperationId, operationParams);
286            return result;
287        } catch (OperationException e) {
288            log.error("Error while executing automation batch ", e);
289            throw new NuxeoException(e);
290        }
291    }
292
293    protected int getUploadWaitTimeout() {
294        String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5");
295        try {
296            return Integer.parseInt(t);
297        } catch (NumberFormatException e) {
298            log.error("Wrong number format for upload wait timeout property", e);
299            return 5;
300        }
301    }
302
303    @Override
304    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
305            Map<String, Object> contextParams, Map<String, Object> operationParams) {
306        try {
307            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
308        } finally {
309            clean(batchId);
310        }
311    }
312}