/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
019package org.nuxeo.lib.stream.computation.internals;
020
021import io.opencensus.trace.Tracing;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Objects;
029
030import org.nuxeo.lib.stream.computation.ComputationContext;
031import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
032import org.nuxeo.lib.stream.computation.ComputationPolicy;
033import org.nuxeo.lib.stream.computation.Record;
034import org.nuxeo.lib.stream.computation.StreamManager;
035import org.nuxeo.lib.stream.log.LogOffset;
036
037/**
038 * @since 9.3
039 */
040public class ComputationContextImpl implements ComputationContext {
041    protected final ComputationMetadataMapping metadata;
042
043    protected final Map<String, List<Record>> streamRecords;
044
045    protected final Map<String, Long> timers;
046
047    protected final StreamManager manager;
048
049    protected final ComputationPolicy policy;
050
051    protected final boolean isSpare;
052
053    protected boolean checkpointFlag;
054
055    protected long lowWatermark;
056
057    protected boolean terminateFlag;
058
059    protected LogOffset lastOffset;
060
061    public ComputationContextImpl(StreamManager streamManager, ComputationMetadataMapping metadata,
062            ComputationPolicy policy, boolean isSpare) {
063        this.manager = streamManager;
064        this.metadata = metadata;
065        this.timers = new HashMap<>();
066        this.streamRecords = new HashMap<>();
067        this.policy = policy;
068        this.isSpare = isSpare;
069    }
070
071    public ComputationContextImpl(StreamManager streamManager, ComputationMetadataMapping metadata,
072            ComputationPolicy policy) {
073        this(streamManager, metadata, policy, false);
074    }
075
076    public ComputationContextImpl(ComputationMetadataMapping computationMetadataMapping) {
077        this(null, computationMetadataMapping, ComputationPolicy.NONE, false);
078    }
079
080    public List<Record> getRecords(String streamName) {
081        return streamRecords.getOrDefault(streamName, Collections.emptyList());
082    }
083
084    public Map<String, Long> getTimers() {
085        return timers;
086    }
087
088    @Override
089    public void setTimer(String key, long time) {
090        Objects.requireNonNull(key);
091        timers.put(key, time);
092    }
093
094    public void removeTimer(String key) {
095        Objects.requireNonNull(key);
096        timers.remove(key);
097    }
098
099    @Override
100    public void produceRecord(String streamName, Record record) {
101        String targetStream = metadata.map(streamName);
102        if (!metadata.outputStreams().contains(targetStream)) {
103            throw new IllegalArgumentException("Stream: " + targetStream + " not registered as output of: " + metadata);
104        }
105        streamRecords.computeIfAbsent(targetStream, key -> new ArrayList<>()).add(record);
106    }
107
108    /**
109     * Writes to an output stream immediately. This will creates systematically duplicates on errors, always use
110     * {@link #produceRecord(String, Record)} when possible.
111     */
112    public LogOffset produceRecordImmediate(String streamName, Record record) {
113        Tracing.getTracer().getCurrentSpan().addAnnotation("Produce record immediate " + record.getKey());
114        if (manager == null) {
115            throw new IllegalStateException("No logManager provided in context");
116        }
117        String targetStream = metadata.map(streamName);
118        if (!metadata.outputStreams().contains(targetStream)) {
119            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
120        }
121        return manager.append(targetStream, record);
122    }
123
124    public void produceRecordImmediate(String streamName, String key, byte[] data) {
125        produceRecordImmediate(streamName, Record.of(key, data));
126    }
127
128    @Override
129    public LogOffset getLastOffset() {
130        return lastOffset;
131    }
132
133    @Override
134    public ComputationPolicy getPolicy() {
135        return policy;
136    }
137
138    @Override
139    public boolean isSpareComputation() {
140        return isSpare;
141    }
142
143    public void setLastOffset(LogOffset lastOffset) {
144        this.lastOffset = lastOffset;
145    }
146
147    public long getSourceLowWatermark() {
148        return lowWatermark;
149    }
150
151    @Override
152    public void setSourceLowWatermark(long watermark) {
153        this.lowWatermark = watermark;
154    }
155
156    public boolean requireCheckpoint() {
157        return checkpointFlag;
158    }
159
160    public void removeCheckpointFlag() {
161        checkpointFlag = false;
162    }
163
164    @Override
165    public void askForCheckpoint() {
166        checkpointFlag = true;
167    }
168
169    @Override
170    public void cancelAskForCheckpoint() {
171        checkpointFlag = false;
172    }
173
174    @Override
175    public void askForTermination() {
176        terminateFlag = true;
177    }
178
179    public boolean requireTerminate() {
180        return terminateFlag;
181    }
182
183    @Override
184    public String toString() {
185        return "ComputationContextImpl{" +
186                "metadata=" + metadata +
187                ", streamRecords=" + streamRecords +
188                ", timers=" + timers +
189                ", manager=" + manager +
190                ", policy=" + policy +
191                ", isSpare=" + isSpare +
192                ", checkpointFlag=" + checkpointFlag +
193                ", lowWatermark=" + lowWatermark +
194                ", terminateFlag=" + terminateFlag +
195                ", lastOffset=" + lastOffset +
196                '}';
197    }
198}