001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_DEFAULT_VALUE; 022import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_KEY; 023import static org.nuxeo.ecm.core.work.StreamWorkManager.STORESTATE_KEY; 024 025import java.io.ByteArrayInputStream; 026import java.io.ByteArrayOutputStream; 027import java.io.IOException; 028import java.io.ObjectInput; 029import java.io.ObjectInputStream; 030import java.io.ObjectOutput; 031import java.io.ObjectOutputStream; 032import java.util.concurrent.TimeUnit; 033 034import org.apache.commons.collections.buffer.CircularFifoBuffer; 035import org.apache.logging.log4j.LogManager; 036import org.apache.logging.log4j.Logger; 037import org.nuxeo.common.utils.ExceptionUtils; 038import org.nuxeo.ecm.core.work.api.Work; 039import org.nuxeo.lib.stream.computation.AbstractComputation; 040import org.nuxeo.lib.stream.computation.ComputationContext; 041import org.nuxeo.lib.stream.computation.Record; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.metrics.MetricsService; 044import org.nuxeo.runtime.services.config.ConfigurationService; 045 046import com.codahale.metrics.MetricRegistry; 047import com.codahale.metrics.SharedMetricRegistries; 048import com.codahale.metrics.Timer; 049 050/** 051 * A Stream computation that consumes works. 052 * 053 * @since 9.3 054 */ 055public class WorkComputation extends AbstractComputation { 056 private static final Logger log = LogManager.getLogger(WorkComputation.class); 057 058 protected static final int IDS_SIZE = 50; 059 060 protected final CircularFifoBuffer workIds = new CircularFifoBuffer(IDS_SIZE); 061 062 protected final Timer workTimer; 063 064 protected final long stateTTL; 065 066 protected Work work; 067 068 public WorkComputation(String name) { 069 super(name, 1, 0); 070 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 071 workTimer = registry.timer(MetricRegistry.name("nuxeo", "works", name, "total")); 072 stateTTL = Long.parseLong( 073 Framework.getService(ConfigurationService.class).getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE)); 074 } 075 076 @Override 077 public void signalStop() { 078 if (work != null) { 079 work.setWorkInstanceSuspending(); 080 } 081 } 082 083 @Override 084 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 085 work = deserialize(record.getData()); 086 try { 087 if (work.isCoalescing() && WorkStateHelper.getLastOffset(work.getId()) > context.getLastOffset().offset()) { 088 log.debug("Skipping duplicate of coalescing work id: " + work.getId() + " " + work); 089 } else if (work.isIdempotent() && workIds.contains(work.getId())) { 090 log.debug("Skipping duplicate of idempotent work id: " + work.getId()); 091 } else { 092 boolean storeState = Framework.getService(ConfigurationService.class) 093 .isBooleanPropertyTrue(STORESTATE_KEY); 094 if (storeState) { 095 if (WorkStateHelper.getState(work.getId()) != Work.State.SCHEDULED) { 096 log.warn("work has been canceled, saving and returning"); 097 context.askForCheckpoint(); 098 return; 099 } 100 WorkStateHelper.setState(work.getId(), Work.State.RUNNING, stateTTL); 101 } 102 new WorkHolder(work).run(); 103 // if the same work id has not been scheduled again, set the state to null for 'completed' 104 if (storeState && WorkStateHelper.getState(work.getId()) == Work.State.RUNNING) { 105 WorkStateHelper.setState(work.getId(), null, stateTTL); 106 } 107 workIds.add(work.getId()); 108 } 109 work.cleanUp(true, null); 110 if (!work.isWorkInstanceSuspended()) { 111 context.askForCheckpoint(); 112 } 113 } catch (Exception e) { 114 if (ExceptionUtils.hasInterruptedCause(e)) { 115 Thread.currentThread().interrupt(); 116 // propagate the interruption to stop the computation thread 117 // thread has been interrupted we don't want to mark the work as completed. 118 log.warn( 119 String.format("Work id: %s title: %s, has been interrupted, it will be rescheduled, record: %s", 120 work.getId(), work.getTitle(), record)); 121 } else { 122 // Report an error on the work and continue 123 log.error(String.format("Work id: %s title: %s is in error, the work is skipped, record: %s", 124 work.getId(), work.getTitle(), record)); 125 context.askForCheckpoint(); 126 } 127 // Cleanup should take care of logging error, but better to dup this in debug 128 log.debug("Exception during work " + work.getId(), e); 129 work.cleanUp(false, e); 130 } finally { 131 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 132 work = null; 133 } 134 } 135 136 @SuppressWarnings("squid:S2093") 137 public static Work deserialize(byte[] data) { 138 // TODO: switch to commons-lang3 SerializationUtils 139 ByteArrayInputStream bis = new ByteArrayInputStream(data); 140 ObjectInput in = null; 141 try { 142 in = new ObjectInputStream(bis); 143 return (Work) in.readObject(); 144 } catch (IOException | ClassNotFoundException e) { 145 throw new RuntimeException(e); 146 } finally { 147 try { 148 if (in != null) { 149 in.close(); 150 } 151 } catch (IOException ex) { 152 // ignore close exception so we cannot use a try-with-resources squid:S2093 153 } 154 } 155 } 156 157 @SuppressWarnings("squid:S2093") 158 public static byte[] serialize(Work work) { 159 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 160 ObjectOutput out; 161 try { 162 out = new ObjectOutputStream(bos); 163 out.writeObject(work); 164 out.flush(); 165 return bos.toByteArray(); 166 } catch (IOException e) { 167 throw new RuntimeException(e); 168 } finally { 169 try { 170 bos.close(); 171 } catch (IOException ex) { 172 // ignore close exception so we cannot use a try-with-resources squid:S2093 173 } 174 } 175 } 176}