001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.pubsub;
020
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.function.BiConsumer;
026
027import org.nuxeo.ecm.core.api.NuxeoException;
028import org.nuxeo.ecm.core.event.EventServiceComponent;
029import org.nuxeo.runtime.model.ComponentContext;
030import org.nuxeo.runtime.model.ComponentInstance;
031import org.nuxeo.runtime.model.DefaultComponent;
032
033/**
034 * Implementation for the Publish/Subscribe Service.
035 *
036 * @since 9.1
037 */
038public class PubSubServiceImpl extends DefaultComponent implements PubSubService {
039
040    public static final String CONFIG_XP = "configuration";
041
042    /** All the registered descriptors. */
043    protected List<PubSubProviderDescriptor> providerDescriptors = new CopyOnWriteArrayList<>();
044
045    /** The currently-configured provider. */
046    protected PubSubProvider provider;
047
048    /** The descriptor for the currently-configured provider, or {@code null} if it's the default. */
049    protected PubSubProviderDescriptor providerDescriptor;
050
051    /** List of subscribers for each topic. */
052    protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>();
053
054    @Override
055    public void activate(ComponentContext context) {
056        providerDescriptorChanged();
057    }
058
059    @Override
060    public void deactivate(ComponentContext context) {
061        subscribers.clear();
062        provider.close();
063        provider = null;
064    }
065
066    @Override
067    public void start(ComponentContext context) {
068        if (provider == null) {
069            return;
070        }
071        provider.initialize(subscribers);
072    }
073
074    @Override
075    public void stop(ComponentContext context) {
076        if (provider == null) {
077            return;
078        }
079        provider.close();
080    }
081
082    @Override
083    public int getApplicationStartedOrder() {
084        // let RedisComponent start before us (Redis starts before WorkManager that starts before events)
085        return EventServiceComponent.APPLICATION_STARTED_ORDER + 10;
086    }
087
088    @Override
089    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
090        if (CONFIG_XP.equals(extensionPoint)) {
091            registerProvider((PubSubProviderDescriptor) contribution);
092        } else {
093            throw new NuxeoException("Unknown extension point: " + extensionPoint);
094        }
095    }
096
097    @Override
098    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
099        if (CONFIG_XP.equals(extensionPoint)) {
100            unregisterProvider((PubSubProviderDescriptor) contribution);
101        }
102    }
103
104    protected void registerProvider(PubSubProviderDescriptor descriptor) {
105        providerDescriptors.add(descriptor);
106        providerDescriptor = descriptor;
107        providerDescriptorChanged();
108    }
109
110    protected void unregisterProvider(PubSubProviderDescriptor descriptor) {
111        providerDescriptors.remove(descriptor);
112        if (descriptor == providerDescriptor) {
113            // we removed the current provider, find a new one
114            int size = providerDescriptors.size();
115            providerDescriptor = size == 0 ? null : providerDescriptors.get(size - 1);
116            providerDescriptorChanged();
117        }
118    }
119
120    protected void providerDescriptorChanged() {
121        if (provider != null) {
122            provider.close();
123        }
124        if (providerDescriptor == null) {
125            provider = new MemPubSubProvider(); // default implementation
126        } else {
127            provider = providerDescriptor.getInstance();
128        }
129        // initialize later, in applicationStarted
130        // provider.initialize(subscribers);
131    }
132
133    // ===== delegation to actual implementation =====
134
135    @Override
136    public void publish(String topic, byte[] message) {
137        provider.publish(topic, message);
138    }
139
140    @Override
141    public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
142        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber);
143    }
144
145    @Override
146    public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
147        // use computeIfAbsent also for removal to avoid thread-safety issues
148        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber);
149    }
150
151}