001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nelson Silva <nsilva@nuxeo.com>
018 */
019package org.nuxeo.ecm.automation.server.jaxrs.adapters;
020
021import java.io.Serializable;
022import java.net.URI;
023import javax.mail.MessagingException;
024import javax.servlet.http.HttpServletRequest;
025import javax.servlet.http.HttpServletResponse;
026import javax.ws.rs.DELETE;
027import javax.ws.rs.GET;
028import javax.ws.rs.POST;
029import javax.ws.rs.Path;
030import javax.ws.rs.PathParam;
031import javax.ws.rs.Produces;
032
033import javax.ws.rs.core.Context;
034import javax.ws.rs.core.MediaType;
035import javax.ws.rs.core.Response;
036
037import org.apache.commons.collections.CollectionUtils;
038import org.apache.commons.lang3.StringUtils;
039import org.apache.logging.log4j.LogManager;
040import org.apache.logging.log4j.Logger;
041import org.nuxeo.ecm.automation.AutomationService;
042import org.nuxeo.ecm.automation.OperationCallback;
043import org.nuxeo.ecm.automation.OperationContext;
044import org.nuxeo.ecm.automation.OperationException;
045import org.nuxeo.ecm.automation.OperationType;
046import org.nuxeo.ecm.automation.core.impl.InvokableMethod;
047import org.nuxeo.ecm.automation.core.util.BlobList;
048import org.nuxeo.ecm.automation.jaxrs.io.operations.ExecutionRequest;
049import org.nuxeo.ecm.automation.server.AutomationServer;
050import org.nuxeo.ecm.automation.server.jaxrs.OperationResource;
051import org.nuxeo.ecm.automation.server.jaxrs.ResponseHelper;
052import org.nuxeo.ecm.core.api.AsyncService;
053import org.nuxeo.ecm.core.api.AsyncStatus;
054import org.nuxeo.ecm.core.api.Blob;
055import org.nuxeo.ecm.core.api.CloseableCoreSession;
056import org.nuxeo.ecm.core.api.CoreInstance;
057import org.nuxeo.ecm.core.api.CoreSession;
058import org.nuxeo.ecm.core.api.DocumentModel;
059import org.nuxeo.ecm.core.api.DocumentModelList;
060import org.nuxeo.ecm.core.api.NuxeoException;
061import org.nuxeo.ecm.core.api.NuxeoPrincipal;
062import org.nuxeo.ecm.core.transientstore.api.TransientStore;
063import org.nuxeo.ecm.core.transientstore.api.TransientStoreService;
064import org.nuxeo.ecm.platform.web.common.vh.VirtualHostHelper;
065import org.nuxeo.ecm.webengine.model.WebAdapter;
066import org.nuxeo.ecm.webengine.model.exceptions.WebResourceNotFoundException;
067import org.nuxeo.ecm.webengine.model.impl.DefaultAdapter;
068import org.nuxeo.runtime.api.Framework;
069import org.nuxeo.runtime.transaction.TransactionHelper;
070
071import java.io.IOException;
072import java.net.URISyntaxException;
073import java.util.Collections;
074import java.util.HashMap;
075import java.util.List;
076import java.util.Map;
077import java.util.UUID;
078
079/**
080 * Adapter that allows asynchronous execution of operations.
081 *
082 * @since 10.3
083 */
084@WebAdapter(name = AsyncOperationAdapter.NAME, type = "AsyncOperationAdapter", targetType = "operation")
085@Produces({ MediaType.APPLICATION_JSON })
086public class AsyncOperationAdapter extends DefaultAdapter {
087
088    public static final String NAME = "async";
089
090    private static final Logger log = LogManager.getLogger(AsyncOperationAdapter.class);
091
092    protected static final String STATUS_STORE_NAME = "automation";
093
094    protected static final String TRANSIENT_STORE_SERVICE = "service";
095
096    protected static final String TRANSIENT_STORE_TASK_ID = "taskId";
097
098    protected static final String TRANSIENT_STORE_ERROR = "error";
099
100    protected static final String TRANSIENT_STORE_OUTPUT = "output";
101
102    protected static final String TRANSIENT_STORE_OUTPUT_BLOB = "blob";
103
104    protected static final String STATUS_PATH= "status";
105
106    protected static final String RUNNING_STATUS= "RUNNING";
107
108    protected static final String RESULT_URL_KEY= "url";
109
110    @Context
111    protected AutomationService service;
112
113    @Context
114    protected HttpServletRequest request;
115
116    @Context
117    protected HttpServletResponse response;
118
119    @Context
120    protected CoreSession session;
121
122    @Context
123    protected AutomationServer srv;
124
125    @POST
126    public Object doPost(ExecutionRequest xreq) {
127        OperationResource op = (OperationResource) getTarget();
128        String opId = op.getId();
129
130        if (!srv.accept(opId, op.isChain(), request)) {
131            return ResponseHelper.notFound();
132        }
133        String executionId = UUID.randomUUID().toString();
134
135        // session will be set in the task thread
136        OperationContext opCtx = xreq.createContext(request, response, null);
137
138        opCtx.setCallback(new OperationCallback() {
139
140            @Override
141            public void onChainEnter(OperationType chain) {
142                //
143            }
144
145            @Override
146            public void onChainExit() {
147                setCompleted(executionId);
148            }
149
150            @Override
151            public void onOperationEnter(OperationContext context, OperationType type, InvokableMethod method,
152                    Map<String, Object> params) {
153                enterMethod(executionId, method);
154            }
155
156            @Override
157            public void onOperationExit(Object output) {
158                setOutput(executionId, (Serializable) output);
159            }
160
161            @Override
162            public OperationException onError(OperationException error) {
163                setError(executionId, error.getMessage());
164                return error;
165            }
166
167        });
168
169        String repoName = session.getRepositoryName();
170        NuxeoPrincipal principal = session.getPrincipal();
171
172        // TODO NXP-26303: use thread pool
173        new Thread(() -> {
174            TransactionHelper.runInTransaction(() -> {
175                try (CloseableCoreSession session = CoreInstance.openCoreSession(repoName, principal)){
176                    opCtx.setCoreSession(session);
177                    service.run(opCtx, opId, xreq.getParams());
178                } catch (OperationException e) {
179                    setError(executionId, e.getMessage());
180                }
181            });
182        }, String.format("Nuxeo-AsyncOperation-%s", executionId)).start();
183
184        try {
185            String statusURL = String.format("%s%s/%s/%s", ctx.getServerURL(), getPath(), executionId, STATUS_PATH);
186            return Response.status(HttpServletResponse.SC_ACCEPTED).location(new URI(statusURL)).build();
187        } catch (URISyntaxException e) {
188            throw new NuxeoException(e);
189        }
190    }
191
192    @GET
193    @Path("{executionId}/status")
194    public Object status(@PathParam("executionId") String executionId) throws IOException, MessagingException {
195        if (isCompleted(executionId)) {
196            String error = getError(executionId);
197            if (error != null) {
198                throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
199            }
200            String resURL = String.format("%s/%s", getPath(), executionId);
201            return redirect(resURL);
202        } else {
203            Object result = RUNNING_STATUS;
204            if (isAsync(executionId)) {
205                Serializable taskId = getTaskId(executionId);
206                result = getAsyncService(executionId).getStatus(taskId);
207            }
208            return ResponseHelper.getResponse(result, request, HttpServletResponse.SC_OK);
209        }
210    }
211
212    @GET
213    @Path("{executionId}")
214    public Object result(@PathParam("executionId") String executionId) throws IOException, MessagingException {
215
216        if (isCompleted(executionId)) {
217            Object output = getResult(executionId);
218
219            String error = getError(executionId);
220
221            // cleanup after result is accessed
222            cleanup(executionId);
223
224            if (error != null) {
225                throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
226            }
227
228            // if output has a "url" key assume it's a redirect url
229            if (output instanceof Map) {
230                Object url = ((Map<?, ?>) output).get(RESULT_URL_KEY);
231                if (url instanceof String) {
232                    String baseUrl = VirtualHostHelper.getBaseURL(ctx.getRequest());
233                    return redirect(baseUrl + url);
234                }
235            }
236            return ResponseHelper.getResponse(output, request, HttpServletResponse.SC_OK);
237        }
238
239        throw new WebResourceNotFoundException("Execution with id=" + executionId + " not found");
240    }
241
242    @DELETE
243    @Path("{executionId}")
244    public Object abort(@PathParam("executionId") String executionId) throws IOException, MessagingException {
245        if (exists(executionId) && !isCompleted(executionId)) {
246            // TODO NXP-26304: support aborting any execution
247            if (isAsync(executionId)) {
248                Serializable taskId = getTaskId(executionId);
249                return getAsyncService(executionId).abort(taskId);
250            }
251            return ResponseHelper.getResponse(RUNNING_STATUS, request, HttpServletResponse.SC_OK);
252        }
253        throw new WebResourceNotFoundException("Execution with id=" + executionId + " has completed");
254    }
255
256    protected TransientStore getTransientStore() {
257        return Framework.getService(TransientStoreService.class).getStore(STATUS_STORE_NAME);
258    }
259
260    protected void enterMethod(String executionId, InvokableMethod method) {
261        // reset parameters
262        getTransientStore().remove(executionId);
263
264        // AsyncService.class is default => not async
265        if (!AsyncService.class.equals(method.getAsyncService())) {
266            getTransientStore().putParameter(executionId, TRANSIENT_STORE_SERVICE, method.getAsyncService().getName());
267        }
268    }
269
270    protected void setError(String executionId, String error) {
271        getTransientStore().putParameter(executionId, TRANSIENT_STORE_ERROR, error);
272        setCompleted(executionId);
273    }
274
275    public String getError(String executionId) {
276        return (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_ERROR);
277    }
278
279    protected void setOutput(String executionId, Serializable output) {
280        TransientStore ts = getTransientStore();
281        // store only taskId for async tasks
282        if (isAsync(executionId)) {
283            Serializable taskId = output instanceof AsyncStatus ? ((AsyncStatus) output).getId() : output;
284            ts.putParameter(executionId, TRANSIENT_STORE_TASK_ID, taskId);
285        } else {
286            if (output instanceof DocumentModel) {
287                detach((DocumentModel) output);
288            } else if (output instanceof DocumentModelList) {
289                ((DocumentModelList) output).forEach(this::detach);
290            }
291            if (output instanceof Blob) {
292                ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, true);
293                ts.putBlobs(executionId, Collections.singletonList((Blob) output));
294            } else if (output instanceof BlobList) {
295                ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, false);
296                ts.putBlobs(executionId, (BlobList) output);
297            } else {
298                ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT, output);
299            }
300        }
301    }
302
303    protected Object getResult(String executionId) {
304        TransientStore ts = getTransientStore();
305
306        if (isAsync(executionId)) {
307            AsyncService service = getAsyncService(executionId);
308            if (service != null) {
309                Serializable taskId = ts.getParameter(executionId, TRANSIENT_STORE_TASK_ID);
310                return service.getResult(taskId);
311            }
312        }
313
314        Object output;
315        List<Blob> blobs = ts.getBlobs(executionId);
316        if (CollectionUtils.isNotEmpty(blobs)) {
317            boolean isSingle = (boolean) ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB);
318            output = isSingle ? blobs.get(0) : new BlobList(blobs);
319        } else {
320            output = ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT);
321        }
322        if (output instanceof DocumentModel) {
323            attach((DocumentModel) output);
324        } else if (output instanceof DocumentModelList) {
325            ((DocumentModelList) output).forEach(this::attach);
326        }
327        return output;
328    }
329
330    protected void attach(DocumentModel doc) {
331        String sid = ctx.getCoreSession().getSessionId();
332        doc.attach(sid);
333    }
334
335    protected void detach(DocumentModel doc) {
336        doc.detach(false);
337    }
338
339    protected boolean isAsync(String executionId) {
340        return getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE) != null;
341    }
342
343    protected Serializable getTaskId(String executionId) {
344        return getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID);
345    }
346
347    protected AsyncService getAsyncService(String executionId) {
348        String serviceClass = (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE);
349        try {
350            return (AsyncService) Framework.getService(Class.forName(serviceClass));
351        } catch (ClassNotFoundException e) {
352            log.error("AsyncService class {} not found", serviceClass);
353            return null;
354        }
355    }
356
357    protected void setCompleted(String executionId) {
358        getTransientStore().setCompleted(executionId, true);
359    }
360
361    protected boolean isCompleted(String executionId) {
362        if (isAsync(executionId)) {
363            Serializable taskId = getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID);
364            return getAsyncService(executionId).getStatus(taskId).isCompleted();
365        }
366        return getTransientStore().isCompleted(executionId);
367    }
368
369    protected boolean exists(String executionId) {
370        return getTransientStore().exists(executionId);
371    }
372
373    protected void cleanup(String executionId) {
374        getTransientStore().release(executionId);
375    }
376}