001package org.nuxeo.ecm.core.redis.contribs;/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Benoit Delbosc 016 */ 017 018import java.io.IOException; 019import java.time.LocalDateTime; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.api.NuxeoException; 024import org.nuxeo.ecm.core.redis.RedisAdmin; 025import org.nuxeo.ecm.core.redis.RedisExecutor; 026import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 027import org.nuxeo.ecm.core.storage.sql.Invalidations; 028import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 029import org.nuxeo.runtime.api.Framework; 030 031import redis.clients.jedis.JedisPubSub; 032import redis.clients.jedis.Pipeline; 033 034/** 035 * Redis implementation of {@link ClusterInvalidator}. 036 * 037 * Use a single channel pubsub to send invalidations. 038 * Use an HSET to register nodes, only for debug purpose so far. 039 * 040 * @since 7.4 041 */ 042public class RedisClusterInvalidator implements ClusterInvalidator { 043 044 protected static final String PREFIX = "inval"; 045 046 // PubSub channel: nuxeo:inval:<repositoryName>:channel 047 protected static final String INVALIDATION_CHANNEL = "channel"; 048 049 // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId> 050 protected static final String CLUSTER_NODES_KEY = "nodes"; 051 052 protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600; 053 054 protected static final String STARTED_KEY = "started"; 055 056 protected static final String LAST_INVAL_KEY = "lastInvalSent"; 057 058 protected String nodeId; 059 060 protected String repositoryName; 061 062 protected RedisExecutor redisExecutor; 063 064 protected Invalidations receivedInvals; 065 066 protected Thread subscriberThread; 067 068 protected String namespace; 069 070 protected String startedDateTime; 071 072 private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class); 073 074 @Override 075 public void initialize(String nodeId, RepositoryImpl repository) { 076 this.nodeId = nodeId; 077 this.repositoryName = repository.getName(); 078 redisExecutor = Framework.getLocalService(RedisExecutor.class); 079 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 080 namespace = redisAdmin.namespace(PREFIX, repositoryName); 081 receivedInvals = new Invalidations(); 082 createSubscriberThread(); 083 registerNode(); 084 } 085 086 protected void createSubscriberThread() { 087 String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId; 088 subscriberThread = new Thread(this::subscribeToInvalidationChannel, name); 089 subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 090 subscriberThread.setPriority(Thread.NORM_PRIORITY); 091 subscriberThread.start(); 092 } 093 094 protected void subscribeToInvalidationChannel() { 095 log.info("Subscribe to channel: " + getChannelName()); 096 redisExecutor.execute(jedis -> { 097 jedis.subscribe(new JedisPubSub() { 098 @Override 099 public void onMessage(String channel, String message) { 100 try { 101 RedisInvalidations rInvals = new RedisInvalidations(nodeId, message); 102 103 if (log.isTraceEnabled()) { 104 log.trace("Receive invalidations: " + rInvals); 105 } 106 Invalidations invals = rInvals.getInvalidations(); 107 synchronized (RedisClusterInvalidator.this) { 108 receivedInvals.add(invals); 109 } 110 } catch (IllegalArgumentException e) { 111 log.error("Fail to read message: " + message, e); 112 } 113 } 114 }, getChannelName()); 115 return null; 116 }); 117 } 118 119 protected String getChannelName() { 120 return namespace + INVALIDATION_CHANNEL; 121 } 122 123 protected void registerNode() { 124 startedDateTime = getCurrentDateTime(); 125 log.info("Registering node: " + nodeId); 126 redisExecutor.execute(jedis -> { 127 String key = getNodeKey(); 128 Pipeline pipe = jedis.pipelined(); 129 pipe.hset(key, "started", startedDateTime); 130 // Use an expiration so we can access info after a shutdown 131 pipe.expire(key, TIMEOUT_REGISTER_SECOND); 132 pipe.sync(); 133 return null; 134 }); 135 } 136 137 protected String getNodeKey() { 138 return namespace + CLUSTER_NODES_KEY + ":" + nodeId; 139 } 140 141 @Override 142 public void close() { 143 log.debug("Closing"); 144 unsubscribeToInvalidationChannel(); 145 // The Jedis pool is already closed when the repository is shutdowned 146 receivedInvals.clear(); 147 } 148 149 protected void unsubscribeToInvalidationChannel() { 150 subscriberThread.interrupt(); 151 } 152 153 154 @Override 155 public Invalidations receiveInvalidations() { 156 Invalidations newInvals = new Invalidations(); 157 Invalidations ret; 158 synchronized (this) { 159 ret = receivedInvals; 160 receivedInvals = newInvals; 161 } 162 return ret; 163 } 164 165 @Override 166 public void sendInvalidations(Invalidations invals) { 167 redisExecutor.execute(jedis -> { 168 RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals); 169 if (log.isTraceEnabled()) { 170 log.trace("Sending invalidations: " + rInvals); 171 } 172 String key = getNodeKey(); 173 174 try { 175 Pipeline pipe = jedis.pipelined(); 176 pipe.publish(getChannelName(), rInvals.serialize()); 177 pipe.hset(key, STARTED_KEY, startedDateTime); 178 pipe.hset(key, LAST_INVAL_KEY, getCurrentDateTime()); 179 pipe.expire(key, TIMEOUT_REGISTER_SECOND); 180 pipe.sync(); 181 return null; 182 } catch (IOException e) { 183 throw new NuxeoException(e); 184 } 185 }); 186 } 187 188 protected String getCurrentDateTime() { 189 return LocalDateTime.now().toString(); 190 } 191}