001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 *
020 */
021package org.nuxeo.ecm.automation.server.jaxrs.batch;
022
023import static org.apache.commons.lang3.StringUtils.isEmpty;
024
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Objects;
032import java.util.Set;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.logging.log4j.LogManager;
036import org.apache.logging.log4j.Logger;
037import org.nuxeo.ecm.automation.AutomationService;
038import org.nuxeo.ecm.automation.OperationContext;
039import org.nuxeo.ecm.automation.OperationException;
040import org.nuxeo.ecm.automation.core.util.BlobList;
041import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
042import org.nuxeo.ecm.automation.server.AutomationServer;
043import org.nuxeo.ecm.automation.server.RestBinding;
044import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchHandlerDescriptor;
045import org.nuxeo.ecm.core.api.Blob;
046import org.nuxeo.ecm.core.api.Blobs;
047import org.nuxeo.ecm.core.api.CoreSession;
048import org.nuxeo.ecm.core.api.NuxeoException;
049import org.nuxeo.ecm.core.api.NuxeoPrincipal;
050import org.nuxeo.ecm.core.transientstore.api.TransientStore;
051import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException;
052import org.nuxeo.runtime.api.Framework;
053import org.nuxeo.runtime.model.ComponentContext;
054import org.nuxeo.runtime.model.DefaultComponent;
055
056/**
057 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}.
058 *
059 * @since 5.4.2
060 */
061public class BatchManagerComponent extends DefaultComponent implements BatchManager {
062
063    private static final Logger log = LogManager.getLogger(BatchManagerComponent.class);
064
065    public static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId";
066
067    /**
068     * The default batch handler name.
069     *
070     * @since 10.1
071     */
072    public static final String DEFAULT_BATCH_HANDLER = "default";
073
074    /** @since 10.1 */
075    public static final String XP_BATCH_HANDLER = "handlers";
076
077    protected Map<String, BatchHandler> handlers = new HashMap<>();
078
079    protected final AtomicInteger uploadInProgress = new AtomicInteger(0);
080
081    static {
082        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder());
083    }
084
085    @Override
086    public void start(ComponentContext context) {
087        super.start(context);
088        List<BatchHandlerDescriptor> descriptors = getDescriptors(XP_BATCH_HANDLER);
089        descriptors.forEach(d -> {
090            try {
091                BatchHandler handler = d.klass.getDeclaredConstructor().newInstance();
092                handler.initialize(d.name, d.properties);
093                handlers.put(d.name, handler);
094            } catch (ReflectiveOperationException e) {
095                log.error("Unable to instantiate batch handler", e);
096            }
097        });
098    }
099
100    @Override
101    public void stop(ComponentContext context) throws InterruptedException {
102        super.stop(context);
103        handlers.clear();
104    }
105
106    @Override
107    @Deprecated
108    public TransientStore getTransientStore() {
109        return getHandler(DEFAULT_BATCH_HANDLER).getTransientStore();
110    }
111
112    @Override
113    public Set<String> getSupportedHandlers() {
114        return Collections.unmodifiableSet(handlers.keySet());
115    }
116
117    @Override
118    public BatchHandler getHandler(String handlerName) {
119        return handlers.get(handlerName);
120    }
121
122    @Override
123    public String initBatch() {
124        Batch batch = initBatchInternal(null);
125        return batch.getKey();
126    }
127
128    @Override
129    @Deprecated
130    public String initBatch(String batchId, String contextName) {
131        Batch batch = initBatchInternal(batchId);
132        return batch.getKey();
133    }
134
135    protected Batch initBatchInternal(String batchId) {
136        BatchHandler batchHandler = handlers.get(DEFAULT_BATCH_HANDLER);
137        return batchHandler.newBatch(batchId);
138    }
139
140    @Override
141    public Batch initBatch(String handlerName) {
142        if (isEmpty(handlerName)) {
143            handlerName = DEFAULT_BATCH_HANDLER;
144        }
145        BatchHandler batchHandler = handlers.get(handlerName);
146        if (batchHandler == null) {
147            throw new IllegalArgumentException("Batch handler does not exist: " + handlerName);
148        }
149        return batchHandler.newBatch(null);
150    }
151
152    @Override
153    public Batch getBatch(String batchId) {
154        return handlers.values()
155                       .stream()
156                       .map(batchHandler -> batchHandler.getBatch(batchId))
157                       .filter(Objects::nonNull)
158                       .findFirst()
159                       .orElse(null);
160    }
161
162    @Override
163    public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException {
164        Blob blob = Blobs.createBlob(is);
165        addBlob(batchId, index, blob, name, mime);
166    }
167
168    @Override
169    public void addBlob(String batchId, String index, Blob blob, String name, String mime) throws IOException {
170        uploadInProgress.incrementAndGet();
171        try {
172            Batch batch = getBatch(batchId);
173            if (batch == null) {
174                batch = initBatchInternal(batchId);
175            }
176            batch.addFile(index, blob, name, mime);
177            log.debug("Added file {} [{}] to batch {}", index, name, batch.getKey());
178        } finally {
179            uploadInProgress.decrementAndGet();
180        }
181    }
182
183    @Override
184    public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name,
185            String mime, long fileSize) throws IOException {
186        Blob blob = Blobs.createBlob(is);
187        addBlob(batchId, index, blob, chunkCount, chunkIndex, name, mime, fileSize);
188    }
189
190    @Override
191    public void addBlob(String batchId, String index, Blob blob, int chunkCount, int chunkIndex, String name,
192            String mime, long fileSize) throws IOException {
193        uploadInProgress.incrementAndGet();
194        try {
195            Batch batch = getBatch(batchId);
196            if (batch == null) {
197                batch = initBatchInternal(batchId);
198            }
199            batch.addChunk(index, blob, chunkCount, chunkIndex, name, mime, fileSize);
200            log.debug("Added chunk {} to file {} [{}] in batch {}", chunkIndex, index, name, batch.getKey());
201        } finally {
202            uploadInProgress.decrementAndGet();
203        }
204    }
205
206    @Override
207    public boolean hasBatch(String batchId) {
208        return handlers.values().stream().anyMatch(batchHandler -> batchHandler.getBatch(batchId) != null);
209    }
210
211    @Override
212    public List<Blob> getBlobs(String batchId) {
213        return getBlobs(batchId, 0);
214    }
215
216    @Override
217    public List<Blob> getBlobs(String batchId, int timeoutS) {
218        if (uploadInProgress.get() > 0 && timeoutS > 0) {
219            for (int i = 0; i < timeoutS * 5; i++) {
220                try {
221                    Thread.sleep(200);
222                } catch (InterruptedException e) {
223                    Thread.currentThread().interrupt();
224                }
225                if (uploadInProgress.get() == 0) {
226                    break;
227                }
228            }
229        }
230        Batch batch = getBatch(batchId);
231        if (batch == null) {
232            log.error("Unable to find batch with id {}", batchId);
233            return Collections.emptyList();
234        }
235        return batch.getBlobs();
236    }
237
238    @Override
239    public Blob getBlob(String batchId, String fileIndex) {
240        return getBlob(batchId, fileIndex, 0);
241    }
242
243    @Override
244    public Blob getBlob(String batchId, String fileIndex, int timeoutS) {
245        Blob blob = getBatchBlob(batchId, fileIndex);
246        if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) {
247            for (int i = 0; i < timeoutS * 5; i++) {
248                try {
249                    Thread.sleep(200);
250                } catch (InterruptedException e) {
251                    Thread.currentThread().interrupt();
252                }
253                blob = getBatchBlob(batchId, fileIndex);
254                if (blob != null) {
255                    break;
256                }
257            }
258        }
259        if (!hasBatch(batchId)) {
260            log.error("Unable to find batch with id {}", batchId);
261            return null;
262        }
263        return blob;
264    }
265
266    protected Blob getBatchBlob(String batchId, String fileIndex) {
267        Blob blob = null;
268        Batch batch = getBatch(batchId);
269        if (batch != null) {
270            blob = batch.getBlob(fileIndex);
271        }
272        return blob;
273    }
274
275    @Override
276    public List<BatchFileEntry> getFileEntries(String batchId) {
277        Batch batch = getBatch(batchId);
278        if (batch == null) {
279            return null;
280        }
281        return batch.getFileEntries();
282    }
283
284    @Override
285    public BatchFileEntry getFileEntry(String batchId, String fileIndex) {
286        Batch batch = getBatch(batchId);
287        if (batch == null) {
288            return null;
289        }
290        return batch.getFileEntry(fileIndex);
291    }
292
293    @Override
294    public void clean(String batchId) {
295        Batch batch = getBatch(batchId);
296        if (batch != null) {
297            batch.clean();
298        }
299    }
300
301    @Override
302    public Object execute(String batchId, String chainOrOperationId, CoreSession session,
303            Map<String, Object> contextParams, Map<String, Object> operationParams) {
304        List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout());
305        if (blobs == null) {
306            String message = String.format("Unable to find batch associated with id '%s'", batchId);
307            log.error(message);
308            throw new NuxeoException(message);
309        }
310        return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams);
311    }
312
313    @Override
314    public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session,
315            Map<String, Object> contextParams, Map<String, Object> operationParams) {
316        Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout());
317        if (blob == null) {
318            String message = String.format(
319                    "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId,
320                    fileIndex);
321            log.error(message);
322            throw new NuxeoException(message);
323        }
324        return execute(blob, chainOrOperationId, session, contextParams, operationParams);
325    }
326
327    protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session,
328            Map<String, Object> contextParams, Map<String, Object> operationParams) {
329        if (contextParams == null) {
330            contextParams = new HashMap<>();
331        }
332        if (operationParams == null) {
333            operationParams = new HashMap<>();
334        }
335
336        try (OperationContext ctx = new OperationContext(session)) {
337
338            AutomationServer server = Framework.getService(AutomationServer.class);
339            RestBinding binding = server.getOperationBinding(chainOrOperationId);
340
341            if (binding != null && binding.isAdministrator) {
342                NuxeoPrincipal principal = ctx.getPrincipal();
343                if (!principal.isAdministrator()) {
344                    String message = "Not allowed. You must be administrator to use this operation";
345                    log.error(message);
346                    throw new WebSecurityException(message);
347                }
348            }
349
350            ctx.setInput(blobInput);
351            ctx.putAll(contextParams);
352
353            AutomationService as = Framework.getService(AutomationService.class);
354            // Drag and Drop action category is accessible from the chain sub context as chain parameters
355            return as.run(ctx, chainOrOperationId, operationParams);
356        } catch (OperationException e) {
357            log.error("Error while executing automation batch ", e);
358            throw new NuxeoException(e);
359        }
360    }
361
362    protected int getUploadWaitTimeout() {
363        String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5");
364        try {
365            return Integer.parseInt(t);
366        } catch (NumberFormatException e) {
367            log.error("Wrong number format for upload wait timeout property", e);
368            return 5;
369        }
370    }
371
372    @Override
373    public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session,
374            Map<String, Object> contextParams, Map<String, Object> operationParams) {
375        try {
376            return execute(batchId, chainOrOperationId, session, contextParams, operationParams);
377        } finally {
378            clean(batchId);
379        }
380    }
381
382    @Override
383    public boolean removeFileEntry(String batchId, String filedIdx) {
384        Batch batch = getBatch(batchId);
385        return batch != null && batch.removeFileEntry(filedIdx);
386    }
387}