001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.internals; 020 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Objects; 027 028import org.nuxeo.lib.stream.computation.ComputationContext; 029import org.nuxeo.lib.stream.computation.ComputationMetadataMapping; 030import org.nuxeo.lib.stream.computation.Record; 031 032/** 033 * @since 9.3 034 */ 035public class ComputationContextImpl implements ComputationContext { 036 protected final ComputationMetadataMapping metadata; 037 038 protected final Map<String, List<Record>> streamRecords; 039 040 protected final Map<String, Long> timers; 041 042 protected boolean checkpointFlag; 043 044 protected long lowWatermark; 045 046 protected boolean terminateFlag; 047 048 public ComputationContextImpl(ComputationMetadataMapping metadata) { 049 this.metadata = metadata; 050 this.timers = new HashMap<>(); 051 this.streamRecords = new HashMap<>(); 052 } 053 054 public List<Record> getRecords(String streamName) { 055 return streamRecords.getOrDefault(streamName, Collections.emptyList()); 056 } 057 058 public Map<String, Long> getTimers() { 059 return timers; 060 } 061 062 @Override 063 public void setTimer(String key, long time) { 064 Objects.requireNonNull(key); 065 timers.put(key, time); 066 } 067 068 public void removeTimer(String key) { 069 Objects.requireNonNull(key); 070 timers.remove(key); 071 } 072 073 @Override 074 public void produceRecord(String streamName, Record record) { 075 String targetStream = metadata.map(streamName); 076 if (!metadata.outputStreams().contains(targetStream)) { 077 throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName); 078 } 079 streamRecords.computeIfAbsent(targetStream, key -> new ArrayList<>()).add(record); 080 } 081 082 public long getSourceLowWatermark() { 083 return lowWatermark; 084 } 085 086 @Override 087 public void setSourceLowWatermark(long watermark) { 088 this.lowWatermark = watermark; 089 } 090 091 public boolean requireCheckpoint() { 092 return checkpointFlag; 093 } 094 095 public void removeCheckpointFlag() { 096 checkpointFlag = false; 097 } 098 099 @Override 100 public void askForCheckpoint() { 101 checkpointFlag = true; 102 } 103 104 @Override 105 public void askForTermination() { 106 terminateFlag = true; 107 } 108 109 public boolean requireTerminate() { 110 return terminateFlag; 111 } 112 113}