001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.ObjectInput;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutput;
027import java.io.ObjectOutputStream;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.commons.collections.buffer.CircularFifoBuffer;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.work.api.Work;
034import org.nuxeo.lib.stream.computation.AbstractComputation;
035import org.nuxeo.lib.stream.computation.ComputationContext;
036import org.nuxeo.lib.stream.computation.Record;
037import org.nuxeo.runtime.metrics.MetricsService;
038
039import com.codahale.metrics.MetricRegistry;
040import com.codahale.metrics.SharedMetricRegistries;
041import com.codahale.metrics.Timer;
042
043/**
044 * A Stream computation that consumes works.
045 *
046 * @since 9.3
047 */
048public class WorkComputation extends AbstractComputation {
049    private static final Log log = LogFactory.getLog(WorkComputation.class);
050
051    protected static final int IDS_SIZE = 50;
052
053    protected final CircularFifoBuffer workIds = new CircularFifoBuffer(IDS_SIZE);
054
055    protected final Timer workTimer;
056
057    public WorkComputation(String name) {
058        super(name, 1, 0);
059        MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
060        workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", name, "total"));
061    }
062
063    @Override
064    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
065        Work work = deserialize(record.data);
066        try {
067            if (workIds.contains(work.getId())) {
068                log.warn("Duplicate work id: " + work.getId() + " skipping");
069            } else {
070                new WorkHolder(work).run();
071                workIds.add(work.getId());
072            }
073        } catch (Exception e) {
074            // TODO: check what to catch exactly we don't want to kill the computation on error
075            log.warn(String.format("Work id: %s title: %s, raise an exception, work is marked as completed",
076                    work.getId(), work.getTitle()), e);
077        } finally {
078            work.cleanUp(true, null);
079            workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS);
080            context.askForCheckpoint();
081        }
082    }
083
084    public static Work deserialize(byte[] data) {
085        // TODO: switch to commons-lang3 SerializationUtils
086        ByteArrayInputStream bis = new ByteArrayInputStream(data);
087        ObjectInput in = null;
088        try {
089            in = new ObjectInputStream(bis);
090            return (Work) in.readObject();
091        } catch (IOException | ClassNotFoundException e) {
092            throw new RuntimeException(e);
093        } finally {
094            try {
095                if (in != null) {
096                    in.close();
097                }
098            } catch (IOException ex) {
099                // ignore close exception
100            }
101        }
102    }
103
104    public static byte[] serialize(Work work) {
105        ByteArrayOutputStream bos = new ByteArrayOutputStream();
106        ObjectOutput out;
107        try {
108            out = new ObjectOutputStream(bos);
109            out.writeObject(work);
110            out.flush();
111            return bos.toByteArray();
112        } catch (IOException e) {
113            System.out.println("Error " + e.getMessage());
114            throw new RuntimeException(e);
115        } finally {
116            try {
117                bos.close();
118            } catch (IOException ex) {
119                // ignore close exception
120            }
121        }
122    }
123}