001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.IOException; 022import java.time.LocalDateTime; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.ecm.core.api.NuxeoException; 032import org.nuxeo.ecm.core.redis.RedisAdmin; 033import org.nuxeo.ecm.core.redis.RedisExecutor; 034import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 035import org.nuxeo.ecm.core.storage.sql.Invalidations; 036import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 037import org.nuxeo.runtime.api.Framework; 038 039import redis.clients.jedis.JedisPubSub; 040 041/** 042 * Redis implementation of {@link ClusterInvalidator}. Use a single channel pubsub to send invalidations. Use an HSET to 043 * register nodes, only for debug purpose so far. 044 * 045 * @since 7.4 046 */ 047public class RedisClusterInvalidator implements ClusterInvalidator { 048 049 protected static final String PREFIX = "inval"; 050 051 // PubSub channel: nuxeo:inval:<repositoryName>:channel 052 protected static final String INVALIDATION_CHANNEL = "channel"; 053 054 // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId> 055 protected static final String CLUSTER_NODES_KEY = "nodes"; 056 057 // Keep info about a cluster node for one day 058 protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600; 059 060 // Max delay to wait for a channel subscription 061 protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10; 062 063 protected static final String STARTED_FIELD = "started"; 064 065 protected static final String LAST_INVAL_FIELD = "lastInvalSent"; 066 067 protected String nodeId; 068 069 protected String repositoryName; 070 071 protected RedisExecutor redisExecutor; 072 073 protected Invalidations receivedInvals; 074 075 protected Thread subscriberThread; 076 077 protected String namespace; 078 079 protected String startedDateTime; 080 081 private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class); 082 083 private CountDownLatch subscribeLatch; 084 085 private String registerSha; 086 087 private String sendSha; 088 089 @Override 090 public void initialize(String nodeId, RepositoryImpl repository) { 091 this.nodeId = nodeId; 092 this.repositoryName = repository.getName(); 093 redisExecutor = Framework.getLocalService(RedisExecutor.class); 094 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 095 namespace = redisAdmin.namespace(PREFIX, repositoryName); 096 try { 097 registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval"); 098 sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval"); 099 } catch (IOException e) { 100 throw new RuntimeException(e); 101 } 102 receivedInvals = new Invalidations(); 103 createSubscriberThread(); 104 registerNode(); 105 } 106 107 protected void createSubscriberThread() { 108 subscribeLatch = new CountDownLatch(1); 109 String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId; 110 subscriberThread = new Thread(this::subscribeToInvalidationChannel, name); 111 subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 112 subscriberThread.setPriority(Thread.NORM_PRIORITY); 113 subscriberThread.start(); 114 try { 115 if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) { 116 log.error("Redis channel subscripion timeout after " + TIMEOUT_SUBSCRIBE_SECOND 117 + "s, continuing but this node may not receive cluster invalidations"); 118 } 119 } catch (InterruptedException e) { 120 Thread.currentThread().interrupt(); 121 throw new RuntimeException(e); 122 } 123 } 124 125 protected void subscribeToInvalidationChannel() { 126 log.info("Subscribing to channel: " + getChannelName()); 127 redisExecutor.execute(jedis -> { 128 jedis.subscribe(new JedisPubSub() { 129 @Override 130 public void onSubscribe(String channel, int subscribedChannels) { 131 super.onSubscribe(channel, subscribedChannels); 132 if (subscribeLatch != null) { 133 subscribeLatch.countDown(); 134 } 135 if (log.isDebugEnabled()) { 136 log.debug("Subscribed to channel: " + getChannelName()); 137 } 138 } 139 140 @Override 141 public void onMessage(String channel, String message) { 142 try { 143 RedisInvalidations rInvals = new RedisInvalidations(nodeId, message); 144 if (log.isTraceEnabled()) { 145 log.trace("Receive invalidations: " + rInvals); 146 } 147 Invalidations invals = rInvals.getInvalidations(); 148 synchronized (RedisClusterInvalidator.this) { 149 receivedInvals.add(invals); 150 } 151 } catch (IllegalArgumentException e) { 152 log.error("Fail to read message: " + message, e); 153 } 154 } 155 }, getChannelName()); 156 return null; 157 }); 158 } 159 160 protected String getChannelName() { 161 return namespace + INVALIDATION_CHANNEL; 162 } 163 164 protected void registerNode() { 165 startedDateTime = getCurrentDateTime(); 166 List<String> keys = Collections.singletonList(getNodeKey()); 167 List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime, 168 Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString()); 169 if (log.isDebugEnabled()) { 170 log.debug("Registering node: " + nodeId); 171 } 172 173 redisExecutor.evalsha(registerSha, keys, args); 174 if (log.isInfoEnabled()) { 175 log.info("Node registered: " + nodeId); 176 } 177 } 178 179 protected String getNodeKey() { 180 return namespace + CLUSTER_NODES_KEY + ":" + nodeId; 181 } 182 183 @Override 184 public void close() { 185 log.debug("Closing"); 186 unsubscribeToInvalidationChannel(); 187 // The Jedis pool is already closed when the repository is shutdowned 188 receivedInvals.clear(); 189 } 190 191 protected void unsubscribeToInvalidationChannel() { 192 subscriberThread.interrupt(); 193 } 194 195 @Override 196 public Invalidations receiveInvalidations() { 197 Invalidations newInvals = new Invalidations(); 198 Invalidations ret; 199 synchronized (this) { 200 ret = receivedInvals; 201 receivedInvals = newInvals; 202 } 203 return ret; 204 } 205 206 @Override 207 public void sendInvalidations(Invalidations invals) { 208 RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals); 209 if (log.isTraceEnabled()) { 210 log.trace("Sending invalidations: " + rInvals); 211 } 212 List<String> keys = Arrays.asList(getChannelName(), getNodeKey()); 213 List<String> args; 214 try { 215 args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, LAST_INVAL_FIELD, 216 getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString()); 217 } catch (IOException e) { 218 throw new NuxeoException(e); 219 } 220 221 redisExecutor.evalsha(sendSha, keys, args); 222 log.trace("invals sent"); 223 } 224 225 protected String getCurrentDateTime() { 226 return LocalDateTime.now().toString(); 227 } 228}