001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.pubsub; 020 021import java.util.Collections; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.function.BiConsumer; 027 028import org.nuxeo.runtime.model.ComponentContext; 029import org.nuxeo.runtime.model.DefaultComponent; 030 031/** 032 * Implementation for the Publish/Subscribe Service. 033 * 034 * @since 9.1 035 */ 036public class PubSubServiceImpl extends DefaultComponent implements PubSubService { 037 038 public static final String XP_CONFIG = "configuration"; 039 040 /** The currently-configured provider. */ 041 protected PubSubProvider provider; 042 043 /** List of subscribers for each topic. */ 044 protected Map<String, List<BiConsumer<String, byte[]>>> subscribers = new ConcurrentHashMap<>(); 045 046 protected Map<String, String> options; 047 048 @Override 049 public void deactivate(ComponentContext context) { 050 subscribers.clear(); 051 provider.close(); 052 provider = null; 053 super.deactivate(context); 054 } 055 056 @Override 057 public void start(ComponentContext context) { 058 super.start(context); 059 if (provider != null) { 060 provider.close(); 061 } 062 List<PubSubProviderDescriptor> descs = getDescriptors(XP_CONFIG); 063 PubSubProviderDescriptor providerDescriptor = descs.isEmpty() ? null : descs.get(descs.size() - 1); 064 if (providerDescriptor == null) { 065 provider = new MemPubSubProvider(); // default implementation 066 options = Collections.emptyMap(); 067 } else { 068 Class<? extends PubSubProvider> klass = providerDescriptor.klass; 069 // dynamic class check, the generics aren't enough 070 if (!PubSubProvider.class.isAssignableFrom(klass)) { 071 throw new RuntimeException("Class does not implement PubSubServiceProvider: " + klass.getName()); 072 } 073 try { 074 provider = klass.getDeclaredConstructor().newInstance(); 075 options = providerDescriptor.options; 076 } catch (ReflectiveOperationException e) { 077 throw new RuntimeException(e); 078 } 079 } 080 provider.initialize(options, subscribers); 081 } 082 083 @Override 084 public void stop(ComponentContext context) throws InterruptedException { 085 super.stop(context); 086 if (provider == null) { 087 return; 088 } 089 provider.close(); 090 } 091 092 @Override 093 public int getApplicationStartedOrder() { 094 // let RedisComponent start before us (Redis starts before WorkManager that starts before events) 095 return -500 + 10; 096 } 097 098 // ===== delegation to actual implementation ===== 099 100 @Override 101 public void publish(String topic, byte[] message) { 102 provider.publish(topic, message); 103 } 104 105 @Override 106 public void registerSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 107 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber); 108 } 109 110 @Override 111 public void unregisterSubscriber(String topic, BiConsumer<String, byte[]> subscriber) { 112 // use computeIfAbsent also for removal to avoid thread-safety issues 113 subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).remove(subscriber); 114 } 115 116}