001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.internals;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Objects;
027
028import org.nuxeo.lib.stream.computation.ComputationContext;
029import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
030import org.nuxeo.lib.stream.computation.ComputationPolicy;
031import org.nuxeo.lib.stream.computation.Record;
032import org.nuxeo.lib.stream.log.LogAppender;
033import org.nuxeo.lib.stream.log.LogManager;
034import org.nuxeo.lib.stream.log.LogOffset;
035
036/**
037 * @since 9.3
038 */
039public class ComputationContextImpl implements ComputationContext {
040    protected final ComputationMetadataMapping metadata;
041
042    protected final Map<String, List<Record>> streamRecords;
043
044    protected final Map<String, Long> timers;
045
046    protected final LogManager manager;
047
048    protected final ComputationPolicy policy;
049
050    protected boolean checkpointFlag;
051
052    protected long lowWatermark;
053
054    protected boolean terminateFlag;
055
056    protected LogOffset lastOffset;
057
058    public ComputationContextImpl(LogManager logManager, ComputationMetadataMapping metadata,
059            ComputationPolicy policy) {
060        this.manager = logManager;
061        this.metadata = metadata;
062        this.timers = new HashMap<>();
063        this.streamRecords = new HashMap<>();
064        this.policy = policy;
065    }
066
067    public ComputationContextImpl(LogManager logManager, ComputationMetadataMapping metadata) {
068        this(logManager, metadata, ComputationPolicy.NONE);
069    }
070
071    public ComputationContextImpl(ComputationMetadataMapping computationMetadataMapping) {
072        this(null, computationMetadataMapping);
073    }
074
075    public List<Record> getRecords(String streamName) {
076        return streamRecords.getOrDefault(streamName, Collections.emptyList());
077    }
078
079    public Map<String, Long> getTimers() {
080        return timers;
081    }
082
083    @Override
084    public void setTimer(String key, long time) {
085        Objects.requireNonNull(key);
086        timers.put(key, time);
087    }
088
089    public void removeTimer(String key) {
090        Objects.requireNonNull(key);
091        timers.remove(key);
092    }
093
094    @Override
095    public void produceRecord(String streamName, Record record) {
096        String targetStream = metadata.map(streamName);
097        if (!metadata.outputStreams().contains(targetStream)) {
098            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
099        }
100        streamRecords.computeIfAbsent(targetStream, key -> new ArrayList<>()).add(record);
101    }
102
103    /**
104     * Writes to an output stream immediately. This will creates systematically duplicates on errors, always use
105     * {@link #produceRecord(String, Record)} when possible.
106     */
107    public LogOffset produceRecordImmediate(String streamName, Record record) {
108        if (manager == null) {
109            throw new IllegalStateException("No logManager provided in context");
110        }
111        String targetStream = metadata.map(streamName);
112        if (!metadata.outputStreams().contains(targetStream)) {
113            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
114        }
115        LogAppender<Record> appender = manager.getAppender(targetStream);
116        return appender.append(record.getKey(), record);
117    }
118
119    public void produceRecordImmediate(String streamName, String key, byte[] data) {
120        produceRecordImmediate(streamName, Record.of(key, data));
121    }
122
123    @Override
124    public LogOffset getLastOffset() {
125        return lastOffset;
126    }
127
128    @Override
129    public ComputationPolicy getPolicy() {
130        return policy;
131    }
132
133    public void setLastOffset(LogOffset lastOffset) {
134        this.lastOffset = lastOffset;
135    }
136
137    public long getSourceLowWatermark() {
138        return lowWatermark;
139    }
140
141    @Override
142    public void setSourceLowWatermark(long watermark) {
143        this.lowWatermark = watermark;
144    }
145
146    public boolean requireCheckpoint() {
147        return checkpointFlag;
148    }
149
150    public void removeCheckpointFlag() {
151        checkpointFlag = false;
152    }
153
154    @Override
155    public void askForCheckpoint() {
156        checkpointFlag = true;
157    }
158
159    @Override
160    public void cancelAskForCheckpoint() {
161        checkpointFlag = false;
162    }
163
164    @Override
165    public void askForTermination() {
166        terminateFlag = true;
167    }
168
169    public boolean requireTerminate() {
170        return terminateFlag;
171    }
172}