001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals; 020 021 022import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender; 023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager; 024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition; 025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener; 026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer; 027 028import java.io.Externalizable; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.Map; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034 035 036public abstract class AbstractMQManager<M extends Externalizable> implements MQManager<M> { 037 private final Map<String, MQAppender<M>> appenders = new ConcurrentHashMap<>(); 038 private final Map<MQPartitionGroup, MQTailer<M>> tailersAssignments = new ConcurrentHashMap<>(); 039 private final Set<MQTailer<M>> tailers = Collections.newSetFromMap(new ConcurrentHashMap<MQTailer<M>, Boolean>()); 040 041 protected abstract void create(String name, int size); 042 043 protected abstract MQAppender<M> createAppender(String name); 044 045 protected abstract MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group); 046 047 protected abstract MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener); 048 049 @Override 050 public boolean createIfNotExists(String name, int size) { 051 if (!exists(name)) { 052 create(name, size); 053 return true; 054 } 055 return false; 056 } 057 058 @Override 059 public boolean delete(String name) { 060 return false; 061 } 062 063 @Override 064 public MQTailer<M> createTailer(String group, Collection<MQPartition> partitions) { 065 partitions.forEach(partition -> checkTailerForPartition(group, partition)); 066 MQTailer<M> ret = acquireTailer(partitions, group); 067 partitions.forEach(partition -> tailersAssignments.put(new MQPartitionGroup(group, partition), ret)); 068 tailers.add(ret); 069 return ret; 070 } 071 072 @Override 073 public boolean supportSubscribe() { 074 return false; 075 } 076 077 @Override 078 public MQTailer<M> subscribe(String group, Collection<String> names, MQRebalanceListener listener) { 079 MQTailer<M> ret = doSubscribe(group, names, listener); 080 tailers.add(ret); 081 return ret; 082 } 083 084 085 private void checkTailerForPartition(String group, MQPartition partition) { 086 MQPartitionGroup key = new MQPartitionGroup(group, partition); 087 MQTailer<M> ret = tailersAssignments.get(key); 088 if (ret != null && !ret.closed()) { 089 throw new IllegalArgumentException("Tailer for this partition already created: " + partition + ", group: " + group); 090 } 091 if (!exists(partition.name())) { 092 throw new IllegalArgumentException("Tailer with unknown MQueue name: " + partition.name()); 093 } 094 } 095 096 @Override 097 public MQTailer<M> createTailer(String group, MQPartition partition) { 098 return createTailer(group, Collections.singletonList(partition)); 099 } 100 101 @Override 102 public synchronized MQAppender<M> getAppender(String name) { 103 if (!appenders.containsKey(name) || appenders.get(name).closed()) { 104 if (exists(name)) { 105 appenders.put(name, createAppender(name)); 106 } else { 107 throw new IllegalArgumentException("unknown MQueue name: " + name); 108 } 109 } 110 return appenders.get(name); 111 } 112 113 114 @Override 115 public void close() throws Exception { 116 // TODO: check if we want this behavior, closing the manager close all MQueue 117 for (MQAppender<M> app : appenders.values()) { 118 app.close(); 119 } 120 appenders.clear(); 121 for (MQTailer<M> tailer : tailers) { 122 tailer.close(); 123 } 124 tailers.clear(); 125 tailersAssignments.clear(); 126 } 127}