001package org.nuxeo.ecm.core.redis.contribs;/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Benoit Delbosc 016 */ 017 018import org.apache.commons.logging.Log; 019import org.apache.commons.logging.LogFactory; 020import org.nuxeo.ecm.core.api.NuxeoException; 021import org.nuxeo.ecm.core.redis.RedisAdmin; 022import org.nuxeo.ecm.core.redis.RedisExecutor; 023import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 024import org.nuxeo.ecm.core.storage.sql.Invalidations; 025import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 026import org.nuxeo.runtime.api.Framework; 027import redis.clients.jedis.JedisPubSub; 028import redis.clients.jedis.Pipeline; 029 030import java.io.IOException; 031import java.time.LocalDateTime; 032 033/** 034 * Redis implementation of {@link ClusterInvalidator}. 035 * 036 * Use a single channel pubsub to send invalidations. 037 * Use an HSET to register nodes, only for debug purpose so far. 038 * 039 * @since 7.4 040 */ 041public class RedisClusterInvalidator implements ClusterInvalidator { 042 043 protected static final String PREFIX = "inval"; 044 045 // PubSub channel: nuxeo:inval:<repositoryName>:channel 046 protected static final String INVALIDATION_CHANNEL = "channel"; 047 048 // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId> 049 protected static final String CLUSTER_NODES_KEY = "nodes"; 050 051 protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600; 052 053 protected static final String STARTED_KEY = "started"; 054 055 protected static final String LAST_INVAL_KEY = "lastInvalSent"; 056 057 protected String nodeId; 058 059 protected String repositoryName; 060 061 protected RedisExecutor redisExecutor; 062 063 protected Invalidations receivedInvals; 064 065 protected Thread subscriberThread; 066 067 protected String namespace; 068 069 protected String startedDateTime; 070 071 private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class); 072 073 @Override 074 public void initialize(String nodeId, RepositoryImpl repository) { 075 this.nodeId = nodeId; 076 this.repositoryName = repository.getName(); 077 redisExecutor = Framework.getLocalService(RedisExecutor.class); 078 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 079 namespace = redisAdmin.namespace(PREFIX, repositoryName); 080 receivedInvals = new Invalidations(); 081 createSubscriberThread(); 082 registerNode(); 083 } 084 085 protected void createSubscriberThread() { 086 String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId; 087 subscriberThread = new Thread(this::subscribeToInvalidationChannel, name); 088 subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 089 subscriberThread.setPriority(Thread.NORM_PRIORITY); 090 subscriberThread.start(); 091 } 092 093 protected void subscribeToInvalidationChannel() { 094 log.info("Subscribe to channel: " + getChannelName()); 095 redisExecutor.execute(jedis -> { 096 jedis.subscribe(new JedisPubSub() { 097 @Override 098 public void onMessage(String channel, String message) { 099 try { 100 RedisInvalidations rInvals = new RedisInvalidations(nodeId, message); 101 102 if (log.isTraceEnabled()) { 103 log.trace("Receive invalidations: " + rInvals); 104 } 105 receivedInvals.add(rInvals.getInvalidations()); 106 } catch (IllegalArgumentException e) { 107 log.error("Fail to read message: " + message, e); 108 } 109 } 110 }, getChannelName()); 111 return null; 112 }); 113 } 114 115 protected String getChannelName() { 116 return namespace + INVALIDATION_CHANNEL; 117 } 118 119 protected void registerNode() { 120 startedDateTime = getCurrentDateTime(); 121 log.info("Registering node: " + nodeId); 122 redisExecutor.execute(jedis -> { 123 String key = getNodeKey(); 124 Pipeline pipe = jedis.pipelined(); 125 pipe.hset(key, "started", startedDateTime); 126 // Use an expiration so we can access info after a shutdown 127 pipe.expire(key, TIMEOUT_REGISTER_SECOND); 128 pipe.sync(); 129 return null; 130 }); 131 } 132 133 protected String getNodeKey() { 134 return namespace + CLUSTER_NODES_KEY + ":" + nodeId; 135 } 136 137 @Override 138 public void close() { 139 log.debug("Closing"); 140 unsubscribeToInvalidationChannel(); 141 // The Jedis pool is already closed when the repository is shutdowned 142 receivedInvals.clear(); 143 } 144 145 protected void unsubscribeToInvalidationChannel() { 146 subscriberThread.interrupt(); 147 } 148 149 150 @Override 151 public Invalidations receiveInvalidations() { 152 Invalidations ret = receivedInvals; 153 receivedInvals = new Invalidations(); 154 return ret; 155 } 156 157 @Override 158 public void sendInvalidations(Invalidations invals) { 159 redisExecutor.execute(jedis -> { 160 RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals); 161 if (log.isTraceEnabled()) { 162 log.trace("Sending invalidations: " + rInvals); 163 } 164 String key = getNodeKey(); 165 166 try { 167 Pipeline pipe = jedis.pipelined(); 168 pipe.publish(getChannelName(), rInvals.serialize()); 169 pipe.hset(key, STARTED_KEY, startedDateTime); 170 pipe.hset(key, LAST_INVAL_KEY, getCurrentDateTime()); 171 pipe.expire(key, TIMEOUT_REGISTER_SECOND); 172 pipe.sync(); 173 return null; 174 } catch (IOException e) { 175 throw new NuxeoException(e); 176 } 177 }); 178 } 179 180 protected String getCurrentDateTime() { 181 return LocalDateTime.now().toString(); 182 } 183}