001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals;
020
021
022import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
027
028import java.io.Externalizable;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.Map;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034
035
036public abstract class AbstractMQManager<M extends Externalizable> implements MQManager<M> {
037    private final Map<String, MQAppender<M>> appenders = new ConcurrentHashMap<>();
038    private final Map<MQPartitionGroup, MQTailer<M>> tailersAssignments = new ConcurrentHashMap<>();
039    private final Set<MQTailer<M>> tailers = Collections.newSetFromMap(new ConcurrentHashMap<MQTailer<M>, Boolean>());
040
041    protected abstract void create(String name, int size);
042
043    protected abstract MQAppender<M> createAppender(String name);
044
045    protected abstract MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group);
046
047    protected abstract MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener);
048
049    @Override
050    public boolean createIfNotExists(String name, int size) {
051        if (!exists(name)) {
052            create(name, size);
053            return true;
054        }
055        return false;
056    }
057
058    @Override
059    public boolean delete(String name) {
060        return false;
061    }
062
063    @Override
064    public MQTailer<M> createTailer(String group, Collection<MQPartition> partitions) {
065        partitions.forEach(partition -> checkTailerForPartition(group, partition));
066        MQTailer<M> ret = acquireTailer(partitions, group);
067        partitions.forEach(partition -> tailersAssignments.put(new MQPartitionGroup(group, partition), ret));
068        tailers.add(ret);
069        return ret;
070    }
071
072    @Override
073    public boolean supportSubscribe() {
074        return false;
075    }
076
077    @Override
078    public MQTailer<M> subscribe(String group, Collection<String> names, MQRebalanceListener listener) {
079        MQTailer<M> ret = doSubscribe(group, names, listener);
080        tailers.add(ret);
081        return ret;
082    }
083
084
085    private void checkTailerForPartition(String group, MQPartition partition) {
086        MQPartitionGroup key = new MQPartitionGroup(group, partition);
087        MQTailer<M> ret = tailersAssignments.get(key);
088        if (ret != null && !ret.closed()) {
089            throw new IllegalArgumentException("Tailer for this partition already created: " + partition + ", group: " + group);
090        }
091        if (!exists(partition.name())) {
092            throw new IllegalArgumentException("Tailer with unknown MQueue name: " + partition.name());
093        }
094    }
095
096    @Override
097    public MQTailer<M> createTailer(String group, MQPartition partition) {
098        return createTailer(group, Collections.singletonList(partition));
099    }
100
101    @Override
102    public synchronized MQAppender<M> getAppender(String name) {
103        if (!appenders.containsKey(name) || appenders.get(name).closed()) {
104            if (exists(name)) {
105                appenders.put(name, createAppender(name));
106            } else {
107                throw new IllegalArgumentException("unknown MQueue name: " + name);
108            }
109        }
110        return appenders.get(name);
111    }
112
113
114    @Override
115    public void close() throws Exception {
116        // TODO: check if we want this behavior, closing the manager close all MQueue
117        for (MQAppender<M> app : appenders.values()) {
118            app.close();
119        }
120        appenders.clear();
121        for (MQTailer<M> tailer : tailers) {
122            tailer.close();
123        }
124        tailers.clear();
125        tailersAssignments.clear();
126    }
127}