/*
 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.runtime.pubsub;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;

/**
 * Implementation for the Publish/Subscribe Service.
 * <p>
 * The service keeps the per-topic subscriber lists itself and delegates publishing to a {@link PubSubProvider}. The
 * provider is either contributed through the {@value #CONFIG_XP} extension point or, when nothing is contributed, a
 * {@link MemPubSubProvider}.
 *
 * @since 9.1
 */
public class PubSubServiceImpl extends DefaultComponent implements PubSubService {

    /** Name of the extension point through which provider descriptors are contributed. */
    public static final String CONFIG_XP = "configuration";

    /** All the registered descriptors. */
    protected List<PubSubProviderDescriptor> providerDescriptors = new CopyOnWriteArrayList<>();

    /** The currently-configured provider. */
    protected PubSubProvider provider;

    /** The descriptor for the currently-configured provider, or {@code null} if it's the default. */
    protected PubSubProviderDescriptor providerDescriptor;

    /** List of subscribers for each topic. */
    protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>();

    /** Options handed to the current provider when it is initialized. */
    protected Map<String, String> options;

    /**
     * Instantiates the provider matching the current descriptor (or the default one). The provider is not
     * initialized here, see {@link #start(ComponentContext)}.
     */
    @Override
    public void activate(ComponentContext context) {
        providerDescriptorChanged();
    }

    /**
     * Forgets all subscribers and closes the current provider, if there is one.
     */
    @Override
    public void deactivate(ComponentContext context) {
        subscribers.clear();
        // guard like start() and stop() do: the component may be deactivated without a live provider
        if (provider != null) {
            provider.close();
            provider = null;
        }
    }

    /**
     * Initializes the current provider with the configured options and the subscribers map.
     */
    @Override
    public void start(ComponentContext context) {
        if (provider == null) {
            return;
        }
        provider.initialize(options, subscribers);
    }

    /**
     * Closes the current provider. The provider instance is kept so that a later
     * {@link #start(ComponentContext)} can initialize it again.
     */
    @Override
    public void stop(ComponentContext context) {
        if (provider == null) {
            return;
        }
        provider.close();
    }

    @Override
    public int getApplicationStartedOrder() {
        // let RedisComponent start before us (Redis starts before WorkManager that starts before events)
        return -500 + 10;
    }

    /**
     * Registers a provider descriptor contributed to the {@value #CONFIG_XP} extension point.
     *
     * @throws RuntimeException if the extension point is not known to this component
     */
    @Override
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (CONFIG_XP.equals(extensionPoint)) {
            registerProvider((PubSubProviderDescriptor) contribution);
        } else {
            throw new RuntimeException("Unknown extension point: " + extensionPoint);
        }
    }

    /**
     * Unregisters a provider descriptor previously contributed to the {@value #CONFIG_XP} extension point.
     * Contributions to any other extension point are ignored.
     */
    @Override
    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (CONFIG_XP.equals(extensionPoint)) {
            unregisterProvider((PubSubProviderDescriptor) contribution);
        }
    }

    /**
     * Records the descriptor and makes it the current one: the most recently registered descriptor wins.
     *
     * @param descriptor the descriptor to register
     */
    protected void registerProvider(PubSubProviderDescriptor descriptor) {
        providerDescriptors.add(descriptor);
        providerDescriptor = descriptor;
        providerDescriptorChanged();
    }

    /**
     * Removes the descriptor. If it was the current one, falls back to the last remaining registered descriptor,
     * or to the default provider when none is left.
     *
     * @param descriptor the descriptor to unregister
     */
    protected void unregisterProvider(PubSubProviderDescriptor descriptor) {
        providerDescriptors.remove(descriptor);
        if (descriptor == providerDescriptor) {
            // we removed the current provider, find a new one
            int size = providerDescriptors.size();
            providerDescriptor = size == 0 ? null : providerDescriptors.get(size - 1);
            providerDescriptorChanged();
        }
    }

    /**
     * Replaces the current provider by the one described by {@link #providerDescriptor}, closing the previous
     * provider first if there was one. Without a descriptor, a {@link MemPubSubProvider} with no options is used.
     * <p>
     * The new provider is only instantiated here; it is initialized by {@link #start(ComponentContext)}.
     */
    protected void providerDescriptorChanged() {
        if (provider != null) {
            provider.close();
        }
        if (providerDescriptor == null) {
            provider = new MemPubSubProvider(); // default implementation
            options = Collections.emptyMap();
        } else {
            provider = providerDescriptor.getInstance();
            options = providerDescriptor.getOptions();
        }
        // initialization is deferred to start(ComponentContext)
        // NOTE(review): a provider swapped in after start() has run stays uninitialized until the next start()
        // -- confirm this is intended
    }

    // ===== delegation to actual implementation =====

    /**
     * Publishes the message on the topic through the current provider.
     */
    @Override
    public void publish(String topic, byte[] message) {
        provider.publish(topic, message);
    }

    /**
     * Adds the subscriber to the topic's list, creating the list on first registration for that topic.
     */
    @Override
    public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    /**
     * Removes the subscriber from the topic's list. Does nothing if nothing was ever registered for the topic.
     */
    @Override
    public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
        // A plain lookup is enough: this class never removes a single topic's list from the map and the lists are
        // CopyOnWriteArrayList, so removing from the list we got is safe. Unlike computeIfAbsent, this does not
        // leave an empty list behind for every unknown topic.
        List<BiConsumer<String, byte[]>> topicSubscribers = subscribers.get(topic);
        if (topicSubscribers != null) {
            topicSubscribers.remove(subscriber);
        }
    }

}