001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *
020 */
021package org.nuxeo.ecm.automation.server.jaxrs.batch;
022
023import static org.apache.commons.lang3.StringUtils.isEmpty;
024
025import java.io.IOException;
026import java.io.InputStream;
027import java.security.Principal;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Objects;
033import java.util.Set;
034import java.util.concurrent.atomic.AtomicInteger;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.automation.AutomationService;
039import org.nuxeo.ecm.automation.OperationContext;
040import org.nuxeo.ecm.automation.OperationException;
041import org.nuxeo.ecm.automation.core.util.BlobList;
042import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
043import org.nuxeo.ecm.automation.server.AutomationServer;
044import org.nuxeo.ecm.automation.server.RestBinding;
045import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchHandlerDescriptor;
046import org.nuxeo.ecm.core.api.Blob;
047import org.nuxeo.ecm.core.api.Blobs;
048import org.nuxeo.ecm.core.api.CoreSession;
049import org.nuxeo.ecm.core.api.NuxeoException;
050import org.nuxeo.ecm.core.api.NuxeoPrincipal;
051import org.nuxeo.ecm.core.transientstore.api.TransientStore;
052import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
053import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.model.ComponentContext;
056import org.nuxeo.runtime.model.ComponentInstance;
057import org.nuxeo.runtime.model.DefaultComponent;
058
059/**
060 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
061 *
062 * @since 5.4.2
063 */
064public class BatchManagerComponent extends DefaultComponent implements BatchManager {
065
066    protected static final Log log = LogFactory.getLog(BatchManagerComponent.class);
067
068    protected static final String TRANSIENT_STORE_NAME = "BatchManagerCache";
069
070    public static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId";
071
072    /**
073     * The default batch handler name.
074     *
075     * @since 10.1
076     */
077    public static final String DEFAULT_BATCH_HANDLER = "default";
078
079    /** @since 10.1 */
080    public static final String EP_BATCH_HANDLER = "handlers";
081
082    protected Map<String, BatchHandler> handlers;
083
084    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);
085
086    static {
087        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
088    }
089
090    @Override
091    public void activate(ComponentContext context) {
092        super.activate(context);
093        handlers = new HashMap<>();
094    }
095
096    @Override
097    public void deactivate(ComponentContext context) {
098        handlers = null;
099        super.deactivate(context);
100    }
101
102    @Override
103    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
104        if (EP_BATCH_HANDLER.equals(extensionPoint)) {
105            if (contribution instanceof BatchHandlerDescriptor) {
106                BatchHandlerDescriptor contributionDescriptor = (BatchHandlerDescriptor) contribution;
107                String name = contributionDescriptor.getName();
108                BatchHandler batchHandler = contributionDescriptor.newInstance();
109                batchHandler.initialize(name, contributionDescriptor.getProperties());
110                handlers.put(name, batchHandler);
111            } else {
112                throw new NuxeoException("Invalid class: " + contribution.getClass().getName());
113            }
114        } else {
115            throw new NuxeoException("Invalid extension point: " + extensionPoint);
116        }
117    }
118
119    @Override
120    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
121        if (EP_BATCH_HANDLER.equals(extensionPoint)) {
122            if (BatchHandlerDescriptor.class.isAssignableFrom(contribution.getClass())) {
123                BatchHandlerDescriptor contributionDescriptor = (BatchHandlerDescriptor) contribution;
124                String name = contributionDescriptor.getName();
125                handlers.remove(name);
126            }
127        }
128    }
129
130    @Override
131    public TransientStore getTransientStore() {
132        TransientStoreService tss = Framework.getService(TransientStoreService.class);
133        return tss.getStore(TRANSIENT_STORE_NAME);
134    }
135
136    @Override
137    public Set<String> getSupportedHandlers() {
138        return Collections.unmodifiableSet(handlers.keySet());
139    }
140
141    @Override
142    public BatchHandler getHandler(String handlerName) {
143        return handlers.get(handlerName);
144    }
145
146    @Override
147    public String initBatch() {
148        Batch batch = initBatchInternal(null);
149        return batch.getKey();
150    }
151
152    @Override
153    @Deprecated
154    public String initBatch(String batchId, String contextName) {
155        Batch batch = initBatchInternal(batchId);
156        return batch.getKey();
157    }
158
159    protected Batch initBatchInternal(String batchId) {
160        BatchHandler batchHandler = handlers.get(DEFAULT_BATCH_HANDLER);
161        return batchHandler.newBatch(batchId);
162    }
163
164    @Override
165    public Batch initBatch(String handlerName) {
166        if (isEmpty(handlerName)) {
167            handlerName = DEFAULT_BATCH_HANDLER;
168        }
169        BatchHandler batchHandler = handlers.get(handlerName);
170        if (batchHandler == null) {
171            throw new IllegalArgumentException("Batch handler does not exist: " + handlerName);
172        }
173        return batchHandler.newBatch(null);
174    }
175
176    @Override
177    public Batch getBatch(String batchId) {
178        return handlers.values()
179                       .stream()
180                       .map(batchHandler -> batchHandler.getBatch(batchId))
181                       .filter(Objects::nonNull)
182                       .findFirst()
183                       .orElse(null);
184    }
185
186    @Override
187    public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException {
188        Blob blob = Blobs.createBlob(is);
189        addBlob(batchId, index, blob, name, mime);
190    }
191
192    @Override
193    public void addBlob(String batchId, String index, Blob blob, String name, String mime) throws IOException {
194        uploadInProgress.incrementAndGet();
195        try {
196            Batch batch = getBatch(batchId);
197            if (batch == null) {
198                batch = initBatchInternal(batchId);
199            }
200            batch.addFile(index, blob, name, mime);
201            log.debug(String.format("Added file %s [%s] to batch %s", index, name, batch.getKey()));
202        } finally {
203            uploadInProgress.decrementAndGet();
204        }
205    }
206
207    @Override
208    public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name,
209            String mime, long fileSize) throws IOException {
210        Blob blob = Blobs.createBlob(is);
211        addBlob(batchId, index, blob, chunkCount, chunkIndex, name, mime, fileSize);
212    }
213
214    @Override
215    public void addBlob(String batchId, String index, Blob blob, int chunkCount, int chunkIndex, String name,
216            String mime, long fileSize) throws IOException {
217        uploadInProgress.incrementAndGet();
218        try {
219            Batch batch = getBatch(batchId);
220            if (batch == null) {
221                batch = initBatchInternal(batchId);
222            }
223            batch.addChunk(index, blob, chunkCount, chunkIndex, name, mime, fileSize);
224            log.debug(String.format("Added chunk %s to file %s [%s] in batch %s", chunkIndex, index, name,
225                    batch.getKey()));
226        } finally {
227            uploadInProgress.decrementAndGet();
228        }
229    }
230
231    @Override
232    public boolean hasBatch(String batchId) {
233        return handlers.values().stream().anyMatch(batchHandler -> batchHandler.getBatch(batchId) != null);
234    }
235
236    @Override
237    public List<Blob> getBlobs(String batchId) {
238        return getBlobs(batchId, 0);
239    }
240
241    @Override
242    public List<Blob> getBlobs(String batchId, int timeoutS) {
243        if (uploadInProgress.get() > 0 && timeoutS > 0) {
244            for (int i = 0; i < timeoutS * 5; i++) {
245                try {
246                    Thread.sleep(200);
247                } catch (InterruptedException e) {
248                    Thread.currentThread().interrupt();
249                }
250                if (uploadInProgress.get() == 0) {
251                    break;
252                }
253            }
254        }
255        Batch batch = getBatch(batchId);
256        if (batch == null) {
257            log.error("Unable to find batch with id " + batchId);
258            return Collections.emptyList();
259        }
260        return batch.getBlobs();
261    }
262
263    @Override
264    public Blob getBlob(String batchId, String fileIndex) {
265        return getBlob(batchId, fileIndex, 0);
266    }
267
268    @Override
269    public Blob getBlob(String batchId, String fileIndex, int timeoutS) {
270        Blob blob = getBatchBlob(batchId, fileIndex);
271        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
272            for (int i = 0; i < timeoutS * 5; i++) {
273                try {
274                    Thread.sleep(200);
275                } catch (InterruptedException e) {
276                    Thread.currentThread().interrupt();
277                }
278                blob = getBatchBlob(batchId, fileIndex);
279                if (blob != null) {
280                    break;
281                }
282            }
283        }
284        if (!hasBatch(batchId)) {
285            log.error("Unable to find batch with id " + batchId);
286            return null;
287        }
288        return blob;
289    }
290
291    protected Blob getBatchBlob(String batchId, String fileIndex) {
292        Blob blob = null;
293        Batch batch = getBatch(batchId);
294        if (batch != null) {
295            blob = batch.getBlob(fileIndex);
296        }
297        return blob;
298    }
299
300    @Override
301    public List<BatchFileEntry> getFileEntries(String batchId) {
302        Batch batch = getBatch(batchId);
303        if (batch == null) {
304            return null;
305        }
306        return batch.getFileEntries();
307    }
308
309    @Override
310    public BatchFileEntry getFileEntry(String batchId, String fileIndex) {
311        Batch batch = getBatch(batchId);
312        if (batch == null) {
313            return null;
314        }
315        return batch.getFileEntry(fileIndex);
316    }
317
318    @Override
319    public void clean(String batchId) {
320        Batch batch = getBatch(batchId);
321        if (batch != null) {
322            batch.clean();
323        }
324    }
325
326    @Override
327    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
328            Map<String, Object> contextParams, Map<String, Object> operationParams) {
329        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
330        if (blobs == null) {
331            String message = String.format("Unable to find batch associated with id '%s'", batchId);
332            log.error(message);
333            throw new NuxeoException(message);
334        }
335        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
336    }
337
338    @Override
339    public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session,
340            Map<String, Object> contextParams, Map<String, Object> operationParams) {
341        Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout());
342        if (blob == null) {
343            String message = String.format(
344                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId,
345                    fileIndex);
346            log.error(message);
347            throw new NuxeoException(message);
348        }
349        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
350    }
351
352    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
353            Map<String, Object> contextParams, Map<String, Object> operationParams) {
354        if (contextParams == null) {
355            contextParams = new HashMap<>();
356        }
357        if (operationParams == null) {
358            operationParams = new HashMap<>();
359        }
360
361        try (OperationContext ctx = new OperationContext(session)) {
362
363            AutomationServer server = Framework.getService(AutomationServer.class);
364            RestBinding binding = server.getOperationBinding(chainOrOperationId);
365
366            if (binding != null && binding.isAdministrator()) {
367                Principal principal = ctx.getPrincipal();
368                if (!(principal instanceof NuxeoPrincipal && ((NuxeoPrincipal) principal).isAdministrator())) {
369                    String message = "Not allowed. You must be administrator to use this operation";
370                    log.error(message);
371                    throw new WebSecurityException(message);
372                }
373            }
374
375            ctx.setInput(blobInput);
376            ctx.putAll(contextParams);
377
378            AutomationService as = Framework.getService(AutomationService.class);
379            // Drag and Drop action category is accessible from the chain sub context as chain parameters
380            return as.run(ctx, chainOrOperationId, operationParams);
381        } catch (OperationException e) {
382            log.error("Error while executing automation batch ", e);
383            throw new NuxeoException(e);
384        }
385    }
386
387    protected int getUploadWaitTimeout() {
388        String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5");
389        try {
390            return Integer.parseInt(t);
391        } catch (NumberFormatException e) {
392            log.error("Wrong number format for upload wait timeout property", e);
393            return 5;
394        }
395    }
396
397    @Override
398    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
399            Map<String, Object> contextParams, Map<String, Object> operationParams) {
400        try {
401            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
402        } finally {
403            clean(batchId);
404        }
405    }
406
407    @Override
408    public boolean removeFileEntry(String batchId, String filedIdx) {
409        Batch batch = getBatch(batchId);
410        return batch != null && batch.removeFileEntry(filedIdx);
411    }
412}