001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.internals;
020
021import java.io.Externalizable;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Map;
025import java.util.Objects;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028
029import org.nuxeo.lib.stream.log.LogAppender;
030import org.nuxeo.lib.stream.log.LogManager;
031import org.nuxeo.lib.stream.log.LogPartition;
032import org.nuxeo.lib.stream.log.LogTailer;
033import org.nuxeo.lib.stream.log.RebalanceListener;
034
035public abstract class AbstractLogManager implements LogManager {
036    protected final Map<String, CloseableLogAppender> appenders = new ConcurrentHashMap<>();
037
038    protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>();
039
040    // this define a concurrent set of tailers
041    protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<LogTailer, Boolean>());
042
043    protected abstract void create(String name, int size);
044
045    protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(String name);
046
047    protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
048            String group);
049
050    protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
051            RebalanceListener listener);
052
053    @Override
054    public synchronized boolean createIfNotExists(String name, int size) {
055        if (!exists(name)) {
056            create(name, size);
057            return true;
058        }
059        return false;
060    }
061
062    @Override
063    public boolean delete(String name) {
064        return false;
065    }
066
067    @Override
068    public <M extends Externalizable> LogTailer<M> createTailer(String group, Collection<LogPartition> partitions) {
069        partitions.forEach(partition -> checkInvalidAssignment(group, partition));
070        LogTailer<M> ret = doCreateTailer(partitions, group);
071        partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret));
072        tailers.add(ret);
073        return ret;
074    }
075
076    @Override
077    public boolean supportSubscribe() {
078        return false;
079    }
080
081    @Override
082    public <M extends Externalizable> LogTailer<M> subscribe(String group, Collection<String> names,
083            RebalanceListener listener) {
084        LogTailer<M> ret = doSubscribe(group, names, listener);
085        tailers.add(ret);
086        return ret;
087    }
088
089    protected void checkInvalidAssignment(String group, LogPartition partition) {
090        LogPartitionGroup key = new LogPartitionGroup(group, partition);
091        LogTailer ret = tailersAssignments.get(key);
092        if (ret != null && !ret.closed()) {
093            throw new IllegalArgumentException(
094                    "Tailer for this partition already created: " + partition + ", group: " + group);
095        }
096        if (!exists(partition.name())) {
097            throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name());
098        }
099    }
100
101    @SuppressWarnings("unchecked")
102    @Override
103    public synchronized <M extends Externalizable> LogAppender<M> getAppender(String name) {
104        return (LogAppender<M>) appenders.computeIfAbsent(name, n -> {
105            if (exists(n)) {
106                return createAppender(n);
107            }
108            throw new IllegalArgumentException("unknown Log name: " + n);
109        });
110    }
111
112    @Override
113    public void close() {
114        appenders.values().stream().filter(Objects::nonNull).forEach(CloseableLogAppender::close);
115        appenders.clear();
116        tailers.stream().filter(Objects::nonNull).forEach(LogTailer::close);
117        tailers.clear();
118        tailersAssignments.clear();
119    }
120
121}