001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *
020 */
021package org.nuxeo.ecm.automation.server.jaxrs.batch;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.Serializable;
026import java.security.Principal;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.UUID;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.automation.AutomationService;
038import org.nuxeo.ecm.automation.OperationContext;
039import org.nuxeo.ecm.automation.OperationException;
040import org.nuxeo.ecm.automation.core.util.BlobList;
041import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
042import org.nuxeo.ecm.automation.server.AutomationServer;
043import org.nuxeo.ecm.automation.server.RestBinding;
044import org.nuxeo.ecm.core.api.Blob;
045import org.nuxeo.ecm.core.api.CoreSession;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.api.NuxeoPrincipal;
048import org.nuxeo.ecm.core.transientstore.api.TransientStore;
049import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
050import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException;
051import org.nuxeo.runtime.api.Framework;
052import org.nuxeo.runtime.model.DefaultComponent;
053import org.nuxeo.runtime.services.config.ConfigurationService;
054
055/**
056 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
057 *
058 * @since 5.4.2
059 */
060public class BatchManagerComponent extends DefaultComponent implements BatchManager {
061
062    protected static final Log log = LogFactory.getLog(BatchManagerComponent.class);
063
064    protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache";
065
066    protected static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId";
067
068    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);
069
070    static {
071        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
072    }
073
074    @Override
075    public TransientStore getTransientStore() {
076        TransientStoreService tss = Framework.getService(TransientStoreService.class);
077        return tss.getStore(TRANSIENT_STORE_NAME);
078    }
079
080    @Override
081    public String initBatch() {
082        Batch batch = initBatchInternal(null);
083        return batch.getKey();
084    }
085
086    @Override
087    @Deprecated
088    public String initBatch(String batchId, String contextName) {
089        Batch batch = initBatchInternal(batchId);
090        return batch.getKey();
091    }
092
093    protected Batch initBatchInternal(String batchId) {
094        if (StringUtils.isEmpty(batchId)) {
095            batchId = "batchId-" + UUID.randomUUID().toString();
096        } else if (!Framework.getService(ConfigurationService.class).isBooleanPropertyTrue(CLIENT_BATCH_ID_FLAG)) {
097            throw new NuxeoException(String.format(
098                    "Cannot initialize upload batch with a given id since configuration property %s is not set to true",
099                    CLIENT_BATCH_ID_FLAG));
100        }
101
102        // That's the way of storing an empty entry
103        log.debug("Initializing batch with id " + batchId);
104        getTransientStore().setCompleted(batchId, false);
105        return new Batch(batchId);
106    }
107
108    public Batch getBatch(String batchId) {
109        Map<String, Serializable> batchEntryParams = getTransientStore().getParameters(batchId);
110        if (batchEntryParams == null) {
111            if (!hasBatch(batchId)) {
112                return null;
113            }
114            batchEntryParams = new HashMap<>();
115        }
116        return new Batch(batchId, batchEntryParams);
117    }
118
119    @Override
120    public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException {
121        uploadInProgress.incrementAndGet();
122        try {
123            Batch batch = getBatch(batchId);
124            if (batch == null) {
125                batch = initBatchInternal(batchId);
126            }
127            batch.addFile(index, is, name, mime);
128            log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey()));
129        } finally {
130            uploadInProgress.decrementAndGet();
131        }
132    }
133
134    @Override
135    public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name,
136            String mime, long fileSize) throws IOException {
137        uploadInProgress.incrementAndGet();
138        try {
139            Batch batch = getBatch(batchId);
140            if (batch == null) {
141                batch = initBatchInternal(batchId);
142            }
143            batch.addChunk(index, is, chunkCount, chunkIndex, name, mime, fileSize);
144            log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name,
145                    batch.getKey()));
146        } finally {
147            uploadInProgress.decrementAndGet();
148        }
149    }
150
151    @Override
152    public boolean hasBatch(String batchId) {
153        return batchId != null && getTransientStore().exists(batchId);
154    }
155
156    @Override
157    public List<Blob> getBlobs(String batchId) {
158        return getBlobs(batchId, 0);
159    }
160
161    @Override
162    public List<Blob> getBlobs(String batchId, int timeoutS) {
163        if (uploadInProgress.get() > 0 && timeoutS > 0) {
164            for (int i = 0; i < timeoutS * 5; i++) {
165                try {
166                    Thread.sleep(200);
167                } catch (InterruptedException e) {
168                    Thread.currentThread().interrupt();
169                }
170                if (uploadInProgress.get() == 0) {
171                    break;
172                }
173            }
174        }
175        Batch batch = getBatch(batchId);
176        if (batch == null) {
177            log.error("Unable to find batch with id " + batchId);
178            return Collections.emptyList();
179        }
180        return batch.getBlobs();
181    }
182
183    @Override
184    public Blob getBlob(String batchId, String fileIndex) {
185        return getBlob(batchId, fileIndex, 0);
186    }
187
188    @Override
189    public Blob getBlob(String batchId, String fileIndex, int timeoutS) {
190        Blob blob = getBatchBlob(batchId, fileIndex);
191        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
192            for (int i = 0; i < timeoutS * 5; i++) {
193                try {
194                    Thread.sleep(200);
195                } catch (InterruptedException e) {
196                    Thread.currentThread().interrupt();
197                }
198                blob = getBatchBlob(batchId, fileIndex);
199                if (blob != null) {
200                    break;
201                }
202            }
203        }
204        if (!hasBatch(batchId)) {
205            log.error("Unable to find batch with id " + batchId);
206            return null;
207        }
208        return blob;
209    }
210
211    protected Blob getBatchBlob(String batchId, String fileIndex) {
212        Blob blob = null;
213        Batch batch = getBatch(batchId);
214        if (batch != null) {
215            blob = batch.getBlob(fileIndex);
216        }
217        return blob;
218    }
219
220    @Override
221    public List<BatchFileEntry> getFileEntries(String batchId) {
222        Batch batch = getBatch(batchId);
223        if (batch == null) {
224            return null;
225        }
226        return batch.getFileEntries();
227    }
228
229    @Override
230    public BatchFileEntry getFileEntry(String batchId, String fileIndex) {
231        Batch batch = getBatch(batchId);
232        if (batch == null) {
233            return null;
234        }
235        return batch.getFileEntry(fileIndex);
236    }
237
238    @Override
239    public void clean(String batchId) {
240        Batch batch = getBatch(batchId);
241        if (batch != null) {
242            batch.clean();
243        }
244    }
245
246    @Override
247    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
248            Map<String, Object> contextParams, Map<String, Object> operationParams) {
249        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
250        if (blobs == null) {
251            String message = String.format("Unable to find batch associated with id '%s'", batchId);
252            log.error(message);
253            throw new NuxeoException(message);
254        }
255        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
256    }
257
258    @Override
259    public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session,
260            Map<String, Object> contextParams, Map<String, Object> operationParams) {
261        Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout());
262        if (blob == null) {
263            String message = String.format(
264                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId,
265                    fileIndex);
266            log.error(message);
267            throw new NuxeoException(message);
268        }
269        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
270    }
271
272    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
273            Map<String, Object> contextParams, Map<String, Object> operationParams) {
274        if (contextParams == null) {
275            contextParams = new HashMap<>();
276        }
277        if (operationParams == null) {
278            operationParams = new HashMap<>();
279        }
280
281        try (OperationContext ctx = new OperationContext(session)) {
282
283            AutomationServer server = Framework.getService(AutomationServer.class);
284            RestBinding binding = server.getOperationBinding(chainOrOperationId);
285
286            if (binding != null && binding.isAdministrator()) {
287                Principal principal = ctx.getPrincipal();
288                if (!(principal instanceof NuxeoPrincipal && ((NuxeoPrincipal) principal).isAdministrator())) {
289                    String message = "Not allowed. You must be administrator to use this operation";
290                    log.error(message);
291                    throw new WebSecurityException(message);
292                }
293            }
294
295            ctx.setInput(blobInput);
296            ctx.putAll(contextParams);
297
298            Object result = null;
299            AutomationService as = Framework.getLocalService(AutomationService.class);
300            // Drag and Drop action category is accessible from the chain sub context as chain parameters
301            result = as.run(ctx, chainOrOperationId, operationParams);
302            return result;
303        } catch (OperationException e) {
304            log.error("Error while executing automation batch ", e);
305            throw new NuxeoException(e);
306        }
307    }
308
309    protected int getUploadWaitTimeout() {
310        String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5");
311        try {
312            return Integer.parseInt(t);
313        } catch (NumberFormatException e) {
314            log.error("Wrong number format for upload wait timeout property", e);
315            return 5;
316        }
317    }
318
319    @Override
320    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
321            Map<String, Object> contextParams, Map<String, Object> operationParams) {
322        try {
323            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
324        } finally {
325            clean(batchId);
326        }
327    }
328
329    @Override
330    public boolean removeFileEntry(String batchId, String filedIdx) {
331        Batch batch = getBatch(batchId);
332        if (batch == null) {
333            return false;
334        }
335        return batch.removeFileEntry(filedIdx, getTransientStore());
336    }
337}