001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_DEFAULT_VALUE;
022import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_KEY;
023import static org.nuxeo.ecm.core.work.StreamWorkManager.STORESTATE_KEY;
024
025import java.io.ByteArrayInputStream;
026import java.io.ByteArrayOutputStream;
027import java.io.IOException;
028import java.io.ObjectInput;
029import java.io.ObjectInputStream;
030import java.io.ObjectOutput;
031import java.io.ObjectOutputStream;
032import java.util.concurrent.TimeUnit;
033
034import org.apache.commons.collections.buffer.CircularFifoBuffer;
035import org.apache.logging.log4j.LogManager;
036import org.apache.logging.log4j.Logger;
037import org.nuxeo.common.utils.ExceptionUtils;
038import org.nuxeo.ecm.core.work.api.Work;
039import org.nuxeo.lib.stream.computation.AbstractComputation;
040import org.nuxeo.lib.stream.computation.ComputationContext;
041import org.nuxeo.lib.stream.computation.Record;
042import org.nuxeo.lib.stream.log.Name;
043import org.nuxeo.runtime.api.Framework;
044import org.nuxeo.runtime.metrics.MetricsService;
045import org.nuxeo.runtime.services.config.ConfigurationService;
046
047import io.dropwizard.metrics5.MetricName;
048import io.dropwizard.metrics5.MetricRegistry;
049import io.dropwizard.metrics5.SharedMetricRegistries;
050import io.dropwizard.metrics5.Timer;
051
052/**
053 * A Stream computation that consumes works.
054 *
055 * @since 9.3
056 */
057public class WorkComputation extends AbstractComputation {
058    private static final Logger log = LogManager.getLogger(WorkComputation.class);
059
060    protected static final int IDS_SIZE = 50;
061
062    protected final CircularFifoBuffer workIds = new CircularFifoBuffer(IDS_SIZE);
063
064    protected final Timer workTimer;
065
066    protected final long stateTTL;
067
068    protected Work work;
069
070    public WorkComputation(String name) {
071        super(name, 1, 0);
072        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
073        workTimer = registry.timer(
074                MetricName.build("nuxeo.works.queue.timer").tagged("queue", Name.ofUrn(name).getName()));
075        stateTTL = Framework.getService(ConfigurationService.class).getLong(STATETTL_KEY, STATETTL_DEFAULT_VALUE);
076    }
077
078    @Override
079    public void signalStop() {
080        if (work != null) {
081            work.setWorkInstanceSuspending();
082        }
083    }
084
085    @Override
086    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
087        work = deserialize(record.getData());
088        try {
089            if (work.isCoalescing() && WorkStateHelper.getLastOffset(work.getId()) > context.getLastOffset().offset()) {
090                log.debug("Skipping duplicate of coalescing work id: " + work.getId() + " " + work);
091            } else if (work.isIdempotent() && workIds.contains(work.getId())) {
092                log.debug("Skipping duplicate of idempotent work id: " + work.getId());
093            } else {
094                boolean storeState = Framework.getService(ConfigurationService.class).isBooleanTrue(STORESTATE_KEY);
095                if (storeState) {
096                    if (WorkStateHelper.getState(work.getId()) != Work.State.SCHEDULED) {
097                        // try to avoid a race condition where state is not yet written in the kv
098                        Thread.sleep(200);
099                        if (WorkStateHelper.getState(work.getId()) != Work.State.SCHEDULED) {
100                            log.warn("work has been canceled, saving and returning");
101                            context.askForCheckpoint();
102                            return;
103                        } else {
104                            log.warn("Race condition avoided on " + work.getId());
105                        }
106                    }
107                    WorkStateHelper.setState(work.getId(), Work.State.RUNNING, stateTTL);
108                }
109                // The running state is needed to activate the DLQ mechanism
110                work.setWorkInstanceState(Work.State.RUNNING);
111                new WorkHolder(work).run();
112                // if the same work id has not been scheduled again, set the state to null for 'completed'
113                if (storeState && WorkStateHelper.getState(work.getId()) == Work.State.RUNNING) {
114                    WorkStateHelper.setState(work.getId(), null, stateTTL);
115                }
116                workIds.add(work.getId());
117            }
118            work.cleanUp(true, null);
119            if (!work.isWorkInstanceSuspended()) {
120                context.askForCheckpoint();
121            }
122        } catch (Exception e) {
123            if (Thread.currentThread().isInterrupted() || ExceptionUtils.hasInterruptedCause(e)) {
124                Thread.currentThread().interrupt();
125                // propagate the interruption to stop the computation thread
126                // thread has been interrupted we don't want to mark the work as completed.
127                log.warn(
128                        String.format("Work id: %s title: %s, has been interrupted the work thread is terminating, it will be rescheduled, record: %s",
129                                work.getId(), work.getTitle(), record), e);
130            } else {
131                // Report an error on the work and continue
132                log.error(String.format(
133                        "Skip Work in failure: id: %s, title: %s, offset: %s, record: %s, thread: %s", work.getId(),
134                        work.getTitle(), context.getLastOffset(), record, Thread.currentThread().getName()), e);
135                context.askForCheckpoint();
136            }
137            // Cleanup should take care of logging error except if exception comes from the cleanup
138            log.debug("Exception during work " + work.getId(), e);
139            // Try to cleanup after an exception, if exception comes from the previous cleanup it is a duplicate cleanup
140            cleanupWorkInFailure(work, e);
141        } finally {
142            workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
143            work = null;
144        }
145    }
146
147    protected void cleanupWorkInFailure(Work work, Exception exception) {
148        try {
149            work.cleanUp(false, exception);
150        } catch (Exception e) {
151            log.error("Error during cleanup work: " + work.getId(), e);
152        }
153    }
154
155    @SuppressWarnings("squid:S2093")
156    public static Work deserialize(byte[] data) {
157        // TODO: switch to commons-lang3 SerializationUtils
158        ByteArrayInputStream bis = new ByteArrayInputStream(data);
159        ObjectInput in = null;
160        try {
161            in = new ObjectInputStream(bis);
162            return (Work) in.readObject();
163        } catch (IOException | ClassNotFoundException e) {
164            throw new RuntimeException(e);
165        } finally {
166            try {
167                if (in != null) {
168                    in.close();
169                }
170            } catch (IOException ex) {
171                // ignore close exception so we cannot use a try-with-resources squid:S2093
172            }
173        }
174    }
175
176    @SuppressWarnings("squid:S2093")
177    public static byte[] serialize(Work work) {
178        ByteArrayOutputStream bos = new ByteArrayOutputStream();
179        ObjectOutput out;
180        try {
181            out = new ObjectOutputStream(bos);
182            out.writeObject(work);
183            out.flush();
184            return bos.toByteArray();
185        } catch (IOException e) {
186            throw new RuntimeException(e);
187        } finally {
188            try {
189                bos.close();
190            } catch (IOException ex) {
191                // ignore close exception so we cannot use a try-with-resources squid:S2093
192            }
193        }
194    }
195}