001/* 002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.automation.core.operations.services.workmanager; 020 021import static org.nuxeo.ecm.core.work.WorkManagerImpl.DEAD_LETTER_QUEUE; 022import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1; 023 024import java.io.IOException; 025import java.time.Duration; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.concurrent.TimeoutException; 030 031import org.apache.logging.log4j.LogManager; 032import org.apache.logging.log4j.Logger; 033import org.nuxeo.ecm.automation.core.Constants; 034import org.nuxeo.ecm.automation.core.annotations.Operation; 035import org.nuxeo.ecm.automation.core.annotations.OperationMethod; 036import org.nuxeo.ecm.automation.core.annotations.Param; 037import org.nuxeo.ecm.core.api.Blob; 038import org.nuxeo.ecm.core.api.Blobs; 039import org.nuxeo.ecm.core.work.WorkComputation; 040import org.nuxeo.ecm.core.work.WorkHolder; 041import org.nuxeo.ecm.core.work.WorkManagerImpl; 042import org.nuxeo.ecm.core.work.api.Work; 043import org.nuxeo.lib.stream.computation.AbstractComputation; 044import org.nuxeo.lib.stream.computation.ComputationContext; 045import org.nuxeo.lib.stream.computation.ComputationPolicy; 046import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder; 047import org.nuxeo.lib.stream.computation.Record; 048import org.nuxeo.lib.stream.computation.Settings; 049import org.nuxeo.lib.stream.computation.StreamManager; 050import org.nuxeo.lib.stream.computation.StreamProcessor; 051import org.nuxeo.lib.stream.computation.Topology; 052import org.nuxeo.runtime.api.Framework; 053import org.nuxeo.runtime.stream.StreamService; 054 055import net.jodah.failsafe.RetryPolicy; 056 057/** 058 * Executes Works stored in the dead letter queue after failure. 059 * 060 * @since 11.1 061 */ 062@Operation(id = WorkManagerRunWorkInFailure.ID, category = Constants.CAT_SERVICES, label = "Executes Works stored in the dead letter queue", addToStudio = false, description = "Try to execute again Works that have been send to a dead letter queue by the WorkManager after failure") 063public class WorkManagerRunWorkInFailure { 064 private static final Logger log = LogManager.getLogger(WorkManagerRunWorkInFailure.class); 065 066 public static final String ID = "WorkManager.RunWorkInFailure"; 067 068 protected static final long DEFAULT_TIMEOUT_SECONDS = 120L; 069 070 protected static final long ASSIGNMENT_TIMEOUT_SECONDS = 60L; 071 072 protected volatile long countTotal; 073 074 protected volatile long countSuccess; 075 076 @Param(name = "timeoutSeconds", required = false) 077 protected long timeout = DEFAULT_TIMEOUT_SECONDS; 078 079 @OperationMethod 080 public Blob run() throws IOException, InterruptedException, TimeoutException { 081 StreamManager streamManager = Framework.getService(StreamService.class).getStreamManager(); 082 Settings settings = new Settings(1, 1, WorkManagerImpl.DEAD_LETTER_QUEUE_CODEC, getComputationPolicy()); 083 StreamProcessor processor = streamManager.registerAndCreateProcessor("RunWorkInFailure", getTopology(), 084 settings); 085 try { 086 countTotal = 0; 087 countSuccess = 0; 088 processor.start(); 089 processor.waitForAssignments(Duration.ofSeconds(ASSIGNMENT_TIMEOUT_SECONDS)); 090 if (!processor.drainAndStop(getTimeout())) { 091 throw new TimeoutException(); 092 } 093 } finally { 094 processor.shutdown(); 095 } 096 return buildResult(); 097 } 098 099 private Blob buildResult() throws IOException { 100 Map<String, Object> result = new HashMap<>(); 101 result.put("total", countTotal); 102 result.put("success", countSuccess); 103 return Blobs.createJSONBlobFromValue(result); 104 } 105 106 protected Duration getTimeout() { 107 return Duration.ofSeconds(timeout); 108 } 109 110 protected ComputationPolicy getComputationPolicy() { 111 return new ComputationPolicyBuilder().retryPolicy(new RetryPolicy(ComputationPolicy.NO_RETRY)) 112 .continueOnFailure(true) 113 .build(); 114 } 115 116 protected Topology getTopology() { 117 return Topology.builder() 118 .addComputation(WorkFailureComputation::new, 119 Collections.singletonList(INPUT_1 + ":" + DEAD_LETTER_QUEUE.getUrn())) 120 .build(); 121 } 122 123 protected class WorkFailureComputation extends AbstractComputation { 124 125 private static final String NAME = "WorkFailure"; 126 127 public WorkFailureComputation() { 128 super(NAME, 1, 0); 129 } 130 131 @Override 132 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 133 context.askForCheckpoint(); 134 Work work = WorkComputation.deserialize(record.getData()); 135 log.info("Trying to run Work from DLQ: " + work.getCategory() + ":" + work.getId()); 136 try { 137 // Using a non RUNNING state to prevent the Work to go in the DLQ 138 work.setWorkInstanceState(Work.State.UNKNOWN); 139 new WorkHolder(work).run(); 140 cleanup(work, null); 141 log.info(work.getId() + ": Success."); 142 countSuccess++; 143 } catch (Exception e) { 144 cleanup(work, e); 145 log.error(work.getId() + ": Failure, skipping.", e); 146 } 147 countTotal++; 148 } 149 150 protected void cleanup(Work work, Exception exception) { 151 try { 152 work.cleanUp(true, exception); 153 } catch (Exception e) { 154 log.error(work.getId() + ": Failure on cleanup", e); 155 } 156 } 157 } 158}