001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.internals;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Objects;
027
028import org.nuxeo.lib.stream.computation.ComputationContext;
029import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
030import org.nuxeo.lib.stream.computation.Record;
031
032/**
033 * @since 9.3
034 */
035public class ComputationContextImpl implements ComputationContext {
036    protected final ComputationMetadataMapping metadata;
037
038    protected final Map<String, List<Record>> streamRecords;
039
040    protected final Map<String, Long> timers;
041
042    protected boolean checkpointFlag;
043
044    protected long lowWatermark;
045
046    protected boolean terminateFlag;
047
048    public ComputationContextImpl(ComputationMetadataMapping metadata) {
049        this.metadata = metadata;
050        this.timers = new HashMap<>();
051        this.streamRecords = new HashMap<>();
052    }
053
054    public List<Record> getRecords(String streamName) {
055        return streamRecords.getOrDefault(streamName, Collections.emptyList());
056    }
057
058    public Map<String, Long> getTimers() {
059        return timers;
060    }
061
062    @Override
063    public void setTimer(String key, long time) {
064        Objects.requireNonNull(key);
065        timers.put(key, time);
066    }
067
068    public void removeTimer(String key) {
069        Objects.requireNonNull(key);
070        timers.remove(key);
071    }
072
073    @Override
074    public void produceRecord(String streamName, Record record) {
075        String targetStream = metadata.map(streamName);
076        if (!metadata.outputStreams().contains(targetStream)) {
077            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
078        }
079        streamRecords.computeIfAbsent(targetStream, key -> new ArrayList<>()).add(record);
080    }
081
082    public long getSourceLowWatermark() {
083        return lowWatermark;
084    }
085
086    @Override
087    public void setSourceLowWatermark(long watermark) {
088        this.lowWatermark = watermark;
089    }
090
091    public boolean requireCheckpoint() {
092        return checkpointFlag;
093    }
094
095    public void removeCheckpointFlag() {
096        checkpointFlag = false;
097    }
098
099    @Override
100    public void askForCheckpoint() {
101        checkpointFlag = true;
102    }
103
104    @Override
105    public void askForTermination() {
106        terminateFlag = true;
107    }
108
109    public boolean requireTerminate() {
110        return terminateFlag;
111    }
112
113}