001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.pubsub; 020 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.function.BiConsumer; 026 027import org.nuxeo.runtime.model.ComponentContext; 028import org.nuxeo.runtime.model.ComponentInstance; 029import org.nuxeo.runtime.model.DefaultComponent; 030 031/** 032 * Implementation for the Publish/Subscribe Service. 033 * 034 * @since 9.1 035 */ 036public class PubSubServiceImpl extends DefaultComponent implements PubSubService { 037 038 public static final String CONFIG_XP = "configuration"; 039 040 /** All the registered descriptors. */ 041 protected List<PubSubProviderDescriptor> providerDescriptors = new CopyOnWriteArrayList<>(); 042 043 /** The currently-configured provider. */ 044 protected PubSubProvider provider; 045 046 /** The descriptor for the currently-configured provider, or {@code null} if it's the default. */ 047 protected PubSubProviderDescriptor providerDescriptor; 048 049 /** List of subscribers for each topic. */ 050 protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>(); 051 052 @Override 053 public void activate(ComponentContext context) { 054 providerDescriptorChanged(); 055 } 056 057 @Override 058 public void deactivate(ComponentContext context) { 059 subscribers.clear(); 060 provider.close(); 061 provider = null; 062 } 063 064 @Override 065 public void start(ComponentContext context) { 066 if (provider == null) { 067 return; 068 } 069 provider.initialize(subscribers); 070 } 071 072 @Override 073 public void stop(ComponentContext context) { 074 if (provider == null) { 075 return; 076 } 077 provider.close(); 078 } 079 080 @Override 081 public int getApplicationStartedOrder() { 082 // let RedisComponent start before us (Redis starts before WorkManager that starts before events) 083 return -500 + 10; 084 } 085 086 @Override 087 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 088 if (CONFIG_XP.equals(extensionPoint)) { 089 registerProvider((PubSubProviderDescriptor) contribution); 090 } else { 091 throw new RuntimeException("Unknown extension point: " + extensionPoint); 092 } 093 } 094 095 @Override 096 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 097 if (CONFIG_XP.equals(extensionPoint)) { 098 unregisterProvider((PubSubProviderDescriptor) contribution); 099 } 100 } 101 102 protected void registerProvider(PubSubProviderDescriptor descriptor) { 103 providerDescriptors.add(descriptor); 104 providerDescriptor = descriptor; 105 providerDescriptorChanged(); 106 } 107 108 protected void unregisterProvider(PubSubProviderDescriptor descriptor) { 109 providerDescriptors.remove(descriptor); 110 if (descriptor == providerDescriptor) { 111 // we removed the current provider, find a new one 112 int size = providerDescriptors.size(); 113 providerDescriptor = size == 0 ? null : providerDescriptors.get(size - 1); 114 providerDescriptorChanged(); 115 } 116 } 117 118 protected void providerDescriptorChanged() { 119 if (provider != null) { 120 provider.close(); 121 } 122 if (providerDescriptor == null) { 123 provider = new MemPubSubProvider(); // default implementation 124 } else { 125 provider = providerDescriptor.getInstance(); 126 } 127 // initialize later, in applicationStarted 128 // provider.initialize(subscribers); 129 } 130 131 // ===== delegation to actual implementation ===== 132 133 @Override 134 public void publish(String topic, byte[] message) { 135 provider.publish(topic, message); 136 } 137 138 @Override 139 public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 140 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber); 141 } 142 143 @Override 144 public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 145 // use computeIfAbsent also for removal to avoid thread-safety issues 146 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber); 147 } 148 149}