001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.internals;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Objects;
027
028import org.nuxeo.lib.stream.computation.ComputationContext;
029import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
030import org.nuxeo.lib.stream.computation.Record;
031
032/**
033 * @since 9.3
034 */
035public class ComputationContextImpl implements ComputationContext {
036    protected final ComputationMetadataMapping metadata;
037
038    protected final Map<String, List<Record>> streamRecords;
039
040    protected final Map<String, Long> timers;
041
042    protected boolean checkpointFlag;
043
044    protected long lowWatermark;
045
046    public ComputationContextImpl(ComputationMetadataMapping metadata) {
047        this.metadata = metadata;
048        this.timers = new HashMap<>();
049        this.streamRecords = new HashMap<>();
050    }
051
052    public List<Record> getRecords(String streamName) {
053        return streamRecords.getOrDefault(streamName, Collections.emptyList());
054    }
055
056    public Map<String, Long> getTimers() {
057        return timers;
058    }
059
060    @Override
061    public void setTimer(String key, long time) {
062        Objects.requireNonNull(key);
063        timers.put(key, time);
064    }
065
066    public void removeTimer(String key) {
067        Objects.requireNonNull(key);
068        timers.remove(key);
069    }
070
071    @Override
072    public void produceRecord(String streamName, Record record) {
073        String targetStream = metadata.map(streamName);
074        if (!metadata.outputStreams().contains(targetStream)) {
075            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
076        }
077        streamRecords.computeIfAbsent(targetStream, key -> new ArrayList<>()).add(record);
078    }
079
080    public long getSourceLowWatermark() {
081        return lowWatermark;
082    }
083
084    @Override
085    public void setSourceLowWatermark(long watermark) {
086        this.lowWatermark = watermark;
087    }
088
089    public boolean requireCheckpoint() {
090        return checkpointFlag;
091    }
092
093    public void removeCheckpointFlag() {
094        checkpointFlag = false;
095    }
096
097    @Override
098    public void askForCheckpoint() {
099        checkpointFlag = true;
100    }
101
102}