001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.computation.internals;
020
021import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationContext;
022import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
023import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
024
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Objects;
031
032/**
033 * @since 9.2
034 */
035public class ComputationContextImpl implements ComputationContext {
036    private final ComputationMetadataMapping metadata;
037    private final Map<String, List<Record>> streamRecords;
038    private final Map<String, Long> timers;
039    private boolean checkpointFlag = false;
040    private long lowWatermark;
041
042    public ComputationContextImpl(ComputationMetadataMapping metadata) {
043        this.metadata = metadata;
044        this.timers = new HashMap<>();
045        this.streamRecords = new HashMap<>();
046    }
047
048    public List<Record> getRecords(String streamName) {
049        return streamRecords.getOrDefault(streamName, Collections.emptyList());
050    }
051
052    public Map<String, Long> getTimers() {
053        return timers;
054    }
055
056    @Override
057    public void setState(String key, byte[] binaryValue) {
058        throw new UnsupportedOperationException("setState is not yet implemented");
059    }
060
061    @Override
062    public byte[] getState(String key) {
063        throw new UnsupportedOperationException("getState is not yet implemented");
064    }
065
066    @Override
067    public void setTimer(String key, long time) {
068        Objects.requireNonNull(key);
069        timers.put(key, time);
070    }
071
072    public void removeTimer(String key) {
073        Objects.requireNonNull(key);
074        timers.remove(key);
075    }
076
077    @Override
078    public void produceRecord(String streamName, String key, byte[] data) {
079        produceRecord(streamName, Record.of(key, data));
080    }
081
082    @Override
083    public void produceRecord(String streamName, Record record) {
084        String targetStream = metadata.map(streamName);
085        if (!metadata.outputStreams().contains(targetStream)) {
086            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
087        }
088        List<Record> records = streamRecords.getOrDefault(targetStream, new ArrayList<>());
089        records.add(record);
090        streamRecords.putIfAbsent(targetStream, records);
091    }
092
093    @Override
094    public void setSourceLowWatermark(long watermark) {
095        this.lowWatermark = watermark;
096    }
097
098    public long getSourceLowWatermark() {
099        return lowWatermark;
100    }
101
102    public boolean requireCheckpoint() {
103        return checkpointFlag;
104    }
105
106    public void removeCheckpointFlag() {
107        checkpointFlag = false;
108    }
109
110    @Override
111    public void askForCheckpoint() {
112        checkpointFlag = true;
113    }
114
115}
116