001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.work; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024import java.io.ObjectInput; 025import java.io.ObjectInputStream; 026import java.io.ObjectOutput; 027import java.io.ObjectOutputStream; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.commons.collections.buffer.CircularFifoBuffer; 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.ecm.core.work.api.Work; 034import org.nuxeo.lib.stream.computation.AbstractComputation; 035import org.nuxeo.lib.stream.computation.ComputationContext; 036import org.nuxeo.lib.stream.computation.Record; 037import org.nuxeo.runtime.metrics.MetricsService; 038 039import com.codahale.metrics.MetricRegistry; 040import com.codahale.metrics.SharedMetricRegistries; 041import com.codahale.metrics.Timer; 042 043/** 044 * A Stream computation that consumes works. 045 * 046 * @since 9.3 047 */ 048public class WorkComputation extends AbstractComputation { 049 private static final Log log = LogFactory.getLog(WorkComputation.class); 050 051 protected static final int IDS_SIZE = 50; 052 053 protected final CircularFifoBuffer workIds = new CircularFifoBuffer(IDS_SIZE); 054 055 protected final Timer workTimer; 056 057 public WorkComputation(String name) { 058 super(name, 1, 0); 059 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 060 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", name, "total")); 061 } 062 063 @Override 064 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 065 Work work = deserialize(record.data); 066 try { 067 if (workIds.contains(work.getId())) { 068 log.warn("Duplicate work id: " + work.getId() + " skipping"); 069 } else { 070 new WorkHolder(work).run(); 071 workIds.add(work.getId()); 072 } 073 } catch (Exception e) { 074 // TODO: check what to catch exactly we don't want to kill the computation on error 075 log.warn(String.format("Work id: %s title: %s, raise an exception, work is marked as completed", 076 work.getId(), work.getTitle()), e); 077 } finally { 078 work.cleanUp(true, null); 079 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 080 context.askForCheckpoint(); 081 } 082 } 083 084 public static Work deserialize(byte[] data) { 085 // TODO: switch to commons-lang3 SerializationUtils 086 ByteArrayInputStream bis = new ByteArrayInputStream(data); 087 ObjectInput in = null; 088 try { 089 in = new ObjectInputStream(bis); 090 return (Work) in.readObject(); 091 } catch (IOException | ClassNotFoundException e) { 092 throw new RuntimeException(e); 093 } finally { 094 try { 095 if (in != null) { 096 in.close(); 097 } 098 } catch (IOException ex) { 099 // ignore close exception 100 } 101 } 102 } 103 104 public static byte[] serialize(Work work) { 105 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 106 ObjectOutput out; 107 try { 108 out = new ObjectOutputStream(bos); 109 out.writeObject(work); 110 out.flush(); 111 return bos.toByteArray(); 112 } catch (IOException e) { 113 System.out.println("Error " + e.getMessage()); 114 throw new RuntimeException(e); 115 } finally { 116 try { 117 bos.close(); 118 } catch (IOException ex) { 119 // ignore close exception 120 } 121 } 122 } 123}