001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.pubsub;
020
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.function.BiConsumer;
026
027import org.nuxeo.ecm.core.api.NuxeoException;
028import org.nuxeo.ecm.core.event.EventServiceComponent;
029import org.nuxeo.runtime.RuntimeServiceEvent;
030import org.nuxeo.runtime.RuntimeServiceListener;
031import org.nuxeo.runtime.api.Framework;
032import org.nuxeo.runtime.model.ComponentContext;
033import org.nuxeo.runtime.model.ComponentInstance;
034import org.nuxeo.runtime.model.DefaultComponent;
035
036/**
037 * Implementation for the Publish/Subscribe Service.
038 *
039 * @since 9.1
040 */
041public class PubSubServiceImpl extends DefaultComponent implements PubSubService {
042
043    public static final String CONFIG_XP = "configuration";
044
045    /** All the registered descriptors. */
046    protected List<PubSubProviderDescriptor> providerDescriptors = new CopyOnWriteArrayList<>();
047
048    /** The currently-configured provider. */
049    protected PubSubProvider provider;
050
051    /** The descriptor for the currently-configured provider, or {@code null} if it's the default. */
052    protected PubSubProviderDescriptor providerDescriptor;
053
054    /** List of subscribers for each topic. */
055    protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>();
056
057    @Override
058    public void activate(ComponentContext context) {
059        providerDescriptorChanged();
060    }
061
062    @Override
063    public void deactivate(ComponentContext context) {
064        subscribers.clear();
065        provider.close();
066        provider = null;
067    }
068
069    @Override
070    public void applicationStarted(ComponentContext context) {
071        if (provider != null) {
072            provider.initialize(subscribers);
073        }
074        Framework.addListener(new RuntimeServiceListener() {
075            @Override
076            public void handleEvent(RuntimeServiceEvent event) {
077                if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) {
078                    return;
079                }
080                Framework.removeListener(this);
081                if (provider != null) {
082                    provider.close();
083                }
084            }
085        });
086    }
087
088    @Override
089    public int getApplicationStartedOrder() {
090        // let RedisComponent start before us (Redis starts before WorkManager that starts before events)
091        return EventServiceComponent.APPLICATION_STARTED_ORDER + 10;
092    }
093
094    @Override
095    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
096        if (CONFIG_XP.equals(extensionPoint)) {
097            registerProvider((PubSubProviderDescriptor) contribution);
098        } else {
099            throw new NuxeoException("Unknown extension point: " + extensionPoint);
100        }
101    }
102
103    @Override
104    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
105        if (CONFIG_XP.equals(extensionPoint)) {
106            unregisterProvider((PubSubProviderDescriptor) contribution);
107        }
108    }
109
110    protected void registerProvider(PubSubProviderDescriptor descriptor) {
111        providerDescriptors.add(descriptor);
112        providerDescriptor = descriptor;
113        providerDescriptorChanged();
114    }
115
116    protected void unregisterProvider(PubSubProviderDescriptor descriptor) {
117        providerDescriptors.remove(descriptor);
118        if (descriptor == providerDescriptor) {
119            // we removed the current provider, find a new one
120            int size = providerDescriptors.size();
121            providerDescriptor = size == 0 ? null : providerDescriptors.get(size - 1);
122            providerDescriptorChanged();
123        }
124    }
125
126    protected void providerDescriptorChanged() {
127        if (provider != null) {
128            provider.close();
129        }
130        if (providerDescriptor == null) {
131            provider = new MemPubSubProvider(); // default implementation
132        } else {
133            provider = providerDescriptor.getInstance();
134        }
135        // initialize later, in applicationStarted
136        // provider.initialize(subscribers);
137    }
138
139    // ===== delegation to actual implementation =====
140
141    @Override
142    public void publish(String topic, byte[] message) {
143        provider.publish(topic, message);
144    }
145
146    @Override
147    public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
148        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber);
149    }
150
151    @Override
152    public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
153        // use computeIfAbsent also for removal to avoid thread-safety issues
154        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber);
155    }
156
157}