001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_DEFAULT_VALUE; 022import static org.nuxeo.ecm.core.work.StreamWorkManager.STATETTL_KEY; 023import static org.nuxeo.ecm.core.work.StreamWorkManager.STORESTATE_KEY; 024 025import java.io.ByteArrayInputStream; 026import java.io.ByteArrayOutputStream; 027import java.io.IOException; 028import java.io.ObjectInput; 029import java.io.ObjectInputStream; 030import java.io.ObjectOutput; 031import java.io.ObjectOutputStream; 032import java.util.concurrent.TimeUnit; 033 034import org.apache.commons.collections.buffer.CircularFifoBuffer; 035import org.apache.logging.log4j.LogManager; 036import org.apache.logging.log4j.Logger; 037import org.nuxeo.common.utils.ExceptionUtils; 038import org.nuxeo.ecm.core.work.api.Work; 039import org.nuxeo.lib.stream.computation.AbstractComputation; 040import org.nuxeo.lib.stream.computation.ComputationContext; 041import org.nuxeo.lib.stream.computation.Record; 042import org.nuxeo.lib.stream.log.Name; 043import org.nuxeo.runtime.api.Framework; 044import org.nuxeo.runtime.metrics.MetricsService; 045import org.nuxeo.runtime.services.config.ConfigurationService; 046 047import io.dropwizard.metrics5.MetricName; 048import io.dropwizard.metrics5.MetricRegistry; 049import io.dropwizard.metrics5.SharedMetricRegistries; 050import io.dropwizard.metrics5.Timer; 051 052/** 053 * A Stream computation that consumes works. 054 * 055 * @since 9.3 056 */ 057public class WorkComputation extends AbstractComputation { 058 private static final Logger log = LogManager.getLogger(WorkComputation.class); 059 060 protected static final int IDS_SIZE = 50; 061 062 protected final CircularFifoBuffer workIds = new CircularFifoBuffer(IDS_SIZE); 063 064 protected final Timer workTimer; 065 066 protected final long stateTTL; 067 068 protected Work work; 069 070 public WorkComputation(String name) { 071 super(name, 1, 0); 072 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 073 workTimer = registry.timer( 074 MetricName.build("nuxeo.works.queue.timer").tagged("queue", Name.ofUrn(name).getName())); 075 stateTTL = Framework.getService(ConfigurationService.class).getLong(STATETTL_KEY, STATETTL_DEFAULT_VALUE); 076 } 077 078 @Override 079 public void signalStop() { 080 if (work != null) { 081 work.setWorkInstanceSuspending(); 082 } 083 } 084 085 @Override 086 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 087 work = deserialize(record.getData()); 088 try { 089 if (work.isCoalescing() && WorkStateHelper.getLastOffset(work.getId()) > context.getLastOffset().offset()) { 090 log.debug("Skipping duplicate of coalescing work id: " + work.getId() + " " + work); 091 } else if (work.isIdempotent() && workIds.contains(work.getId())) { 092 log.debug("Skipping duplicate of idempotent work id: " + work.getId()); 093 } else { 094 boolean storeState = Framework.getService(ConfigurationService.class).isBooleanTrue(STORESTATE_KEY); 095 if (storeState) { 096 if (WorkStateHelper.getState(work.getId()) != Work.State.SCHEDULED) { 097 // try to avoid a race condition where state is not yet written in the kv 098 Thread.sleep(200); 099 if (WorkStateHelper.getState(work.getId()) != Work.State.SCHEDULED) { 100 log.warn("work has been canceled, saving and returning"); 101 context.askForCheckpoint(); 102 return; 103 } else { 104 log.warn("Race condition avoided on " + work.getId()); 105 } 106 } 107 WorkStateHelper.setState(work.getId(), Work.State.RUNNING, stateTTL); 108 } 109 // The running state is needed to activate the DLQ mechanism 110 work.setWorkInstanceState(Work.State.RUNNING); 111 new WorkHolder(work).run(); 112 // if the same work id has not been scheduled again, set the state to null for 'completed' 113 if (storeState && WorkStateHelper.getState(work.getId()) == Work.State.RUNNING) { 114 WorkStateHelper.setState(work.getId(), null, stateTTL); 115 } 116 workIds.add(work.getId()); 117 } 118 work.cleanUp(true, null); 119 if (!work.isWorkInstanceSuspended()) { 120 context.askForCheckpoint(); 121 } 122 } catch (Exception e) { 123 if (Thread.currentThread().isInterrupted() || ExceptionUtils.hasInterruptedCause(e)) { 124 Thread.currentThread().interrupt(); 125 // propagate the interruption to stop the computation thread 126 // thread has been interrupted we don't want to mark the work as completed. 127 log.warn( 128 String.format("Work id: %s title: %s, has been interrupted the work thread is terminating, it will be rescheduled, record: %s", 129 work.getId(), work.getTitle(), record), e); 130 } else { 131 // Report an error on the work and continue 132 log.error(String.format( 133 "Skip Work in failure: id: %s, title: %s, offset: %s, record: %s, thread: %s", work.getId(), 134 work.getTitle(), context.getLastOffset(), record, Thread.currentThread().getName()), e); 135 context.askForCheckpoint(); 136 } 137 // Cleanup should take care of logging error except if exception comes from the cleanup 138 log.debug("Exception during work " + work.getId(), e); 139 // Try to cleanup after an exception, if exception comes from the previous cleanup it is a duplicate cleanup 140 cleanupWorkInFailure(work, e); 141 } finally { 142 workTimer.update(work.getCompletionTime() - work.getStartTime(), TimeUnit.MILLISECONDS); 143 work = null; 144 } 145 } 146 147 protected void cleanupWorkInFailure(Work work, Exception exception) { 148 try { 149 work.cleanUp(false, exception); 150 } catch (Exception e) { 151 log.error("Error during cleanup work: " + work.getId(), e); 152 } 153 } 154 155 @SuppressWarnings("squid:S2093") 156 public static Work deserialize(byte[] data) { 157 // TODO: switch to commons-lang3 SerializationUtils 158 ByteArrayInputStream bis = new ByteArrayInputStream(data); 159 ObjectInput in = null; 160 try { 161 in = new ObjectInputStream(bis); 162 return (Work) in.readObject(); 163 } catch (IOException | ClassNotFoundException e) { 164 throw new RuntimeException(e); 165 } finally { 166 try { 167 if (in != null) { 168 in.close(); 169 } 170 } catch (IOException ex) { 171 // ignore close exception so we cannot use a try-with-resources squid:S2093 172 } 173 } 174 } 175 176 @SuppressWarnings("squid:S2093") 177 public static byte[] serialize(Work work) { 178 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 179 ObjectOutput out; 180 try { 181 out = new ObjectOutputStream(bos); 182 out.writeObject(work); 183 out.flush(); 184 return bos.toByteArray(); 185 } catch (IOException e) { 186 throw new RuntimeException(e); 187 } finally { 188 try { 189 bos.close(); 190 } catch (IOException ex) { 191 // ignore close exception so we cannot use a try-with-resources squid:S2093 192 } 193 } 194 } 195}