001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.api.NuxeoException; 024import org.nuxeo.ecm.core.redis.RedisAdmin; 025import org.nuxeo.ecm.core.redis.RedisExecutor; 026import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 027import org.nuxeo.ecm.core.storage.sql.Invalidations; 028import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 029import org.nuxeo.runtime.api.Framework; 030import redis.clients.jedis.JedisPubSub; 031 032import java.io.IOException; 033import java.time.LocalDateTime; 034import java.util.Arrays; 035import java.util.List; 036import java.util.concurrent.CountDownLatch; 037import java.util.concurrent.TimeUnit; 038 039/** 040 * Redis implementation of {@link ClusterInvalidator}. 041 * 042 * Use a single channel pubsub to send invalidations. 043 * Use an HSET to register nodes, only for debug purpose so far. 044 * 045 * @since 7.4 046 */ 047public class RedisClusterInvalidator implements ClusterInvalidator { 048 049 protected static final String PREFIX = "inval"; 050 051 // PubSub channel: nuxeo:inval:<repositoryName>:channel 052 protected static final String INVALIDATION_CHANNEL = "channel"; 053 054 // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId> 055 protected static final String CLUSTER_NODES_KEY = "nodes"; 056 057 // Keep info about a cluster node for one day 058 protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600; 059 060 // Max delay to wait for a channel subscription 061 protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10; 062 063 protected static final String STARTED_FIELD = "started"; 064 065 protected static final String LAST_INVAL_FIELD = "lastInvalSent"; 066 067 protected String nodeId; 068 069 protected String repositoryName; 070 071 protected RedisExecutor redisExecutor; 072 073 protected Invalidations receivedInvals; 074 075 protected Thread subscriberThread; 076 077 protected String namespace; 078 079 protected String startedDateTime; 080 081 private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class); 082 083 private CountDownLatch subscribeLatch; 084 085 private String registerSha; 086 private String sendSha; 087 088 @Override 089 public void initialize(String nodeId, RepositoryImpl repository) { 090 this.nodeId = nodeId; 091 this.repositoryName = repository.getName(); 092 redisExecutor = Framework.getLocalService(RedisExecutor.class); 093 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 094 namespace = redisAdmin.namespace(PREFIX, repositoryName); 095 try { 096 registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval"); 097 sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval"); 098 } catch (IOException e) { 099 throw new RuntimeException(e); 100 } 101 receivedInvals = new Invalidations(); 102 createSubscriberThread(); 103 registerNode(); 104 } 105 106 protected void createSubscriberThread() { 107 subscribeLatch = new CountDownLatch(1); 108 String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId; 109 subscriberThread = new Thread(this::subscribeToInvalidationChannel, name); 110 subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 111 subscriberThread.setPriority(Thread.NORM_PRIORITY); 112 subscriberThread.start(); 113 try { 114 if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) { 115 log.error("Redis channel subscripion timeout after " + TIMEOUT_SUBSCRIBE_SECOND + 116 "s, continuing but this node may not receive cluster invalidations"); 117 } 118 } catch (InterruptedException e) { 119 Thread.currentThread().interrupt(); 120 throw new RuntimeException(e); 121 } 122 } 123 124 protected void subscribeToInvalidationChannel() { 125 log.info("Subscribing to channel: " + getChannelName()); 126 redisExecutor.execute(jedis -> { 127 jedis.subscribe(new JedisPubSub() { 128 @Override 129 public void onSubscribe(String channel, int subscribedChannels) { 130 super.onSubscribe(channel, subscribedChannels); 131 if (subscribeLatch != null) { 132 subscribeLatch.countDown(); 133 } 134 log.debug("Subscribed to channel: " + getChannelName()); 135 } 136 137 @Override 138 public void onMessage(String channel, String message) { 139 try { 140 RedisInvalidations rInvals = new RedisInvalidations(nodeId, message); 141 if (log.isTraceEnabled()) { 142 log.trace("Receive invalidations: " + rInvals); 143 } 144 Invalidations invals = rInvals.getInvalidations(); 145 synchronized (RedisClusterInvalidator.this) { 146 receivedInvals.add(invals); 147 } 148 } catch (IllegalArgumentException e) { 149 log.error("Fail to read message: " + message, e); 150 } 151 } 152 }, getChannelName()); 153 return null; 154 }); 155 } 156 157 protected String getChannelName() { 158 return namespace + INVALIDATION_CHANNEL; 159 } 160 161 protected void registerNode() { 162 final String startedDateTime = getCurrentDateTime(); 163 final List<String> keys = Arrays.asList(getNodeKey()); 164 final List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime, 165 Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString()); 166 log.debug("Registering node: " + nodeId); 167 168 redisExecutor.execute(jedis -> { 169 jedis.evalsha(registerSha, keys, args); 170 log.info("Node registered: " + nodeId); 171 return null; 172 }); 173 } 174 175 protected String getNodeKey() { 176 return namespace + CLUSTER_NODES_KEY + ":" + nodeId; 177 } 178 179 @Override 180 public void close() { 181 log.debug("Closing"); 182 unsubscribeToInvalidationChannel(); 183 // The Jedis pool is already closed when the repository is shutdowned 184 receivedInvals.clear(); 185 } 186 187 protected void unsubscribeToInvalidationChannel() { 188 subscriberThread.interrupt(); 189 } 190 191 192 @Override 193 public Invalidations receiveInvalidations() { 194 Invalidations newInvals = new Invalidations(); 195 Invalidations ret; 196 synchronized (this) { 197 ret = receivedInvals; 198 receivedInvals = newInvals; 199 } 200 return ret; 201 } 202 203 @Override 204 public void sendInvalidations(Invalidations invals) { 205 final String startedDateTime = getCurrentDateTime(); 206 RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals); 207 if (log.isTraceEnabled()) { 208 log.trace("Sending invalidations: " + rInvals); 209 } 210 final List<String> keys = Arrays.asList(getChannelName(), getNodeKey()); 211 final List<String> args; 212 try { 213 args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, 214 LAST_INVAL_FIELD, getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString()); 215 } catch (IOException e) { 216 throw new NuxeoException(e); 217 } 218 219 redisExecutor.execute(jedis -> { 220 jedis.evalsha(sendSha, keys, args); 221 if (log.isTraceEnabled()) { 222 log.trace("invals sent"); 223 } 224 return null; 225 }); 226 } 227 228 protected String getCurrentDateTime() { 229 return LocalDateTime.now().toString(); 230 } 231}