/*
 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.core.work;

import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_DEFAULT_VALUE;
import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_KEY;
import static org.nuxeo.ecm.core.work.StreamWorkManager.STORESTATE_KEY;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.services.config.ConfigurationService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;

/**
 * A Stream computation that consumes works.
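 * <p>
 * Each record carries a serialized {@link Work}: the computation deserializes it, runs it and asks for a checkpoint
 * once the work has completed, unless the work is being suspended.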
 *
 * @since 9.3
 */
public class WorkComputation extends AbstractComputation {
    private static final Logger log = LogManager.getLogger(WorkComputation.class);

    protected static final int IDS_SIZE = 50;

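    // Keeps the ids of the last IDS_SIZE works that were run, used to skip duplicates of idempotent works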
    protected final CircularFifoBuffer workIds = new CircularFifoBuffer(IDS_SIZE);

    protected final Timer workTimer;

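    // TTL used when storing the work state with WorkStateHelper, read from the STATETTL_KEY configuration property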
    protected final long stateTTL;

    protected Work work;

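    // A computation with one input stream and no output stream; registers a timer metric for this computation
    // and reads the state TTL from the configuration service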
    public WorkComputation(String name) {
        super(name, 1, 0);
        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
        workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", name, "total"));
        stateTTL = Long.parseLong(
                Framework.getService(ConfigurationService.class).getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE));
    }

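    // Flags the work in progress as suspending so that its record is not checkpointed and can be processed again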
    @Override
    public void signalStop() {
        if (work != null) {
            work.setWorkInstanceSuspending();
        }
    }

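    // Runs the work carried by the record; a work in error is skipped (checkpointed) while an interrupted one is
    // left to be rescheduled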
    @Override
    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        work = deserialize(record.getData());
        try {
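            // A coalescing work only runs its most recently scheduled instance: skip this record when a newer offset
            // has been recorded for the same work id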
            if (work.isCoalescing() && WorkStateHelper.getLastOffset(work.getId()) > context.getLastOffset().offset()) {
                log.debug("Skipping duplicate of coalescing work id: " + work.getId() + " " + work);
            } else if (work.isIdempotent() && workIds.contains(work.getId())) {
                log.debug("Skipping duplicate of idempotent work id: " + work.getId());
            } else {
                boolean storeState = Framework.getService(ConfigurationService.class)
                                              .isBooleanPropertyTrue(STORESTATE_KEY);
                if (storeState) {
                    if (WorkStateHelper.getState(work.getId()) != Work.State.SCHEDULED) {
                        log.warn("Work has been canceled, checkpointing and returning");
                        context.askForCheckpoint();
                        return;
                    }
                    WorkStateHelper.setState(work.getId(), Work.State.RUNNING, stateTTL);
                }
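                // Run the work synchronously in the computation thread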
                new WorkHolder(work).run();
                // if the same work id has not been scheduled again, set the state to null for 'completed'
                if (storeState && WorkStateHelper.getState(work.getId()) == Work.State.RUNNING) {
                    WorkStateHelper.setState(work.getId(), null, stateTTL);
                }
                workIds.add(work.getId());
            }
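            // Normal termination or duplicate skipped: clean up and checkpoint unless the work is suspending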
            work.cleanUp(true, null);
            if (!work.isWorkInstanceSuspended()) {
                context.askForCheckpoint();
            }
        } catch (Exception e) {
            if (ExceptionUtils.hasInterruptedCause(e)) {
                Thread.currentThread().interrupt();
                // propagate the interruption to stop the computation thread;
                // the thread has been interrupted, so we don't want to mark the work as completed
                log.warn(
                        String.format("Work id: %s title: %s, has been interrupted, it will be rescheduled, record: %s",
                                work.getId(), work.getTitle(), record));
            } else {
                // Report an error on the work and continue
                log.error(String.format("Work id: %s title: %s is in error, the work is skipped, record: %s",
                        work.getId(), work.getTitle(), record));
                context.askForCheckpoint();
            }
            // Cleanup should take care of logging the error, but better to dup this in debug
            log.debug("Exception during work " + work.getId(), e);
            work.cleanUp(false, e);
        } finally {
            workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
            work = null;
        }
    }

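    /**
     * Deserializes a record payload back into a {@link Work} using Java serialization.
     */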
    @SuppressWarnings("squid:S2093")
    public static Work deserialize(byte[] data) {
        // TODO: switch to commons-lang3 SerializationUtils
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        ObjectInput in = null;
        try {
            in = new ObjectInputStream(bis);
            return (Work) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException ex) {
                // ignore the close exception, this is why we don't use try-with-resources (squid:S2093)
            }
        }
    }

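    /**
     * Serializes a {@link Work} with Java serialization so it can be carried as a record payload.
     */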
    @SuppressWarnings("squid:S2093")
    public static byte[] serialize(Work work) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutput out;
        try {
            out = new ObjectOutputStream(bos);
            out.writeObject(work);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                bos.close();
            } catch (IOException ex) {
                // ignore the close exception, this is why we don't use try-with-resources (squid:S2093)
            }
        }
    }
}