/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation.internals;

import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationContext;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * In-memory {@link ComputationContext} that buffers what a computation emits: produced records grouped by target
 * stream, named timers, a checkpoint request flag and the source low watermark.
 * <p>
 * State is held in plain {@link HashMap}s and non-volatile fields; this class performs no synchronization itself.
 *
 * @since 9.2
 */
public class ComputationContextImpl implements ComputationContext {
    private final ComputationMetadataMapping metadata;

    /** Buffered records, keyed by the mapped (target) stream name. */
    private final Map<String, List<Record>> streamRecords;

    /** Timers, keyed by timer name; the value is the time given to {@link #setTimer(String, long)}. */
    private final Map<String, Long> timers;

    private boolean checkpointFlag = false;

    private long lowWatermark;

    /**
     * Creates an empty context.
     *
     * @param metadata mapping used by {@link #produceRecord(String, Record)} to resolve stream names and to check
     *            that the target is a declared output stream
     */
    public ComputationContextImpl(ComputationMetadataMapping metadata) {
        this.metadata = metadata;
        this.timers = new HashMap<>();
        this.streamRecords = new HashMap<>();
    }

    /**
     * Returns the records buffered for a stream.
     * <p>
     * The name is looked up as is: it is not passed through the metadata mapping, whereas
     * {@link #produceRecord(String, Record)} stores records under the mapped name.
     *
     * @param streamName key to look up in the record buffer
     * @return the internal (live, mutable) list when records were produced for this key, otherwise the shared
     *         immutable empty list
     */
    public List<Record> getRecords(String streamName) {
        return streamRecords.getOrDefault(streamName, Collections.emptyList());
    }

    /**
     * Returns the timers set so far.
     *
     * @return the internal timer map itself, not a copy
     */
    public Map<String, Long> getTimers() {
        return timers;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void setState(String key, byte[] binaryValue) {
        throw new UnsupportedOperationException("setState is not yet implemented");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public byte[] getState(String key) {
        throw new UnsupportedOperationException("getState is not yet implemented");
    }

    /**
     * Registers a timer, replacing any timer already stored under the same key.
     *
     * @param key timer name, must not be {@code null}
     * @param time value stored for the timer
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override
    public void setTimer(String key, long time) {
        Objects.requireNonNull(key, "key");
        timers.put(key, time);
    }

    /**
     * Removes a timer; does nothing when no timer is stored under the key.
     *
     * @param key timer name, must not be {@code null}
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public void removeTimer(String key) {
        Objects.requireNonNull(key, "key");
        timers.remove(key);
    }

    /**
     * Builds a record with {@code Record.of(key, data)} and buffers it via {@link #produceRecord(String, Record)}.
     *
     * @throws IllegalArgumentException if the mapped stream is not a declared output stream
     */
    @Override
    public void produceRecord(String streamName, String key, byte[] data) {
        produceRecord(streamName, Record.of(key, data));
    }

    /**
     * Buffers a record for an output stream.
     *
     * @param streamName stream name, resolved to the target stream through the metadata mapping
     * @param record record appended to the buffer of the target stream
     * @throws IllegalArgumentException if the mapped stream is not a declared output stream
     */
    @Override
    public void produceRecord(String streamName, Record record) {
        String targetStream = metadata.map(streamName);
        if (!metadata.outputStreams().contains(targetStream)) {
            throw new IllegalArgumentException("Stream not registered as output: " + targetStream + ":" + streamName);
        }
        // Create the per-stream buffer only on first use instead of allocating a throwaway list on every call.
        streamRecords.computeIfAbsent(targetStream, name -> new ArrayList<>()).add(record);
    }

    /**
     * Stores the source low watermark.
     *
     * @param watermark value later returned by {@link #getSourceLowWatermark()}
     */
    @Override
    public void setSourceLowWatermark(long watermark) {
        this.lowWatermark = watermark;
    }

    /**
     * @return the last value given to {@link #setSourceLowWatermark(long)}, or {@code 0} if it was never called
     */
    public long getSourceLowWatermark() {
        return lowWatermark;
    }

    /**
     * @return {@code true} when {@link #askForCheckpoint()} was called and the flag has not been removed since
     */
    public boolean requireCheckpoint() {
        return checkpointFlag;
    }

    /**
     * Clears the checkpoint request flag.
     */
    public void removeCheckpointFlag() {
        checkpointFlag = false;
    }

    /**
     * Raises the checkpoint request flag; it stays set until {@link #removeCheckpointFlag()} is called.
     */
    @Override
    public void askForCheckpoint() {
        checkpointFlag = true;
    }

}