/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;

/**
 * {@link ComputationContext} implementation that buffers, in memory, the records and timers produced by a
 * computation, together with its checkpoint/termination requests, its source low watermark and its last offset.
 * <p>
 * State is held in plain {@link HashMap}s and non-volatile fields; this class performs no synchronization itself.
 *
 * @since 9.3
 */
public class ComputationContextImpl implements ComputationContext {
    /** Maps logical stream names to target stream names and lists the registered output streams. */
    protected final ComputationMetadataMapping metadata;

    /** Buffered records, keyed by target (mapped) stream name. */
    protected final Map<String, List<Record>> streamRecords;

    /** Registered timers: timer key to the time value given to {@link #setTimer(String, long)}. */
    protected final Map<String, Long> timers;

    /** Log manager used for immediate writes; {@code null} when the context was built without one. */
    protected final LogManager manager;

    protected final ComputationPolicy policy;

    protected boolean checkpointFlag;

    protected long lowWatermark;

    protected boolean terminateFlag;

    protected LogOffset lastOffset;

    /**
     * Creates a context with empty record and timer buffers.
     *
     * @param logManager the log manager used by {@link #produceRecordImmediate(String, Record)}, may be
     *            {@code null} in which case immediate writes are rejected
     * @param metadata the stream name mapping and output stream registry of the computation
     * @param policy the policy returned by {@link #getPolicy()}
     */
    public ComputationContextImpl(LogManager logManager, ComputationMetadataMapping metadata,
            ComputationPolicy policy) {
        this.manager = logManager;
        this.metadata = metadata;
        this.timers = new HashMap<>();
        this.streamRecords = new HashMap<>();
        this.policy = policy;
    }

    /**
     * Creates a context using {@link ComputationPolicy#NONE} as policy.
     *
     * @param logManager the log manager used for immediate writes, may be {@code null}
     * @param metadata the stream name mapping and output stream registry of the computation
     */
    public ComputationContextImpl(LogManager logManager, ComputationMetadataMapping metadata) {
        this(logManager, metadata, ComputationPolicy.NONE);
    }

    /**
     * Creates a context without log manager and with {@link ComputationPolicy#NONE} as policy; such a context
     * cannot perform immediate writes.
     *
     * @param computationMetadataMapping the stream name mapping and output stream registry of the computation
     */
    public ComputationContextImpl(ComputationMetadataMapping computationMetadataMapping) {
        this(null, computationMetadataMapping);
    }

    /**
     * Returns the records buffered for a stream.
     * <p>
     * The name is looked up as given, without going through the metadata mapping, whereas
     * {@link #produceRecord(String, Record)} stores records under the mapped name.
     *
     * @param streamName the key under which the records were buffered
     * @return the internal list of buffered records, or an immutable empty list when nothing is buffered for
     *         this name
     */
    public List<Record> getRecords(String streamName) {
        return streamRecords.getOrDefault(streamName, Collections.emptyList());
    }

    /**
     * Returns the timers registered so far.
     *
     * @return the internal timer map (not a copy)
     */
    public Map<String, Long> getTimers() {
        return timers;
    }

    /**
     * Registers a timer, replacing any timer already registered under the same key.
     *
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override
    public void setTimer(String key, long time) {
        Objects.requireNonNull(key);
        timers.put(key, time);
    }

    /**
     * Removes the timer registered under the given key, if any.
     *
     * @param key the timer key
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public void removeTimer(String key) {
        Objects.requireNonNull(key);
        timers.remove(key);
    }

    /**
     * Buffers a record for an output stream; nothing is written to the log by this method.
     *
     * @throws IllegalArgumentException if the mapped stream is not a registered output stream
     */
    @Override
    public void produceRecord(String streamName, Record record) {
        String targetStream = resolveOutputStream(streamName);
        streamRecords.computeIfAbsent(targetStream, key -> new ArrayList<>()).add(record);
    }

    /**
     * Writes to an output stream immediately. This will systematically create duplicates on errors; always use
     * {@link #produceRecord(String, Record)} when possible.
     *
     * @param streamName the logical name of the output stream
     * @param record the record to append, using its own key as append key
     * @return the offset returned by the appender
     * @throws IllegalStateException if this context was created without a log manager
     * @throws IllegalArgumentException if the mapped stream is not a registered output stream
     */
    public LogOffset produceRecordImmediate(String streamName, Record record) {
        // The log manager check comes first so a manager-less context always fails the same way.
        if (manager == null) {
            throw new IllegalStateException("No logManager provided in context");
        }
        String targetStream = resolveOutputStream(streamName);
        LogAppender<Record> appender = manager.getAppender(targetStream);
        return appender.append(record.getKey(), record);
    }

    /**
     * Builds a record from the given key and data and writes it immediately, see
     * {@link #produceRecordImmediate(String, Record)}.
     *
     * @param streamName the logical name of the output stream
     * @param key the record key
     * @param data the record payload
     */
    public void produceRecordImmediate(String streamName, String key, byte[] data) {
        produceRecordImmediate(streamName, Record.of(key, data));
    }

    @Override
    public LogOffset getLastOffset() {
        return lastOffset;
    }

    @Override
    public ComputationPolicy getPolicy() {
        return policy;
    }

    /**
     * Sets the offset returned by {@link #getLastOffset()}.
     *
     * @param lastOffset the new last offset
     */
    public void setLastOffset(LogOffset lastOffset) {
        this.lastOffset = lastOffset;
    }

    /**
     * Returns the value last given to {@link #setSourceLowWatermark(long)}, {@code 0} if it was never set.
     */
    public long getSourceLowWatermark() {
        return lowWatermark;
    }

    @Override
    public void setSourceLowWatermark(long watermark) {
        this.lowWatermark = watermark;
    }

    /**
     * Tells whether a checkpoint has been asked for and not cancelled or removed since.
     */
    public boolean requireCheckpoint() {
        return checkpointFlag;
    }

    /**
     * Clears the checkpoint request.
     */
    public void removeCheckpointFlag() {
        checkpointFlag = false;
    }

    @Override
    public void askForCheckpoint() {
        checkpointFlag = true;
    }

    @Override
    public void cancelAskForCheckpoint() {
        checkpointFlag = false;
    }

    @Override
    public void askForTermination() {
        terminateFlag = true;
    }

    /**
     * Tells whether termination has been asked for; this class never resets the request.
     */
    public boolean requireTerminate() {
        return terminateFlag;
    }

    /**
     * Maps a logical stream name to its target stream and checks that the target is a registered output.
     * Shared by the buffered and immediate write paths so both reject unknown streams identically.
     */
    private String resolveOutputStream(String streamName) {
        String targetStream = metadata.map(streamName);
        if (!metadata.outputStreams().contains(targetStream)) {
            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
        }
        return targetStream;
    }
}