001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.pubsub;
020
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.function.BiConsumer;
027
028import org.nuxeo.runtime.model.ComponentContext;
029import org.nuxeo.runtime.model.ComponentInstance;
030import org.nuxeo.runtime.model.DefaultComponent;
031
032/**
033 * Implementation for the Publish/Subscribe Service.
034 *
035 * @since 9.1
036 */
037public class PubSubServiceImpl extends DefaultComponent implements PubSubService {
038
039    public static final String CONFIG_XP = "configuration";
040
041    /** All the registered descriptors. */
042    protected List<PubSubProviderDescriptor> providerDescriptors = new CopyOnWriteArrayList<>();
043
044    /** The currently-configured provider. */
045    protected PubSubProvider provider;
046
047    /** The descriptor for the currently-configured provider, or {@code null} if it's the default. */
048    protected PubSubProviderDescriptor providerDescriptor;
049
050    /** List of subscribers for each topic. */
051    protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>();
052
053    protected Map<String, String> options;
054
055    @Override
056    public void activate(ComponentContext context) {
057        providerDescriptorChanged();
058    }
059
060    @Override
061    public void deactivate(ComponentContext context) {
062        subscribers.clear();
063        provider.close();
064        provider = null;
065    }
066
067    @Override
068    public void start(ComponentContext context) {
069        if (provider == null) {
070            return;
071        }
072        provider.initialize(options, subscribers);
073    }
074
075    @Override
076    public void stop(ComponentContext context) {
077        if (provider == null) {
078            return;
079        }
080        provider.close();
081    }
082
083    @Override
084    public int getApplicationStartedOrder() {
085        // let RedisComponent start before us (Redis starts before WorkManager that starts before events)
086        return -500 + 10;
087    }
088
089    @Override
090    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
091        if (CONFIG_XP.equals(extensionPoint)) {
092            registerProvider((PubSubProviderDescriptor) contribution);
093        } else {
094            throw new RuntimeException("Unknown extension point: " + extensionPoint);
095        }
096    }
097
098    @Override
099    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
100        if (CONFIG_XP.equals(extensionPoint)) {
101            unregisterProvider((PubSubProviderDescriptor) contribution);
102        }
103    }
104
105    protected void registerProvider(PubSubProviderDescriptor descriptor) {
106        providerDescriptors.add(descriptor);
107        providerDescriptor = descriptor;
108        providerDescriptorChanged();
109    }
110
111    protected void unregisterProvider(PubSubProviderDescriptor descriptor) {
112        providerDescriptors.remove(descriptor);
113        if (descriptor == providerDescriptor) {
114            // we removed the current provider, find a new one
115            int size = providerDescriptors.size();
116            providerDescriptor = size == 0 ? null : providerDescriptors.get(size - 1);
117            providerDescriptorChanged();
118        }
119    }
120
121    protected void providerDescriptorChanged() {
122        if (provider != null) {
123            provider.close();
124        }
125        if (providerDescriptor == null) {
126            provider = new MemPubSubProvider(); // default implementation
127            options = Collections.emptyMap();
128        } else {
129            provider = providerDescriptor.getInstance();
130            options = providerDescriptor.getOptions();
131        }
132        // initialize later, in applicationStarted
133        // provider.initialize(subscribers);
134    }
135
136    // ===== delegation to actual implementation =====
137
138    @Override
139    public void publish(String topic, byte[] message) {
140        provider.publish(topic, message);
141    }
142
143    @Override
144    public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
145        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber);
146    }
147
148    @Override
149    public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
150        // use computeIfAbsent also for removal to avoid thread-safety issues
151        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber);
152    }
153
154}