001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.log.internals; 020 021import java.io.Externalizable; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Map; 025import java.util.Objects; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028 029import org.nuxeo.lib.stream.log.LogAppender; 030import org.nuxeo.lib.stream.log.LogManager; 031import org.nuxeo.lib.stream.log.LogPartition; 032import org.nuxeo.lib.stream.log.LogTailer; 033import org.nuxeo.lib.stream.log.RebalanceListener; 034 035public abstract class AbstractLogManager implements LogManager { 036 protected final Map<String, CloseableLogAppender> appenders = new ConcurrentHashMap<>(); 037 038 protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>(); 039 040 // this define a concurrent set of tailers 041 protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<LogTailer, Boolean>()); 042 043 protected abstract void create(String name, int size); 044 045 protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(String name); 046 047 protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, 048 String group); 049 050 protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names, 051 RebalanceListener listener); 052 053 @Override 054 public synchronized boolean createIfNotExists(String name, int size) { 055 if (!exists(name)) { 056 create(name, size); 057 return true; 058 } 059 return false; 060 } 061 062 @Override 063 public boolean delete(String name) { 064 return false; 065 } 066 067 @Override 068 public <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) { 069 partitions.forEach(partition -> checkInvalidAssignment(group, partition)); 070 LogTailer<M> ret = doCreateTailer(partitions, group); 071 partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret)); 072 tailers.add(ret); 073 return ret; 074 } 075 076 @Override 077 public boolean supportSubscribe() { 078 return false; 079 } 080 081 @Override 082 public <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names, 083 RebalanceListener listener) { 084 LogTailer<M> ret = doSubscribe(group, names, listener); 085 tailers.add(ret); 086 return ret; 087 } 088 089 protected void checkInvalidAssignment(String group, LogPartition partition) { 090 LogPartitionGroup key = new LogPartitionGroup(group, partition); 091 LogTailer ret = tailersAssignments.get(key); 092 if (ret != null && !ret.closed()) { 093 throw new IllegalArgumentException( 094 "Tailer for this partition already created: " + partition + ", group: " + group); 095 } 096 if (!exists(partition.name())) { 097 throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name()); 098 } 099 } 100 101 @SuppressWarnings("unchecked") 102 @Override 103 public synchronized <M extends Externalizable> LogAppender<M> getAppender(String name) { 104 return (LogAppender<M>) appenders.computeIfAbsent(name, n -> { 105 if (exists(n)) { 106 return createAppender(n); 107 } 108 throw new IllegalArgumentException("unknown Log name: " + n); 109 }); 110 } 111 112 @Override 113 public void close() { 114 appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close); 115 appenders.clear(); 116 tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close); 117 tailers.clear(); 118 tailersAssignments.clear(); 119 } 120 121}