/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.internals;

import io.opencensus.trace.Tracing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.log.LogOffset;

/**
 * {@link ComputationContext} implementation that keeps, in memory, the records produced per output stream, the
 * timers, the checkpoint and termination flags, the source low watermark and the last offset.
 * <p>
 * This class performs no synchronization of its own.
 *
 * @since 9.3
 */
public class ComputationContextImpl implements ComputationContext {
    protected final ComputationMetadataMapping metadata;

    /** Records buffered by {@link #produceRecord(String, Record)}, keyed by the mapped (target) stream name. */
    protected final Map<String, List<Record>> streamRecords;

    /** Timers registered through {@link #setTimer(String, long)}, keyed by timer key. */
    protected final Map<String, Long> timers;

    /** May be {@code null}, in which case {@link #produceRecordImmediate(String, Record)} is not usable. */
    protected final StreamManager manager;

    protected final ComputationPolicy policy;

    protected final boolean isSpare;

    protected boolean checkpointFlag;

    protected long lowWatermark;

    protected boolean terminateFlag;

    protected LogOffset lastOffset;

    /**
     * Creates a context with empty timers and no buffered records.
     *
     * @param streamManager the manager used by {@link #produceRecordImmediate(String, Record)}, may be {@code null}
     * @param metadata the computation metadata used to map and validate output stream names
     * @param policy the policy returned by {@link #getPolicy()}
     * @param isSpare the value returned by {@link #isSpareComputation()}
     */
    public ComputationContextImpl(StreamManager streamManager, ComputationMetadataMapping metadata,
            ComputationPolicy policy, boolean isSpare) {
        this.manager = streamManager;
        this.metadata = metadata;
        this.timers = new HashMap<>();
        this.streamRecords = new HashMap<>();
        this.policy = policy;
        this.isSpare = isSpare;
    }

    /**
     * Creates a context that is not flagged as spare.
     *
     * @param streamManager the manager used by {@link #produceRecordImmediate(String, Record)}, may be {@code null}
     * @param metadata the computation metadata used to map and validate output stream names
     * @param policy the policy returned by {@link #getPolicy()}
     */
    public ComputationContextImpl(StreamManager streamManager, ComputationMetadataMapping metadata,
            ComputationPolicy policy) {
        this(streamManager, metadata, policy, false);
    }

    /**
     * Creates a context without stream manager, using {@link ComputationPolicy#NONE} and not flagged as spare.
     * {@link #produceRecordImmediate(String, Record)} throws {@link IllegalStateException} on such a context.
     *
     * @param computationMetadataMapping the computation metadata used to map and validate output stream names
     */
    public ComputationContextImpl(ComputationMetadataMapping computationMetadataMapping) {
        this(null, computationMetadataMapping, ComputationPolicy.NONE, false);
    }

    /**
     * Returns the records buffered under the given stream name.
     * <p>
     * The lookup uses {@code streamName} as is, whereas {@link #produceRecord(String, Record)} stores records under
     * the name returned by the metadata mapping.
     *
     * @param streamName the key to look up in the buffered records
     * @return the internal list of buffered records, or an immutable empty list when there is none for this name
     */
    public List<Record> getRecords(String streamName) {
        return streamRecords.getOrDefault(streamName, Collections.emptyList());
    }

    /**
     * Returns the internal timers map (not a copy).
     */
    public Map<String, Long> getTimers() {
        return timers;
    }

    @Override
    public void setTimer(String key, long time) {
        Objects.requireNonNull(key);
        timers.put(key, time);
    }

    /**
     * Removes the timer registered under the given key, if any.
     *
     * @param key the timer key, must not be {@code null}
     */
    public void removeTimer(String key) {
        Objects.requireNonNull(key);
        timers.remove(key);
    }

    /**
     * Buffers a record for the output stream that {@code streamName} maps to.
     *
     * @throws IllegalArgumentException if the mapped stream is not one of the metadata output streams
     * @throws NullPointerException if {@code record} is {@code null}
     */
    @Override
    public void produceRecord(String streamName, Record record) {
        String targetStream = metadata.map(streamName);
        if (!metadata.outputStreams().contains(targetStream)) {
            throw new IllegalArgumentException("Stream: " + targetStream + " not registered as output of: " + metadata);
        }
        // Fail at the call site instead of buffering a null entry that would only be discovered later.
        Objects.requireNonNull(record, "record");
        streamRecords.computeIfAbsent(targetStream, key -> new ArrayList<>()).add(record);
    }

    /**
     * Writes to an output stream immediately. This systematically creates duplicates on errors, always use
     * {@link #produceRecord(String, Record)} when possible.
     *
     * @return the offset returned by the stream manager for the appended record
     * @throws NullPointerException if {@code record} is {@code null}
     * @throws IllegalStateException if this context was created without a stream manager
     * @throws IllegalArgumentException if the mapped stream is not one of the metadata output streams
     */
    public LogOffset produceRecordImmediate(String streamName, Record record) {
        // Explicit check: the tracing call below dereferences the record.
        Objects.requireNonNull(record, "record");
        Tracing.getTracer().getCurrentSpan().addAnnotation("Produce record immediate " + record.getKey());
        if (manager == null) {
            throw new IllegalStateException("No logManager provided in context");
        }
        String targetStream = metadata.map(streamName);
        if (!metadata.outputStreams().contains(targetStream)) {
            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
        }
        return manager.append(targetStream, record);
    }

    /**
     * Builds a record from the key and data and writes it immediately, see
     * {@link #produceRecordImmediate(String, Record)}. The resulting offset is discarded.
     */
    public void produceRecordImmediate(String streamName, String key, byte[] data) {
        produceRecordImmediate(streamName, Record.of(key, data));
    }

    @Override
    public LogOffset getLastOffset() {
        return lastOffset;
    }

    @Override
    public ComputationPolicy getPolicy() {
        return policy;
    }

    @Override
    public boolean isSpareComputation() {
        return isSpare;
    }

    public void setLastOffset(LogOffset lastOffset) {
        this.lastOffset = lastOffset;
    }

    public long getSourceLowWatermark() {
        return lowWatermark;
    }

    @Override
    public void setSourceLowWatermark(long watermark) {
        this.lowWatermark = watermark;
    }

    /**
     * Returns {@code true} when a checkpoint has been asked for and not yet cancelled or removed.
     */
    public boolean requireCheckpoint() {
        return checkpointFlag;
    }

    /**
     * Clears the checkpoint flag.
     */
    public void removeCheckpointFlag() {
        checkpointFlag = false;
    }

    @Override
    public void askForCheckpoint() {
        checkpointFlag = true;
    }

    @Override
    public void cancelAskForCheckpoint() {
        checkpointFlag = false;
    }

    @Override
    public void askForTermination() {
        terminateFlag = true;
    }

    /**
     * Returns {@code true} once {@link #askForTermination()} has been called.
     */
    public boolean requireTerminate() {
        return terminateFlag;
    }

    @Override
    public String toString() {
        return "ComputationContextImpl{" +
                "metadata=" + metadata +
                ", streamRecords=" + streamRecords +
                ", timers=" + timers +
                ", manager=" + manager +
                ", policy=" + policy +
                ", isSpare=" + isSpare +
                ", checkpointFlag=" + checkpointFlag +
                ", lowWatermark=" + lowWatermark +
                ", terminateFlag=" + terminateFlag +
                ", lastOffset=" + lastOffset +
                '}';
    }
}