001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.pubsub; 020 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.function.BiConsumer; 026 027import org.nuxeo.ecm.core.api.NuxeoException; 028import org.nuxeo.ecm.core.event.EventServiceComponent; 029import org.nuxeo.runtime.model.ComponentContext; 030import org.nuxeo.runtime.model.ComponentInstance; 031import org.nuxeo.runtime.model.DefaultComponent; 032 033/** 034 * Implementation for the Publish/Subscribe Service. 035 * 036 * @since 9.1 037 */ 038public class PubSubServiceImpl extends DefaultComponent implements PubSubService { 039 040 public static final String CONFIG_XP = "configuration"; 041 042 /** All the registered descriptors. */ 043 protected List<PubSubProviderDescriptor> providerDescriptors = new CopyOnWriteArrayList<>(); 044 045 /** The currently-configured provider. */ 046 protected PubSubProvider provider; 047 048 /** The descriptor for the currently-configured provider, or {@code null} if it's the default. */ 049 protected PubSubProviderDescriptor providerDescriptor; 050 051 /** List of subscribers for each topic. */ 052 protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>(); 053 054 @Override 055 public void activate(ComponentContext context) { 056 providerDescriptorChanged(); 057 } 058 059 @Override 060 public void deactivate(ComponentContext context) { 061 subscribers.clear(); 062 provider.close(); 063 provider = null; 064 } 065 066 @Override 067 public void start(ComponentContext context) { 068 if (provider == null) { 069 return; 070 } 071 provider.initialize(subscribers); 072 } 073 074 @Override 075 public void stop(ComponentContext context) { 076 if (provider == null) { 077 return; 078 } 079 provider.close(); 080 } 081 082 @Override 083 public int getApplicationStartedOrder() { 084 // let RedisComponent start before us (Redis starts before WorkManager that starts before events) 085 return EventServiceComponent.APPLICATION_STARTED_ORDER + 10; 086 } 087 088 @Override 089 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 090 if (CONFIG_XP.equals(extensionPoint)) { 091 registerProvider((PubSubProviderDescriptor) contribution); 092 } else { 093 throw new NuxeoException("Unknown extension point: " + extensionPoint); 094 } 095 } 096 097 @Override 098 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 099 if (CONFIG_XP.equals(extensionPoint)) { 100 unregisterProvider((PubSubProviderDescriptor) contribution); 101 } 102 } 103 104 protected void registerProvider(PubSubProviderDescriptor descriptor) { 105 providerDescriptors.add(descriptor); 106 providerDescriptor = descriptor; 107 providerDescriptorChanged(); 108 } 109 110 protected void unregisterProvider(PubSubProviderDescriptor descriptor) { 111 providerDescriptors.remove(descriptor); 112 if (descriptor == providerDescriptor) { 113 // we removed the current provider, find a new one 114 int size = providerDescriptors.size(); 115 providerDescriptor = size == 0 ? null : providerDescriptors.get(size - 1); 116 providerDescriptorChanged(); 117 } 118 } 119 120 protected void providerDescriptorChanged() { 121 if (provider != null) { 122 provider.close(); 123 } 124 if (providerDescriptor == null) { 125 provider = new MemPubSubProvider(); // default implementation 126 } else { 127 provider = providerDescriptor.getInstance(); 128 } 129 // initialize later, in applicationStarted 130 // provider.initialize(subscribers); 131 } 132 133 // ===== delegation to actual implementation ===== 134 135 @Override 136 public void publish(String topic, byte[] message) { 137 provider.publish(topic, message); 138 } 139 140 @Override 141 public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 142 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber); 143 } 144 145 @Override 146 public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 147 // use computeIfAbsent also for removal to avoid thread-safety issues 148 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber); 149 } 150 151}