001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.pubsub;
020
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.function.BiConsumer;
027
028import org.nuxeo.runtime.model.ComponentContext;
029import org.nuxeo.runtime.model.DefaultComponent;
030
031/**
032 * Implementation for the Publish/Subscribe Service.
033 *
034 * @since 9.1
035 */
036public class PubSubServiceImpl extends DefaultComponent implements PubSubService {
037
038    public static final String XP_CONFIG = "configuration";
039
040    /** The currently-configured provider. */
041    protected PubSubProvider provider;
042
043    /** List of subscribers for each topic. */
044    protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>();
045
046    protected Map<String, String> options;
047
048    @Override
049    public void deactivate(ComponentContext context) {
050        subscribers.clear();
051        provider.close();
052        provider = null;
053        super.deactivate(context);
054    }
055
056    @Override
057    public void start(ComponentContext context) {
058        super.start(context);
059        if (provider != null) {
060            provider.close();
061        }
062        List<PubSubProviderDescriptor> descs = getDescriptors(XP_CONFIG);
063        PubSubProviderDescriptor providerDescriptor = descs.isEmpty() ? null : descs.get(descs.size() - 1);
064        if (providerDescriptor == null) {
065            provider = new MemPubSubProvider(); // default implementation
066            options = Collections.emptyMap();
067        } else {
068            Class<? extends PubSubProvider> klass = providerDescriptor.klass;
069            // dynamic class check, the generics aren't enough
070            if (!PubSubProvider.class.isAssignableFrom(klass)) {
071                throw new RuntimeException("Class does not implement PubSubServiceProvider: " + klass.getName());
072            }
073            try {
074                provider = klass.getDeclaredConstructor().newInstance();
075                options = providerDescriptor.options;
076            } catch (ReflectiveOperationException e) {
077                throw new RuntimeException(e);
078            }
079        }
080        provider.initialize(options, subscribers);
081    }
082
083    @Override
084    public void stop(ComponentContext context) throws InterruptedException {
085        super.stop(context);
086        if (provider == null) {
087            return;
088        }
089        provider.close();
090    }
091
092    @Override
093    public int getApplicationStartedOrder() {
094        // let RedisComponent start before us (Redis starts before WorkManager that starts before events)
095        return -500 + 10;
096    }
097
098    // ===== delegation to actual implementation =====
099
100    @Override
101    public void publish(String topic, byte[] message) {
102        provider.publish(topic, message);
103    }
104
105    @Override
106    public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
107        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber);
108    }
109
110    @Override
111    public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) {
112        // use computeIfAbsent also for removal to avoid thread-safety issues
113        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber);
114    }
115
116}