001/*
002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.automation.core.operations.services.workmanager;
020
021import static org.nuxeo.ecm.core.work.WorkManagerImpl.DEAD_LETTER_QUEUE;
022import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
023
024import java.io.IOException;
025import java.time.Duration;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.concurrent.TimeoutException;
030
031import org.apache.logging.log4j.LogManager;
032import org.apache.logging.log4j.Logger;
033import org.nuxeo.ecm.automation.core.Constants;
034import org.nuxeo.ecm.automation.core.annotations.Operation;
035import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
036import org.nuxeo.ecm.automation.core.annotations.Param;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.Blobs;
039import org.nuxeo.ecm.core.work.WorkComputation;
040import org.nuxeo.ecm.core.work.WorkHolder;
041import org.nuxeo.ecm.core.work.WorkManagerImpl;
042import org.nuxeo.ecm.core.work.api.Work;
043import org.nuxeo.lib.stream.computation.AbstractComputation;
044import org.nuxeo.lib.stream.computation.ComputationContext;
045import org.nuxeo.lib.stream.computation.ComputationPolicy;
046import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
047import org.nuxeo.lib.stream.computation.Record;
048import org.nuxeo.lib.stream.computation.Settings;
049import org.nuxeo.lib.stream.computation.StreamManager;
050import org.nuxeo.lib.stream.computation.StreamProcessor;
051import org.nuxeo.lib.stream.computation.Topology;
052import org.nuxeo.runtime.api.Framework;
053import org.nuxeo.runtime.stream.StreamService;
054
055import net.jodah.failsafe.RetryPolicy;
056
057/**
058 * Executes Works stored in the dead letter queue after failure.
059 *
060 * @since 11.1
061 */
062@Operation(id = WorkManagerRunWorkInFailure.ID, category = Constants.CAT_SERVICES, label = "Executes Works stored in the dead letter queue", addToStudio = false, description = "Try to execute again Works that have been send to a dead letter queue by the WorkManager after failure")
063public class WorkManagerRunWorkInFailure {
064    private static final Logger log = LogManager.getLogger(WorkManagerRunWorkInFailure.class);
065
066    public static final String ID = "WorkManager.RunWorkInFailure";
067
068    protected static final long DEFAULT_TIMEOUT_SECONDS = 120L;
069
070    protected static final long ASSIGNMENT_TIMEOUT_SECONDS = 60L;
071
072    protected volatile long countTotal;
073
074    protected volatile long countSuccess;
075
076    @Param(name = "timeoutSeconds", required = false)
077    protected long timeout = DEFAULT_TIMEOUT_SECONDS;
078
079    @OperationMethod
080    public Blob run() throws IOException, InterruptedException, TimeoutException {
081        StreamManager streamManager = Framework.getService(StreamService.class).getStreamManager();
082        Settings settings = new Settings(1, 1, WorkManagerImpl.DEAD_LETTER_QUEUE_CODEC, getComputationPolicy());
083        StreamProcessor processor = streamManager.registerAndCreateProcessor("RunWorkInFailure", getTopology(),
084                settings);
085        try {
086            countTotal = 0;
087            countSuccess = 0;
088            processor.start();
089            processor.waitForAssignments(Duration.ofSeconds(ASSIGNMENT_TIMEOUT_SECONDS));
090            if (!processor.drainAndStop(getTimeout())) {
091                throw new TimeoutException();
092            }
093        } finally {
094            processor.shutdown();
095        }
096        return buildResult();
097    }
098
099    private Blob buildResult() throws IOException {
100        Map<String, Object> result = new HashMap<>();
101        result.put("total", countTotal);
102        result.put("success", countSuccess);
103        return Blobs.createJSONBlobFromValue(result);
104    }
105
106    protected Duration getTimeout() {
107        return Duration.ofSeconds(timeout);
108    }
109
110    protected ComputationPolicy getComputationPolicy() {
111        return new ComputationPolicyBuilder().retryPolicy(new RetryPolicy(ComputationPolicy.NO_RETRY))
112                                             .continueOnFailure(true)
113                                             .build();
114    }
115
116    protected Topology getTopology() {
117        return Topology.builder()
118                       .addComputation(WorkFailureComputation::new,
119                               Collections.singletonList(INPUT_1 + ":" + DEAD_LETTER_QUEUE.getUrn()))
120                       .build();
121    }
122
123    protected class WorkFailureComputation extends AbstractComputation {
124
125        private static final String NAME = "WorkFailure";
126
127        public WorkFailureComputation() {
128            super(NAME, 1, 0);
129        }
130
131        @Override
132        public void processRecord(ComputationContext context, String inputStreamName, Record record) {
133            context.askForCheckpoint();
134            Work work = WorkComputation.deserialize(record.getData());
135            log.info("Trying to run Work from DLQ: " + work.getCategory() + ":" + work.getId());
136            try {
137                // Using a non RUNNING state to prevent the Work to go in the DLQ
138                work.setWorkInstanceState(Work.State.UNKNOWN);
139                new WorkHolder(work).run();
140                cleanup(work, null);
141                log.info(work.getId() + ": Success.");
142                countSuccess++;
143            } catch (Exception e) {
144                cleanup(work, e);
145                log.error(work.getId() + ": Failure, skipping.", e);
146            }
147            countTotal++;
148        }
149
150        protected void cleanup(Work work, Exception exception) {
151            try {
152                work.cleanUp(true, exception);
153            } catch (Exception e) {
154                log.error(work.getId() + ": Failure on cleanup", e);
155            }
156        }
157    }
158}