001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nelson Silva <nsilva@nuxeo.com>
018 */
019package org.nuxeo.ecm.automation.server.jaxrs.adapters;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.UUID;
030
031import javax.mail.MessagingException;
032import javax.servlet.http.HttpServletRequest;
033import javax.servlet.http.HttpServletResponse;
034import javax.ws.rs.DELETE;
035import javax.ws.rs.GET;
036import javax.ws.rs.POST;
037import javax.ws.rs.Path;
038import javax.ws.rs.PathParam;
039import javax.ws.rs.Produces;
040import javax.ws.rs.core.Context;
041import javax.ws.rs.core.MediaType;
042import javax.ws.rs.core.Response;
043
044import org.apache.commons.collections.CollectionUtils;
045import org.apache.logging.log4j.LogManager;
046import org.apache.logging.log4j.Logger;
047import org.nuxeo.ecm.automation.AutomationService;
048import org.nuxeo.ecm.automation.OperationCallback;
049import org.nuxeo.ecm.automation.OperationContext;
050import org.nuxeo.ecm.automation.OperationException;
051import org.nuxeo.ecm.automation.OperationType;
052import org.nuxeo.ecm.automation.core.impl.InvokableMethod;
053import org.nuxeo.ecm.automation.core.util.BlobList;
054import org.nuxeo.ecm.automation.jaxrs.io.operations.ExecutionRequest;
055import org.nuxeo.ecm.automation.server.AutomationServer;
056import org.nuxeo.ecm.automation.server.jaxrs.OperationResource;
057import org.nuxeo.ecm.automation.server.jaxrs.ResponseHelper;
058import org.nuxeo.ecm.core.api.AsyncService;
059import org.nuxeo.ecm.core.api.AsyncStatus;
060import org.nuxeo.ecm.core.api.Blob;
061import org.nuxeo.ecm.core.api.CoreInstance;
062import org.nuxeo.ecm.core.api.CoreSession;
063import org.nuxeo.ecm.core.api.DocumentModel;
064import org.nuxeo.ecm.core.api.DocumentModelList;
065import org.nuxeo.ecm.core.api.NuxeoException;
066import org.nuxeo.ecm.core.api.NuxeoPrincipal;
067import org.nuxeo.ecm.core.transientstore.api.TransientStore;
068import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
069import org.nuxeo.ecm.platform.web.common.exceptionhandling.ExceptionHelper;
070import org.nuxeo.ecm.platform.web.common.vh.VirtualHostHelper;
071import org.nuxeo.ecm.webengine.model.WebAdapter;
072import org.nuxeo.ecm.webengine.model.exceptions.WebResourceNotFoundException;
073import org.nuxeo.ecm.webengine.model.impl.DefaultAdapter;
074import org.nuxeo.runtime.api.Framework;
075import org.nuxeo.runtime.api.login.LoginComponent;
076import org.nuxeo.runtime.transaction.TransactionHelper;
077
078import com.fasterxml.jackson.databind.ObjectMapper;
079
080import io.opencensus.common.Scope;
081import io.opencensus.trace.Link;
082import io.opencensus.trace.Span;
083import io.opencensus.trace.SpanContext;
084import io.opencensus.trace.Tracing;
085
086/**
087 * Adapter that allows asynchronous execution of operations.
088 *
089 * @since 10.3
090 */
091@WebAdapter(name = AsyncOperationAdapter.NAME, type = "AsyncOperationAdapter", targetType = "operation")
092@Produces({ MediaType.APPLICATION_JSON })
093public class AsyncOperationAdapter extends DefaultAdapter {
094
095    public static final String NAME = "async";
096
097    private static final Logger log = LogManager.getLogger(AsyncOperationAdapter.class);
098
099    protected static final String STATUS_STORE_NAME = "automation";
100
101    protected static final String TRANSIENT_STORE_SERVICE = "service";
102
103    protected static final String TRANSIENT_STORE_TASK_ID = "taskId";
104
105    protected static final String TRANSIENT_STORE_ERROR = "error";
106
107    protected static final String TRANSIENT_STORE_OUTPUT = "output";
108
109    protected static final String TRANSIENT_STORE_OUTPUT_BLOB = "blob";
110
111    protected static final String STATUS_PATH= "status";
112
113    protected static final String RUNNING_STATUS= "RUNNING";
114
115    protected static final String RESULT_URL_KEY= "url";
116
117    protected static final ObjectMapper MAPPER = new ObjectMapper();
118
119    @Context
120    protected AutomationService service;
121
122    @Context
123    protected HttpServletRequest request;
124
125    @Context
126    protected HttpServletResponse response;
127
128    @Context
129    protected CoreSession session;
130
131    @Context
132    protected AutomationServer srv;
133
134    @SuppressWarnings("resource") // ExecutionRequest's OperationContext not owned by us, don't close it
135    @POST
136    public Object doPost(ExecutionRequest xreq) {
137        OperationResource op = (OperationResource) getTarget();
138        String opId = op.getId();
139
140        if (!srv.accept(opId, op.isChain(), request)) {
141            return ResponseHelper.notFound();
142        }
143        String executionId = UUID.randomUUID().toString();
144
145        // session will be set in the task thread
146        // ExecutionRequest's OperationContext not owned by us, don't close it
147        OperationContext opCtx = xreq.createContext(request, response, null); // NOSONAR
148
149        opCtx.setCallback(new OperationCallback() {
150
151            private Object output;
152
153            @Override
154            public void onChainEnter(OperationType chain) {
155                //
156            }
157
158            @Override
159            public void onChainExit() {
160                setOutput(executionId, (Serializable) output);
161                setCompleted(executionId);
162            }
163
164            @Override
165            public void onOperationEnter(OperationContext context, OperationType type, InvokableMethod method,
166                    Map<String, Object> params) {
167                enterMethod(executionId, method);
168            }
169
170            @Override
171            public void onOperationExit(Object output) {
172                this.output = output;
173            }
174
175            @Override
176            public OperationException onError(OperationException error) {
177                setError(executionId, error);
178                return error;
179            }
180
181        });
182
183        String repoName = session.getRepositoryName();
184        NuxeoPrincipal principal = session.getPrincipal();
185
186        // TODO NXP-26303: use thread pool
187        SpanContext traceContext = Tracing.getTracer().getCurrentSpan().getContext();
188        new Thread(() -> {
189            Span span = Tracing.getTracer().spanBuilderWithRemoteParent("automation/" + opId + "/async", traceContext).startSpan();
190            span.addLink(Link.fromSpanContext(traceContext, Link.Type.PARENT_LINKED_SPAN));
191            try (Scope scope = Tracing.getTracer().withSpan(span)) {
192                TransactionHelper.runInTransaction(() -> {
193                    LoginComponent.pushPrincipal(principal);
194                    try {
195                        CoreSession s = CoreInstance.getCoreSession(repoName, principal);
196                        opCtx.setCoreSession(s);
197                        service.run(opCtx, opId, xreq.getParams());
198                    } catch (OperationException e) {
199                        setError(executionId, e);
200                    } finally {
201                        LoginComponent.popPrincipal();
202                    }
203                 });
204             }
205        }, String.format("Nuxeo-AsyncOperation-%s", executionId)).start();
206
207        try {
208            String statusURL = String.format("%s%s/%s/%s", ctx.getServerURL(), getPath(), executionId, STATUS_PATH);
209            return Response.status(HttpServletResponse.SC_ACCEPTED).location(new URI(statusURL)).build();
210        } catch (URISyntaxException e) {
211            throw new NuxeoException(e);
212        }
213    }
214
215    @GET
216    @Path("{executionId}/status")
217    public Object status(@PathParam("executionId") String executionId) throws IOException, MessagingException {
218        String error = getError(executionId);
219        if (error != null) {
220            throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
221        }
222        if (isCompleted(executionId)) {
223            String resURL = String.format("%s/%s", getPath(), executionId);
224            return redirect(resURL);
225        } else {
226            if (isAsync(executionId)) {
227                Serializable taskId = getTaskId(executionId);
228                Object result = getAsyncService(executionId).getStatus(taskId);
229                return ResponseHelper.getResponse(result, request, HttpServletResponse.SC_OK);
230            } else {
231                return Response.status(HttpServletResponse.SC_OK)
232                               .entity(MAPPER.writeValueAsString(RUNNING_STATUS))
233                               .build();
234            }
235
236        }
237    }
238
239    @GET
240    @Path("{executionId}")
241    public Object result(@PathParam("executionId") String executionId) throws IOException, MessagingException {
242
243        if (isCompleted(executionId)) {
244            Object output = getResult(executionId);
245
246            String error = getError(executionId);
247
248            // cleanup after result is accessed
249            cleanup(executionId);
250
251            if (error != null) {
252                throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
253            }
254
255            // if output is a map let's return as json
256            if (output instanceof Map) {
257                Map map = (Map) output;
258                // if output has a "url" key make it absolute
259                Object url = ((Map<?, ?>) output).get(RESULT_URL_KEY);
260                if (url instanceof String) {
261                    map = new HashMap(map);
262                    try {
263                        boolean isAbsolute = new URI((String) url).isAbsolute();
264                        String baseUrl = VirtualHostHelper.getBaseURL(ctx.getRequest());
265                        map.put(RESULT_URL_KEY, isAbsolute ? url : baseUrl + url);
266                    } catch (URISyntaxException e) {
267                        log.error("Failed to parse result url {}", url);
268                    }
269                }
270                return Response.status(HttpServletResponse.SC_OK).entity(MAPPER.writeValueAsString(map)).build();
271            }
272
273            return ResponseHelper.getResponse(output, request, HttpServletResponse.SC_OK);
274        }
275
276        throw new WebResourceNotFoundException("Execution with id=" + executionId + " not found");
277    }
278
279    @DELETE
280    @Path("{executionId}")
281    public Object abort(@PathParam("executionId") String executionId) throws IOException, MessagingException {
282        if (exists(executionId) && !isCompleted(executionId)) {
283            // TODO NXP-26304: support aborting any execution
284            if (isAsync(executionId)) {
285                Serializable taskId = getTaskId(executionId);
286                return getAsyncService(executionId).abort(taskId);
287            }
288            return ResponseHelper.getResponse(RUNNING_STATUS, request, HttpServletResponse.SC_OK);
289        }
290        throw new WebResourceNotFoundException("Execution with id=" + executionId + " has completed");
291    }
292
293    protected TransientStore getTransientStore() {
294        return Framework.getService(TransientStoreService.class).getStore(STATUS_STORE_NAME);
295    }
296
297    protected void enterMethod(String executionId, InvokableMethod method) {
298        // reset parameters
299        getTransientStore().remove(executionId);
300
301        // AsyncService.class is default => not async
302        if (!AsyncService.class.equals(method.getAsyncService())) {
303            getTransientStore().putParameter(executionId, TRANSIENT_STORE_SERVICE, method.getAsyncService().getName());
304        }
305    }
306
307    protected void setError(String executionId, Throwable t) {
308        setError(executionId, ExceptionHelper.unwrapException(t).getMessage());
309    }
310
311    /**
312     * @deprecated since 11.1, because unused
313     */
314    @Deprecated(since = "11.1")
315    protected void setError(String executionId, String error) {
316        getTransientStore().putParameter(executionId, TRANSIENT_STORE_ERROR, error);
317        setCompleted(executionId);
318    }
319
320    public String getError(String executionId) {
321        return (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_ERROR);
322    }
323
324    protected void setOutput(String executionId, Serializable output) {
325        TransientStore ts = getTransientStore();
326        // store only taskId for async tasks
327        if (isAsync(executionId)) {
328            Serializable taskId = output instanceof AsyncStatus ? ((AsyncStatus<?>) output).getId() : output;
329            ts.putParameter(executionId, TRANSIENT_STORE_TASK_ID, taskId);
330        } else {
331            if (output instanceof DocumentModel) {
332                detach((DocumentModel) output);
333            } else if (output instanceof DocumentModelList) {
334                ((DocumentModelList) output).forEach(this::detach);
335            }
336            if (output instanceof Blob) {
337                ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, true);
338                ts.putBlobs(executionId, Collections.singletonList((Blob) output));
339            } else if (output instanceof BlobList) {
340                ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, false);
341                ts.putBlobs(executionId, (BlobList) output);
342            } else {
343                ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT, output);
344            }
345        }
346    }
347
348    protected Object getResult(String executionId) {
349        TransientStore ts = getTransientStore();
350
351        if (isAsync(executionId)) {
352            AsyncService<Serializable, ?, ?> service = getAsyncService(executionId);
353            if (service != null) {
354                Serializable taskId = ts.getParameter(executionId, TRANSIENT_STORE_TASK_ID);
355                return service.getResult(taskId);
356            }
357        }
358
359        Object output;
360        List<Blob> blobs = ts.getBlobs(executionId);
361        if (CollectionUtils.isNotEmpty(blobs)) {
362            boolean isSingle = (boolean) ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB);
363            output = isSingle ? blobs.get(0) : new BlobList(blobs);
364        } else {
365            output = ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT);
366        }
367        if (output instanceof DocumentModel) {
368            attach((DocumentModel) output);
369        } else if (output instanceof DocumentModelList) {
370            ((DocumentModelList) output).forEach(this::attach);
371        }
372        return output;
373    }
374
375    protected void attach(DocumentModel doc) {
376        doc.attach(ctx.getCoreSession());
377    }
378
379    protected void detach(DocumentModel doc) {
380        doc.detach(false);
381    }
382
383    protected boolean isAsync(String executionId) {
384        return getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE) != null;
385    }
386
387    protected Serializable getTaskId(String executionId) {
388        return getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID);
389    }
390
391    protected AsyncService<Serializable, ?, ?> getAsyncService(String executionId) {
392        String serviceClass = (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE);
393        try {
394            @SuppressWarnings("unchecked")
395            AsyncService<Serializable, ?, ?> asyncService = (AsyncService<Serializable, ?, ?>) Framework.getService(Class.forName(serviceClass));
396            return asyncService;
397        } catch (ClassNotFoundException e) {
398            log.error("AsyncService class {} not found", serviceClass);
399            return null;
400        }
401    }
402
403    protected void setCompleted(String executionId) {
404        getTransientStore().setCompleted(executionId, true);
405    }
406
407    protected boolean isCompleted(String executionId) {
408        if (isAsync(executionId)) {
409            Serializable taskId = getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID);
410            return getAsyncService(executionId).getStatus(taskId).isCompleted();
411        }
412        return getTransientStore().isCompleted(executionId);
413    }
414
415    protected boolean exists(String executionId) {
416        return getTransientStore().exists(executionId);
417    }
418
419    protected void cleanup(String executionId) {
420        getTransientStore().release(executionId);
421    }
422}