001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *
020 */
021package org.nuxeo.ecm.automation.server.jaxrs.batch;
022
023import static org.apache.commons.lang3.StringUtils.isEmpty;
024
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Objects;
032import java.util.Set;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.logging.log4j.LogManager;
036import org.apache.logging.log4j.Logger;
037import org.nuxeo.ecm.automation.AutomationService;
038import org.nuxeo.ecm.automation.OperationContext;
039import org.nuxeo.ecm.automation.OperationException;
040import org.nuxeo.ecm.automation.core.util.BlobList;
041import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
042import org.nuxeo.ecm.automation.server.AutomationServer;
043import org.nuxeo.ecm.automation.server.RestBinding;
044import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchHandlerDescriptor;
045import org.nuxeo.ecm.core.api.Blob;
046import org.nuxeo.ecm.core.api.Blobs;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.core.api.NuxeoException;
049import org.nuxeo.ecm.core.api.NuxeoPrincipal;
050import org.nuxeo.ecm.core.transientstore.api.TransientStore;
051import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
052import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException;
053import org.nuxeo.runtime.api.Framework;
054import org.nuxeo.runtime.model.ComponentContext;
055import org.nuxeo.runtime.model.DefaultComponent;
056
057/**
058 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
059 *
060 * @since 5.4.2
061 */
062public class BatchManagerComponent extends DefaultComponent implements BatchManager {
063
064    private static final Logger log = LogManager.getLogger(BatchManagerComponent.class);
065
066    public static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId";
067
068    /**
069     * The default batch handler name.
070     *
071     * @since 10.1
072     */
073    public static final String DEFAULT_BATCH_HANDLER = "default";
074
075    /** @since 10.1 */
076    public static final String XP_BATCH_HANDLER = "handlers";
077
078    protected Map<String, BatchHandler> handlers = new HashMap<>();
079
080    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);
081
082    static {
083        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
084    }
085
086    @Override
087    public void start(ComponentContext context) {
088        super.start(context);
089        List<BatchHandlerDescriptor> descriptors = getDescriptors(XP_BATCH_HANDLER);
090        descriptors.forEach(d -> {
091            try {
092                BatchHandler handler = d.klass.getDeclaredConstructor().newInstance();
093                handler.initialize(d.name, d.properties);
094                handlers.put(d.name, handler);
095            } catch (ReflectiveOperationException e) {
096                log.error("Unable to instantiate batch handler", e);
097            }
098        });
099    }
100
101    @Override
102    public void stop(ComponentContext context) throws InterruptedException {
103        super.stop(context);
104        handlers.clear();
105    }
106
107    @Override
108    @Deprecated
109    public TransientStore getTransientStore() {
110        return getHandler(DEFAULT_BATCH_HANDLER).getTransientStore();
111    }
112
113    @Override
114    public Set<String> getSupportedHandlers() {
115        return Collections.unmodifiableSet(handlers.keySet());
116    }
117
118    @Override
119    public BatchHandler getHandler(String handlerName) {
120        return handlers.get(handlerName);
121    }
122
123    @Override
124    public String initBatch() {
125        Batch batch = initBatchInternal(null);
126        return batch.getKey();
127    }
128
129    @Override
130    @Deprecated
131    public String initBatch(String batchId, String contextName) {
132        Batch batch = initBatchInternal(batchId);
133        return batch.getKey();
134    }
135
136    protected Batch initBatchInternal(String batchId) {
137        BatchHandler batchHandler = handlers.get(DEFAULT_BATCH_HANDLER);
138        return batchHandler.newBatch(batchId);
139    }
140
141    @Override
142    public Batch initBatch(String handlerName) {
143        if (isEmpty(handlerName)) {
144            handlerName = DEFAULT_BATCH_HANDLER;
145        }
146        BatchHandler batchHandler = handlers.get(handlerName);
147        if (batchHandler == null) {
148            throw new IllegalArgumentException("Batch handler does not exist: " + handlerName);
149        }
150        return batchHandler.newBatch(null);
151    }
152
153    @Override
154    public Batch getBatch(String batchId) {
155        return handlers.values()
156                       .stream()
157                       .map(batchHandler -> batchHandler.getBatch(batchId))
158                       .filter(Objects::nonNull)
159                       .findFirst()
160                       .orElse(null);
161    }
162
163    @Override
164    public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException {
165        Blob blob = Blobs.createBlob(is);
166        addBlob(batchId, index, blob, name, mime);
167    }
168
169    @Override
170    public void addBlob(String batchId, String index, Blob blob, String name, String mime) throws IOException {
171        uploadInProgress.incrementAndGet();
172        try {
173            Batch batch = getBatch(batchId);
174            if (batch == null) {
175                batch = initBatchInternal(batchId);
176            }
177            batch.addFile(index, blob, name, mime);
178            log.debug("Added file {} [{}] to batch {}", index, name, batch.getKey());
179        } finally {
180            uploadInProgress.decrementAndGet();
181        }
182    }
183
184    @Override
185    public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name,
186            String mime, long fileSize) throws IOException {
187        Blob blob = Blobs.createBlob(is);
188        addBlob(batchId, index, blob, chunkCount, chunkIndex, name, mime, fileSize);
189    }
190
191    @Override
192    public void addBlob(String batchId, String index, Blob blob, int chunkCount, int chunkIndex, String name,
193            String mime, long fileSize) throws IOException {
194        uploadInProgress.incrementAndGet();
195        try {
196            Batch batch = getBatch(batchId);
197            if (batch == null) {
198                batch = initBatchInternal(batchId);
199            }
200            batch.addChunk(index, blob, chunkCount, chunkIndex, name, mime, fileSize);
201            log.debug("Added chunk {} to file {} [{}] in batch {}", chunkIndex, index, name, batch.getKey());
202        } finally {
203            uploadInProgress.decrementAndGet();
204        }
205    }
206
207    @Override
208    public boolean hasBatch(String batchId) {
209        return handlers.values().stream().anyMatch(batchHandler -> batchHandler.getBatch(batchId) != null);
210    }
211
212    @Override
213    public List<Blob> getBlobs(String batchId) {
214        return getBlobs(batchId, 0);
215    }
216
217    @Override
218    public List<Blob> getBlobs(String batchId, int timeoutS) {
219        if (uploadInProgress.get() > 0 && timeoutS > 0) {
220            for (int i = 0; i < timeoutS * 5; i++) {
221                try {
222                    Thread.sleep(200);
223                } catch (InterruptedException e) {
224                    Thread.currentThread().interrupt();
225                }
226                if (uploadInProgress.get() == 0) {
227                    break;
228                }
229            }
230        }
231        Batch batch = getBatch(batchId);
232        if (batch == null) {
233            log.error("Unable to find batch with id {}", batchId);
234            return Collections.emptyList();
235        }
236        return batch.getBlobs();
237    }
238
239    @Override
240    public Blob getBlob(String batchId, String fileIndex) {
241        return getBlob(batchId, fileIndex, 0);
242    }
243
244    @Override
245    public Blob getBlob(String batchId, String fileIndex, int timeoutS) {
246        Blob blob = getBatchBlob(batchId, fileIndex);
247        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
248            for (int i = 0; i < timeoutS * 5; i++) {
249                try {
250                    Thread.sleep(200);
251                } catch (InterruptedException e) {
252                    Thread.currentThread().interrupt();
253                }
254                blob = getBatchBlob(batchId, fileIndex);
255                if (blob != null) {
256                    break;
257                }
258            }
259        }
260        if (!hasBatch(batchId)) {
261            log.error("Unable to find batch with id {}", batchId);
262            return null;
263        }
264        return blob;
265    }
266
267    protected Blob getBatchBlob(String batchId, String fileIndex) {
268        Blob blob = null;
269        Batch batch = getBatch(batchId);
270        if (batch != null) {
271            blob = batch.getBlob(fileIndex);
272        }
273        return blob;
274    }
275
276    @Override
277    public List<BatchFileEntry> getFileEntries(String batchId) {
278        Batch batch = getBatch(batchId);
279        if (batch == null) {
280            return null;
281        }
282        return batch.getFileEntries();
283    }
284
285    @Override
286    public BatchFileEntry getFileEntry(String batchId, String fileIndex) {
287        Batch batch = getBatch(batchId);
288        if (batch == null) {
289            return null;
290        }
291        return batch.getFileEntry(fileIndex);
292    }
293
294    @Override
295    public void clean(String batchId) {
296        Batch batch = getBatch(batchId);
297        if (batch != null) {
298            batch.clean();
299        }
300    }
301
302    @Override
303    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
304            Map<String, Object> contextParams, Map<String, Object> operationParams) {
305        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
306        if (blobs == null) {
307            String message = String.format("Unable to find batch associated with id '%s'", batchId);
308            log.error(message);
309            throw new NuxeoException(message);
310        }
311        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
312    }
313
314    @Override
315    public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session,
316            Map<String, Object> contextParams, Map<String, Object> operationParams) {
317        Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout());
318        if (blob == null) {
319            String message = String.format(
320                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId,
321                    fileIndex);
322            log.error(message);
323            throw new NuxeoException(message);
324        }
325        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
326    }
327
328    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
329            Map<String, Object> contextParams, Map<String, Object> operationParams) {
330        if (contextParams == null) {
331            contextParams = new HashMap<>();
332        }
333        if (operationParams == null) {
334            operationParams = new HashMap<>();
335        }
336
337        try (OperationContext ctx = new OperationContext(session)) {
338
339            AutomationServer server = Framework.getService(AutomationServer.class);
340            RestBinding binding = server.getOperationBinding(chainOrOperationId);
341
342            if (binding != null && binding.isAdministrator) {
343                NuxeoPrincipal principal = ctx.getPrincipal();
344                if (!principal.isAdministrator()) {
345                    String message = "Not allowed. You must be administrator to use this operation";
346                    log.error(message);
347                    throw new WebSecurityException(message);
348                }
349            }
350
351            ctx.setInput(blobInput);
352            ctx.putAll(contextParams);
353
354            AutomationService as = Framework.getService(AutomationService.class);
355            // Drag and Drop action category is accessible from the chain sub context as chain parameters
356            return as.run(ctx, chainOrOperationId, operationParams);
357        } catch (OperationException e) {
358            log.error("Error while executing automation batch ", e);
359            throw new NuxeoException(e);
360        }
361    }
362
363    protected int getUploadWaitTimeout() {
364        String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5");
365        try {
366            return Integer.parseInt(t);
367        } catch (NumberFormatException e) {
368            log.error("Wrong number format for upload wait timeout property", e);
369            return 5;
370        }
371    }
372
373    @Override
374    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
375            Map<String, Object> contextParams, Map<String, Object> operationParams) {
376        try {
377            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
378        } finally {
379            clean(batchId);
380        }
381    }
382
383    @Override
384    public boolean removeFileEntry(String batchId, String filedIdx) {
385        Batch batch = getBatch(batchId);
386        return batch != null && batch.removeFileEntry(filedIdx);
387    }
388}