001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nelson Silva <nsilva@nuxeo.com> 018 */ 019package org.nuxeo.ecm.automation.server.jaxrs.adapters; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.UUID; 030 031import javax.mail.MessagingException; 032import javax.servlet.http.HttpServletRequest; 033import javax.servlet.http.HttpServletResponse; 034import javax.ws.rs.DELETE; 035import javax.ws.rs.GET; 036import javax.ws.rs.POST; 037import javax.ws.rs.Path; 038import javax.ws.rs.PathParam; 039import javax.ws.rs.Produces; 040import javax.ws.rs.core.Context; 041import javax.ws.rs.core.MediaType; 042import javax.ws.rs.core.Response; 043 044import org.apache.commons.collections.CollectionUtils; 045import org.apache.logging.log4j.LogManager; 046import org.apache.logging.log4j.Logger; 047import org.nuxeo.ecm.automation.AutomationService; 048import org.nuxeo.ecm.automation.OperationCallback; 049import org.nuxeo.ecm.automation.OperationContext; 050import org.nuxeo.ecm.automation.OperationException; 051import org.nuxeo.ecm.automation.OperationType; 052import org.nuxeo.ecm.automation.core.impl.InvokableMethod; 053import org.nuxeo.ecm.automation.core.util.BlobList; 054import org.nuxeo.ecm.automation.jaxrs.io.operations.ExecutionRequest; 055import org.nuxeo.ecm.automation.server.AutomationServer; 056import org.nuxeo.ecm.automation.server.jaxrs.OperationResource; 057import org.nuxeo.ecm.automation.server.jaxrs.ResponseHelper; 058import org.nuxeo.ecm.core.api.AsyncService; 059import org.nuxeo.ecm.core.api.AsyncStatus; 060import org.nuxeo.ecm.core.api.Blob; 061import org.nuxeo.ecm.core.api.CoreInstance; 062import org.nuxeo.ecm.core.api.CoreSession; 063import org.nuxeo.ecm.core.api.DocumentModel; 064import org.nuxeo.ecm.core.api.DocumentModelList; 065import org.nuxeo.ecm.core.api.NuxeoException; 066import org.nuxeo.ecm.core.api.NuxeoPrincipal; 067import org.nuxeo.ecm.core.transientstore.api.TransientStore; 068import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 069import org.nuxeo.ecm.platform.web.common.exceptionhandling.ExceptionHelper; 070import org.nuxeo.ecm.platform.web.common.vh.VirtualHostHelper; 071import org.nuxeo.ecm.webengine.model.WebAdapter; 072import org.nuxeo.ecm.webengine.model.exceptions.WebResourceNotFoundException; 073import org.nuxeo.ecm.webengine.model.impl.DefaultAdapter; 074import org.nuxeo.runtime.api.Framework; 075import org.nuxeo.runtime.api.login.LoginComponent; 076import org.nuxeo.runtime.transaction.TransactionHelper; 077 078import com.fasterxml.jackson.databind.ObjectMapper; 079 080import io.opencensus.common.Scope; 081import io.opencensus.trace.Link; 082import io.opencensus.trace.Span; 083import io.opencensus.trace.SpanContext; 084import io.opencensus.trace.Tracing; 085 086/** 087 * Adapter that allows asynchronous execution of operations. 088 * 089 * @since 10.3 090 */ 091@WebAdapter(name = AsyncOperationAdapter.NAME, type = "AsyncOperationAdapter", targetType = "operation") 092@Produces({ MediaType.APPLICATION_JSON }) 093public class AsyncOperationAdapter extends DefaultAdapter { 094 095 public static final String NAME = "async"; 096 097 private static final Logger log = LogManager.getLogger(AsyncOperationAdapter.class); 098 099 protected static final String STATUS_STORE_NAME = "automation"; 100 101 protected static final String TRANSIENT_STORE_SERVICE = "service"; 102 103 protected static final String TRANSIENT_STORE_TASK_ID = "taskId"; 104 105 protected static final String TRANSIENT_STORE_ERROR = "error"; 106 107 protected static final String TRANSIENT_STORE_OUTPUT = "output"; 108 109 protected static final String TRANSIENT_STORE_OUTPUT_BLOB = "blob"; 110 111 protected static final String STATUS_PATH= "status"; 112 113 protected static final String RUNNING_STATUS= "RUNNING"; 114 115 protected static final String RESULT_URL_KEY= "url"; 116 117 protected static final ObjectMapper MAPPER = new ObjectMapper(); 118 119 @Context 120 protected AutomationService service; 121 122 @Context 123 protected HttpServletRequest request; 124 125 @Context 126 protected HttpServletResponse response; 127 128 @Context 129 protected CoreSession session; 130 131 @Context 132 protected AutomationServer srv; 133 134 @SuppressWarnings("resource") // ExecutionRequest's OperationContext not owned by us, don't close it 135 @POST 136 public Object doPost(ExecutionRequest xreq) { 137 OperationResource op = (OperationResource) getTarget(); 138 String opId = op.getId(); 139 140 if (!srv.accept(opId, op.isChain(), request)) { 141 return ResponseHelper.notFound(); 142 } 143 String executionId = UUID.randomUUID().toString(); 144 145 // session will be set in the task thread 146 // ExecutionRequest's OperationContext not owned by us, don't close it 147 OperationContext opCtx = xreq.createContext(request, response, null); // NOSONAR 148 149 opCtx.setCallback(new OperationCallback() { 150 151 private Object output; 152 153 @Override 154 public void onChainEnter(OperationType chain) { 155 // 156 } 157 158 @Override 159 public void onChainExit() { 160 setOutput(executionId, (Serializable) output); 161 setCompleted(executionId); 162 } 163 164 @Override 165 public void onOperationEnter(OperationContext context, OperationType type, InvokableMethod method, 166 Map<String, Object> params) { 167 enterMethod(executionId, method); 168 } 169 170 @Override 171 public void onOperationExit(Object output) { 172 this.output = output; 173 } 174 175 @Override 176 public OperationException onError(OperationException error) { 177 setError(executionId, error); 178 return error; 179 } 180 181 }); 182 183 String repoName = session.getRepositoryName(); 184 NuxeoPrincipal principal = session.getPrincipal(); 185 186 // TODO NXP-26303: use thread pool 187 SpanContext traceContext = Tracing.getTracer().getCurrentSpan().getContext(); 188 new Thread(() -> { 189 Span span = Tracing.getTracer().spanBuilderWithRemoteParent("automation/" + opId + "/async", traceContext).startSpan(); 190 span.addLink(Link.fromSpanContext(traceContext, Link.Type.PARENT_LINKED_SPAN)); 191 try (Scope scope = Tracing.getTracer().withSpan(span)) { 192 TransactionHelper.runInTransaction(() -> { 193 LoginComponent.pushPrincipal(principal); 194 try { 195 CoreSession s = CoreInstance.getCoreSession(repoName, principal); 196 opCtx.setCoreSession(s); 197 service.run(opCtx, opId, xreq.getParams()); 198 } catch (OperationException e) { 199 setError(executionId, e); 200 } finally { 201 LoginComponent.popPrincipal(); 202 } 203 }); 204 } 205 }, String.format("Nuxeo-AsyncOperation-%s", executionId)).start(); 206 207 try { 208 String statusURL = String.format("%s%s/%s/%s", ctx.getServerURL(), getPath(), executionId, STATUS_PATH); 209 return Response.status(HttpServletResponse.SC_ACCEPTED).location(new URI(statusURL)).build(); 210 } catch (URISyntaxException e) { 211 throw new NuxeoException(e); 212 } 213 } 214 215 @GET 216 @Path("{executionId}/status") 217 public Object status(@PathParam("executionId") String executionId) throws IOException, MessagingException { 218 String error = getError(executionId); 219 if (error != null) { 220 throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR); 221 } 222 if (isCompleted(executionId)) { 223 String resURL = String.format("%s/%s", getPath(), executionId); 224 return redirect(resURL); 225 } else { 226 if (isAsync(executionId)) { 227 Serializable taskId = getTaskId(executionId); 228 Object result = getAsyncService(executionId).getStatus(taskId); 229 return ResponseHelper.getResponse(result, request, HttpServletResponse.SC_OK); 230 } else { 231 return Response.status(HttpServletResponse.SC_OK) 232 .entity(MAPPER.writeValueAsString(RUNNING_STATUS)) 233 .build(); 234 } 235 236 } 237 } 238 239 @GET 240 @Path("{executionId}") 241 public Object result(@PathParam("executionId") String executionId) throws IOException, MessagingException { 242 243 if (isCompleted(executionId)) { 244 Object output = getResult(executionId); 245 246 String error = getError(executionId); 247 248 // cleanup after result is accessed 249 cleanup(executionId); 250 251 if (error != null) { 252 throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR); 253 } 254 255 // if output is a map let's return as json 256 if (output instanceof Map) { 257 Map map = (Map) output; 258 // if output has a "url" key make it absolute 259 Object url = ((Map<?, ?>) output).get(RESULT_URL_KEY); 260 if (url instanceof String) { 261 map = new HashMap(map); 262 try { 263 boolean isAbsolute = new URI((String) url).isAbsolute(); 264 String baseUrl = VirtualHostHelper.getBaseURL(ctx.getRequest()); 265 map.put(RESULT_URL_KEY, isAbsolute ? url : baseUrl + url); 266 } catch (URISyntaxException e) { 267 log.error("Failed to parse result url {}", url); 268 } 269 } 270 return Response.status(HttpServletResponse.SC_OK).entity(MAPPER.writeValueAsString(map)).build(); 271 } 272 273 return ResponseHelper.getResponse(output, request, HttpServletResponse.SC_OK); 274 } 275 276 throw new WebResourceNotFoundException("Execution with id=" + executionId + " not found"); 277 } 278 279 @DELETE 280 @Path("{executionId}") 281 public Object abort(@PathParam("executionId") String executionId) throws IOException, MessagingException { 282 if (exists(executionId) && !isCompleted(executionId)) { 283 // TODO NXP-26304: support aborting any execution 284 if (isAsync(executionId)) { 285 Serializable taskId = getTaskId(executionId); 286 return getAsyncService(executionId).abort(taskId); 287 } 288 return ResponseHelper.getResponse(RUNNING_STATUS, request, HttpServletResponse.SC_OK); 289 } 290 throw new WebResourceNotFoundException("Execution with id=" + executionId + " has completed"); 291 } 292 293 protected TransientStore getTransientStore() { 294 return Framework.getService(TransientStoreService.class).getStore(STATUS_STORE_NAME); 295 } 296 297 protected void enterMethod(String executionId, InvokableMethod method) { 298 // reset parameters 299 getTransientStore().remove(executionId); 300 301 // AsyncService.class is default => not async 302 if (!AsyncService.class.equals(method.getAsyncService())) { 303 getTransientStore().putParameter(executionId, TRANSIENT_STORE_SERVICE, method.getAsyncService().getName()); 304 } 305 } 306 307 protected void setError(String executionId, Throwable t) { 308 setError(executionId, ExceptionHelper.unwrapException(t).getMessage()); 309 } 310 311 /** 312 * @deprecated since 11.1, because unused 313 */ 314 @Deprecated(since = "11.1") 315 protected void setError(String executionId, String error) { 316 getTransientStore().putParameter(executionId, TRANSIENT_STORE_ERROR, error); 317 setCompleted(executionId); 318 } 319 320 public String getError(String executionId) { 321 return (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_ERROR); 322 } 323 324 protected void setOutput(String executionId, Serializable output) { 325 TransientStore ts = getTransientStore(); 326 // store only taskId for async tasks 327 if (isAsync(executionId)) { 328 Serializable taskId = output instanceof AsyncStatus ? ((AsyncStatus<?>) output).getId() : output; 329 ts.putParameter(executionId, TRANSIENT_STORE_TASK_ID, taskId); 330 } else { 331 if (output instanceof DocumentModel) { 332 detach((DocumentModel) output); 333 } else if (output instanceof DocumentModelList) { 334 ((DocumentModelList) output).forEach(this::detach); 335 } 336 if (output instanceof Blob) { 337 ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, true); 338 ts.putBlobs(executionId, Collections.singletonList((Blob) output)); 339 } else if (output instanceof BlobList) { 340 ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, false); 341 ts.putBlobs(executionId, (BlobList) output); 342 } else { 343 ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT, output); 344 } 345 } 346 } 347 348 protected Object getResult(String executionId) { 349 TransientStore ts = getTransientStore(); 350 351 if (isAsync(executionId)) { 352 AsyncService<Serializable, ?, ?> service = getAsyncService(executionId); 353 if (service != null) { 354 Serializable taskId = ts.getParameter(executionId, TRANSIENT_STORE_TASK_ID); 355 return service.getResult(taskId); 356 } 357 } 358 359 Object output; 360 List<Blob> blobs = ts.getBlobs(executionId); 361 if (CollectionUtils.isNotEmpty(blobs)) { 362 boolean isSingle = (boolean) ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB); 363 output = isSingle ? blobs.get(0) : new BlobList(blobs); 364 } else { 365 output = ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT); 366 } 367 if (output instanceof DocumentModel) { 368 attach((DocumentModel) output); 369 } else if (output instanceof DocumentModelList) { 370 ((DocumentModelList) output).forEach(this::attach); 371 } 372 return output; 373 } 374 375 protected void attach(DocumentModel doc) { 376 doc.attach(ctx.getCoreSession()); 377 } 378 379 protected void detach(DocumentModel doc) { 380 doc.detach(false); 381 } 382 383 protected boolean isAsync(String executionId) { 384 return getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE) != null; 385 } 386 387 protected Serializable getTaskId(String executionId) { 388 return getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID); 389 } 390 391 protected AsyncService<Serializable, ?, ?> getAsyncService(String executionId) { 392 String serviceClass = (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE); 393 try { 394 @SuppressWarnings("unchecked") 395 AsyncService<Serializable, ?, ?> asyncService = (AsyncService<Serializable, ?, ?>) Framework.getService(Class.forName(serviceClass)); 396 return asyncService; 397 } catch (ClassNotFoundException e) { 398 log.error("AsyncService class {} not found", serviceClass); 399 return null; 400 } 401 } 402 403 protected void setCompleted(String executionId) { 404 getTransientStore().setCompleted(executionId, true); 405 } 406 407 protected boolean isCompleted(String executionId) { 408 if (isAsync(executionId)) { 409 Serializable taskId = getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID); 410 return getAsyncService(executionId).getStatus(taskId).isCompleted(); 411 } 412 return getTransientStore().isCompleted(executionId); 413 } 414 415 protected boolean exists(String executionId) { 416 return getTransientStore().exists(executionId); 417 } 418 419 protected void cleanup(String executionId) { 420 getTransientStore().release(executionId); 421 } 422}