/*
 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Benoit Delbosc
 */
package org.nuxeo.ecm.core.redis.contribs;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.storage.dbs.DBSClusterInvalidator;
import org.nuxeo.ecm.core.storage.dbs.DBSInvalidations;
import org.nuxeo.runtime.api.Framework;

import redis.clients.jedis.JedisPubSub;

/**
 * Redis implementation of {@link DBSClusterInvalidator}. Uses a single pubsub channel to send invalidations, and an
 * HSET to register nodes (only for debugging purposes so far).
 * <p>
 * Invalidations received on the channel are accumulated in {@link #receivedInvals} by a dedicated subscriber thread;
 * every access to that accumulator is guarded by this instance's monitor.
 *
 * @since 8.10
 */
public class RedisDBSClusterInvalidator implements DBSClusterInvalidator {

    protected static final String PREFIX = "inval";

    // PubSub channel: nuxeo:inval:<repositoryName>:channel
    protected static final String INVALIDATION_CHANNEL = "channel";

    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
    protected static final String CLUSTER_NODES_KEY = "nodes";

    // Keep info about a cluster node for one day
    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;

    // Max delay to wait for a channel subscription
    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;

    protected static final String STARTED_FIELD = "started";

    protected static final String LAST_INVAL_FIELD = "lastInvalSent";

    protected String nodeId;

    protected String repositoryName;

    protected RedisExecutor redisExecutor;

    /** Invalidations received since the last {@link #receiveInvalidations()} call; guarded by {@code this}. */
    protected DBSInvalidations receivedInvals;

    protected Thread subscriberThread;

    protected String namespace;

    /** Date/time at which this node registered itself, as produced by {@link #getCurrentDateTime()}. */
    protected String startedDateTime;

    private static final Log log = LogFactory.getLog(RedisDBSClusterInvalidator.class);

    /** Released by the subscriber thread once the channel subscription is acknowledged. */
    private CountDownLatch subscribeLatch;

    /** SHA of the loaded {@code register-node-inval} script. */
    private String registerSha;

    /** SHA of the loaded {@code send-inval} script. */
    private String sendSha;

    /**
     * Looks up the Redis services, loads the two scripts used by this class, starts the subscriber thread and
     * registers this node.
     *
     * @param nodeId the identifier of this cluster node
     * @param repository the repository whose name scopes the Redis keys and channel
     * @throws RuntimeException if a script cannot be loaded, or if the calling thread is interrupted while waiting
     *             for the subscription
     */
    @Override
    public void initialize(String nodeId, Repository repository) {
        this.nodeId = nodeId;
        this.repositoryName = repository.getName();
        redisExecutor = Framework.getService(RedisExecutor.class);
        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
        namespace = redisAdmin.namespace(PREFIX, repositoryName);
        try {
            registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
            sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // The accumulator must exist before the subscriber thread can deliver a message
        receivedInvals = new DBSInvalidations();
        createSubscriberThread();
        registerNode();
    }

    /**
     * Starts the thread that subscribes to the invalidation channel, then waits up to
     * {@link #TIMEOUT_SUBSCRIBE_SECOND} seconds for the subscription to be acknowledged. A timeout is logged as an
     * error but does not abort the initialization.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting (its interrupt flag is restored)
     */
    protected void createSubscriberThread() {
        subscribeLatch = new CountDownLatch(1);
        String name = "RedisDBSClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
        subscriberThread.setPriority(Thread.NORM_PRIORITY);
        subscriberThread.start();
        try {
            if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
                log.error("Redis channel subscription timeout after " + TIMEOUT_SUBSCRIBE_SECOND
                        + "s, continuing but this node may not receive cluster invalidations");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Body of the subscriber thread: subscribes to the invalidation channel and merges every received message into
     * {@link #receivedInvals}.
     */
    protected void subscribeToInvalidationChannel() {
        log.info("Subscribing to channel: " + getChannelName());
        redisExecutor.execute(jedis -> {
            jedis.subscribe(new JedisPubSub() {
                @Override
                public void onSubscribe(String channel, int subscribedChannels) {
                    super.onSubscribe(channel, subscribedChannels);
                    // Unblock createSubscriberThread(), which waits for the subscription
                    if (subscribeLatch != null) {
                        subscribeLatch.countDown();
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Subscribed to channel: " + getChannelName());
                    }
                }

                @Override
                public void onMessage(String channel, String message) {
                    try {
                        // NOTE(review): presumably RedisDBSInvalidations uses nodeId to recognize this node's own
                        // messages - confirm against that class
                        RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, message);
                        if (log.isTraceEnabled()) {
                            log.trace("Receive invalidations: " + rInvals);
                        }
                        DBSInvalidations invals = rInvals.getInvalidations();
                        synchronized (RedisDBSClusterInvalidator.this) {
                            receivedInvals.add(invals);
                        }
                    } catch (IllegalArgumentException e) {
                        // A message that cannot be read is logged and dropped; the subscription stays active
                        log.error("Fail to read message: " + message, e);
                    }
                }
            }, getChannelName());
            return null;
        });
    }

    /**
     * @return the name of the pubsub channel: the namespace followed by {@link #INVALIDATION_CHANNEL}
     */
    protected String getChannelName() {
        return namespace + INVALIDATION_CHANNEL;
    }

    /**
     * Records the start date/time of this node and runs the {@code register-node-inval} script with the node key,
     * the started field and value, and the registration timeout.
     */
    protected void registerNode() {
        startedDateTime = getCurrentDateTime();
        List<String> keys = Collections.singletonList(getNodeKey());
        List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime, registerTimeout());
        if (log.isDebugEnabled()) {
            log.debug("Registering node: " + nodeId);
        }

        redisExecutor.evalsha(registerSha, keys, args);
        if (log.isInfoEnabled()) {
            log.info("Node registered: " + nodeId);
        }
    }

    /**
     * @return the Redis key holding this node's info: the namespace, {@link #CLUSTER_NODES_KEY}, a colon and the
     *         node id
     */
    protected String getNodeKey() {
        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
    }

    /**
     * Interrupts the subscriber thread and discards the invalidations that were received but not yet consumed. Safe
     * to call even if {@link #initialize(String, Repository)} did not complete.
     */
    @Override
    public void close() {
        log.debug("Closing");
        unsubscribeToInvalidationChannel();
        // The Jedis pool is already closed when the repository is shutdowned
        // Same monitor as onMessage() and receiveInvalidations(): the subscriber thread may still be delivering
        synchronized (this) {
            if (receivedInvals != null) {
                receivedInvals.clear();
            }
        }
    }

    /**
     * Interrupts the subscriber thread, if it was started.
     */
    protected void unsubscribeToInvalidationChannel() {
        // NOTE(review): an interrupt may not wake a thread blocked in a plain socket read - confirm that the
        // subscriber actually terminates on shutdown
        if (subscriberThread != null) {
            subscriberThread.interrupt();
        }
    }

    /**
     * Hands over the invalidations accumulated so far and starts a fresh, empty accumulator.
     *
     * @return the invalidations received since the previous call
     */
    @Override
    public DBSInvalidations receiveInvalidations() {
        DBSInvalidations newInvals = new DBSInvalidations();
        DBSInvalidations ret;
        synchronized (this) {
            ret = receivedInvals;
            receivedInvals = newInvals;
        }
        return ret;
    }

    /**
     * Runs the {@code send-inval} script with the channel name and node key as keys, and the serialized
     * invalidations, the started field and value, the last-invalidation field and current date/time, and the
     * registration timeout as arguments.
     *
     * @param invals the invalidations to send
     */
    @Override
    public void sendInvalidations(DBSInvalidations invals) {
        RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, invals);
        if (log.isTraceEnabled()) {
            log.trace("Sending invalidations: " + rInvals);
        }
        List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
        List<String> args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, LAST_INVAL_FIELD,
                getCurrentDateTime(), registerTimeout());

        redisExecutor.evalsha(sendSha, keys, args);
        log.trace("invals sent");
    }

    /**
     * @return the current local date/time, formatted by {@link LocalDateTime#toString()}
     */
    protected String getCurrentDateTime() {
        return LocalDateTime.now().toString();
    }

    /** Returns {@link #TIMEOUT_REGISTER_SECOND} as the string argument passed to the Redis scripts. */
    private static String registerTimeout() {
        return Integer.toString(TIMEOUT_REGISTER_SECOND);
    }
}