001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.pubsub; 020 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.function.BiConsumer; 026 027import org.nuxeo.ecm.core.api.NuxeoException; 028import org.nuxeo.ecm.core.event.EventServiceComponent; 029import org.nuxeo.runtime.RuntimeServiceEvent; 030import org.nuxeo.runtime.RuntimeServiceListener; 031import org.nuxeo.runtime.api.Framework; 032import org.nuxeo.runtime.model.ComponentContext; 033import org.nuxeo.runtime.model.ComponentInstance; 034import org.nuxeo.runtime.model.DefaultComponent; 035 036/** 037 * Implementation for the Publish/Subscribe Service. 038 * 039 * @since 9.1 040 */ 041public class PubSubServiceImpl extends DefaultComponent implements PubSubService { 042 043 public static final String CONFIG_XP = "configuration"; 044 045 /** All the registered descriptors. */ 046 protected List<PubSubProviderDescriptor> providerDescriptors = new CopyOnWriteArrayList<>(); 047 048 /** The currently-configured provider. */ 049 protected PubSubProvider provider; 050 051 /** The descriptor for the currently-configured provider, or {@code null} if it's the default. */ 052 protected PubSubProviderDescriptor providerDescriptor; 053 054 /** List of subscribers for each topic. */ 055 protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>(); 056 057 @Override 058 public void activate(ComponentContext context) { 059 providerDescriptorChanged(); 060 } 061 062 @Override 063 public void deactivate(ComponentContext context) { 064 subscribers.clear(); 065 provider.close(); 066 provider = null; 067 } 068 069 @Override 070 public void applicationStarted(ComponentContext context) { 071 if (provider != null) { 072 provider.initialize(subscribers); 073 } 074 Framework.addListener(new RuntimeServiceListener() { 075 @Override 076 public void handleEvent(RuntimeServiceEvent event) { 077 if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) { 078 return; 079 } 080 Framework.removeListener(this); 081 if (provider != null) { 082 provider.close(); 083 } 084 } 085 }); 086 } 087 088 @Override 089 public int getApplicationStartedOrder() { 090 // let RedisComponent start before us (Redis starts before WorkManager that starts before events) 091 return EventServiceComponent.APPLICATION_STARTED_ORDER + 10; 092 } 093 094 @Override 095 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 096 if (CONFIG_XP.equals(extensionPoint)) { 097 registerProvider((PubSubProviderDescriptor) contribution); 098 } else { 099 throw new NuxeoException("Unknown extension point: " + extensionPoint); 100 } 101 } 102 103 @Override 104 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 105 if (CONFIG_XP.equals(extensionPoint)) { 106 unregisterProvider((PubSubProviderDescriptor) contribution); 107 } 108 } 109 110 protected void registerProvider(PubSubProviderDescriptor descriptor) { 111 providerDescriptors.add(descriptor); 112 providerDescriptor = descriptor; 113 providerDescriptorChanged(); 114 } 115 116 protected void unregisterProvider(PubSubProviderDescriptor descriptor) { 117 providerDescriptors.remove(descriptor); 118 if (descriptor == providerDescriptor) { 119 // we removed the current provider, find a new one 120 int size = providerDescriptors.size(); 121 providerDescriptor = size == 0 ? null : providerDescriptors.get(size - 1); 122 providerDescriptorChanged(); 123 } 124 } 125 126 protected void providerDescriptorChanged() { 127 if (provider != null) { 128 provider.close(); 129 } 130 if (providerDescriptor == null) { 131 provider = new MemPubSubProvider(); // default implementation 132 } else { 133 provider = providerDescriptor.getInstance(); 134 } 135 // initialize later, in applicationStarted 136 // provider.initialize(subscribers); 137 } 138 139 // ===== delegation to actual implementation ===== 140 141 @Override 142 public void publish(String topic, byte[] message) { 143 provider.publish(topic, message); 144 } 145 146 @Override 147 public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 148 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber); 149 } 150 151 @Override 152 public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 153 // use computeIfAbsent also for removal to avoid thread-safety issues 154 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber); 155 } 156 157}