001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.IOException; 022import java.time.LocalDateTime; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.ecm.core.model.Repository; 032import org.nuxeo.ecm.core.redis.RedisAdmin; 033import org.nuxeo.ecm.core.redis.RedisExecutor; 034import org.nuxeo.ecm.core.storage.dbs.DBSClusterInvalidator; 035import org.nuxeo.ecm.core.storage.dbs.DBSInvalidations; 036import org.nuxeo.runtime.api.Framework; 037 038import redis.clients.jedis.JedisPubSub; 039 040/** 041 * Redis implementation of {@link DBSClusterInvalidator}. Use a single channel pubsub to send invalidations. Use an HSET 042 * to register nodes, only for debug purpose so far. 043 * 044 * @since 8.10 045 * @deprecated since 9.1, use DBSPubSubInvalidator instead 046 */ 047@Deprecated 048public class RedisDBSClusterInvalidator implements DBSClusterInvalidator { 049 050 protected static final String PREFIX = "inval"; 051 052 // PubSub channel: nuxeo:inval:<repositoryName>:channel 053 protected static final String INVALIDATION_CHANNEL = "channel"; 054 055 // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId> 056 protected static final String CLUSTER_NODES_KEY = "nodes"; 057 058 // Keep info about a cluster node for one day 059 protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600; 060 061 // Max delay to wait for a channel subscription 062 protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10; 063 064 protected static final String STARTED_FIELD = "started"; 065 066 protected static final String LAST_INVAL_FIELD = "lastInvalSent"; 067 068 protected String nodeId; 069 070 protected String repositoryName; 071 072 protected RedisExecutor redisExecutor; 073 074 protected DBSInvalidations receivedInvals; 075 076 protected Thread subscriberThread; 077 078 protected String namespace; 079 080 protected String startedDateTime; 081 082 private static final Log log = LogFactory.getLog(RedisDBSClusterInvalidator.class); 083 084 private CountDownLatch subscribeLatch; 085 086 private String registerSha; 087 088 private String sendSha; 089 090 @Override 091 public void initialize(String nodeId, String repositoryName) { 092 this.nodeId = nodeId; 093 this.repositoryName = repositoryName; 094 redisExecutor = Framework.getService(RedisExecutor.class); 095 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 096 namespace = redisAdmin.namespace(PREFIX, repositoryName); 097 try { 098 registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval"); 099 sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval"); 100 } catch (IOException e) { 101 throw new RuntimeException(e); 102 } 103 receivedInvals = new DBSInvalidations(); 104 createSubscriberThread(); 105 registerNode(); 106 } 107 108 protected void createSubscriberThread() { 109 subscribeLatch = new CountDownLatch(1); 110 String name = "RedisDBSClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId; 111 subscriberThread = new Thread(this::subscribeToInvalidationChannel, name); 112 subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 113 subscriberThread.setPriority(Thread.NORM_PRIORITY); 114 subscriberThread.start(); 115 try { 116 if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) { 117 log.error("Redis channel subscription timeout after " + TIMEOUT_SUBSCRIBE_SECOND 118 + "s, continuing but this node may not receive cluster invalidations"); 119 } 120 } catch (InterruptedException e) { 121 Thread.currentThread().interrupt(); 122 throw new RuntimeException(e); 123 } 124 } 125 126 protected void subscribeToInvalidationChannel() { 127 log.info("Subscribing to channel: " + getChannelName()); 128 redisExecutor.subscribe(new JedisPubSub() { 129 @Override 130 public void onSubscribe(String channel, int subscribedChannels) { 131 super.onSubscribe(channel, subscribedChannels); 132 if (subscribeLatch != null) { 133 subscribeLatch.countDown(); 134 } 135 if (log.isDebugEnabled()) { 136 log.debug("Subscribed to channel: " + getChannelName()); 137 } 138 } 139 140 @Override 141 public void onMessage(String channel, String message) { 142 try { 143 RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, message); 144 if (log.isTraceEnabled()) { 145 log.trace("Receive invalidations: " + rInvals); 146 } 147 DBSInvalidations invals = rInvals.getInvalidations(); 148 synchronized (RedisDBSClusterInvalidator.this) { 149 receivedInvals.add(invals); 150 } 151 } catch (IllegalArgumentException e) { 152 log.error("Fail to read message: " + message, e); 153 } 154 } 155 }, getChannelName()); 156 } 157 158 protected String getChannelName() { 159 return namespace + INVALIDATION_CHANNEL; 160 } 161 162 protected void registerNode() { 163 startedDateTime = getCurrentDateTime(); 164 List<String> keys = Collections.singletonList(getNodeKey()); 165 List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime, 166 Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString()); 167 if (log.isDebugEnabled()) { 168 log.debug("Registering node: " + nodeId); 169 } 170 171 redisExecutor.evalsha(registerSha, keys, args); 172 if (log.isInfoEnabled()) { 173 log.info("Node registered: " + nodeId); 174 } 175 } 176 177 protected String getNodeKey() { 178 return namespace + CLUSTER_NODES_KEY + ":" + nodeId; 179 } 180 181 @Override 182 public void close() { 183 log.debug("Closing"); 184 unsubscribeToInvalidationChannel(); 185 // The Jedis pool is already closed when the repository is shutdowned 186 receivedInvals.clear(); 187 } 188 189 protected void unsubscribeToInvalidationChannel() { 190 subscriberThread.interrupt(); 191 } 192 193 @Override 194 public DBSInvalidations receiveInvalidations() { 195 DBSInvalidations newInvals = new DBSInvalidations(); 196 DBSInvalidations ret; 197 synchronized (this) { 198 ret = receivedInvals; 199 receivedInvals = newInvals; 200 } 201 return ret; 202 } 203 204 @Override 205 public void sendInvalidations(DBSInvalidations invals) { 206 RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, invals); 207 if (log.isTraceEnabled()) { 208 log.trace("Sending invalidations: " + rInvals); 209 } 210 List<String> keys = Arrays.asList(getChannelName(), getNodeKey()); 211 List<String> args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, LAST_INVAL_FIELD, 212 getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString()); 213 214 redisExecutor.evalsha(sendSha, keys, args); 215 log.trace("invals sent"); 216 } 217 218 protected String getCurrentDateTime() { 219 return LocalDateTime.now().toString(); 220 } 221}