001/* 002 * (C) Copyright 2019 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.internals; 020 021import java.util.ArrayDeque; 022import java.util.Deque; 023import java.util.Iterator; 024import java.util.Objects; 025 026import org.nuxeo.lib.stream.computation.Record; 027import org.nuxeo.lib.stream.computation.RecordFilter; 028import org.nuxeo.lib.stream.computation.RecordFilterChain; 029import org.nuxeo.lib.stream.log.LogOffset; 030 031/** 032 * Chains multiple record filters. 033 * 034 * @since 11.1 035 */ 036public class RecordFilterChainImpl implements RecordFilterChain { 037 038 public static final RecordFilterChain NONE = new NoFilterChain(); 039 040 protected final Deque<RecordFilter> filters = new ArrayDeque<>(); 041 042 @Override 043 public RecordFilterChain addFilter(RecordFilter filter) { 044 Objects.requireNonNull(filter); 045 filters.add(filter); 046 return this; 047 } 048 049 @Override 050 public Record beforeAppend(Record record) { 051 for (Iterator<RecordFilter> iterator = filters.iterator(); record != null && iterator.hasNext();) { 052 RecordFilter filter = iterator.next(); 053 record = filter.beforeAppend(record); 054 } 055 return record; 056 } 057 058 @Override 059 public void afterAppend(Record record, LogOffset offset) { 060 for (Iterator<RecordFilter> iterator = filters.iterator(); record != null && iterator.hasNext();) { 061 RecordFilter filter = iterator.next(); 062 filter.afterAppend(record, offset); 063 } 064 } 065 066 @Override 067 public Record afterRead(Record record, LogOffset offset) { 068 for (Iterator<RecordFilter> iterator = filters.descendingIterator(); record != null && iterator.hasNext();) { 069 RecordFilter filter = iterator.next(); 070 record = filter.afterRead(record, offset); 071 } 072 return record; 073 } 074}