001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.IOException; 022import java.time.LocalDateTime; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.ecm.core.api.NuxeoException; 027import org.nuxeo.ecm.core.redis.RedisAdmin; 028import org.nuxeo.ecm.core.redis.RedisExecutor; 029import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 030import org.nuxeo.ecm.core.storage.sql.Invalidations; 031import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 032import org.nuxeo.runtime.api.Framework; 033 034import redis.clients.jedis.JedisPubSub; 035import redis.clients.jedis.Pipeline; 036 037/** 038 * Redis implementation of {@link ClusterInvalidator}. 039 * 040 * Use a single channel pubsub to send invalidations. 041 * Use an HSET to register nodes, only for debug purpose so far. 042 * 043 * @since 7.4 044 */ 045public class RedisClusterInvalidator implements ClusterInvalidator { 046 047 protected static final String PREFIX = "inval"; 048 049 // PubSub channel: nuxeo:inval:<repositoryName>:channel 050 protected static final String INVALIDATION_CHANNEL = "channel"; 051 052 // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId> 053 protected static final String CLUSTER_NODES_KEY = "nodes"; 054 055 protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600; 056 057 protected static final String STARTED_KEY = "started"; 058 059 protected static final String LAST_INVAL_KEY = "lastInvalSent"; 060 061 protected String nodeId; 062 063 protected String repositoryName; 064 065 protected RedisExecutor redisExecutor; 066 067 protected Invalidations receivedInvals; 068 069 protected Thread subscriberThread; 070 071 protected String namespace; 072 073 protected String startedDateTime; 074 075 private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class); 076 077 @Override 078 public void initialize(String nodeId, RepositoryImpl repository) { 079 this.nodeId = nodeId; 080 this.repositoryName = repository.getName(); 081 redisExecutor = Framework.getLocalService(RedisExecutor.class); 082 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 083 namespace = redisAdmin.namespace(PREFIX, repositoryName); 084 receivedInvals = new Invalidations(); 085 createSubscriberThread(); 086 registerNode(); 087 } 088 089 protected void createSubscriberThread() { 090 String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId; 091 subscriberThread = new Thread(this::subscribeToInvalidationChannel, name); 092 subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e)); 093 subscriberThread.setPriority(Thread.NORM_PRIORITY); 094 subscriberThread.start(); 095 } 096 097 protected void subscribeToInvalidationChannel() { 098 log.info("Subscribe to channel: " + getChannelName()); 099 redisExecutor.execute(jedis -> { 100 jedis.subscribe(new JedisPubSub() { 101 @Override 102 public void onMessage(String channel, String message) { 103 try { 104 RedisInvalidations rInvals = new RedisInvalidations(nodeId, message); 105 106 if (log.isTraceEnabled()) { 107 log.trace("Receive invalidations: " + rInvals); 108 } 109 Invalidations invals = rInvals.getInvalidations(); 110 synchronized (RedisClusterInvalidator.this) { 111 receivedInvals.add(invals); 112 } 113 } catch (IllegalArgumentException e) { 114 log.error("Fail to read message: " + message, e); 115 } 116 } 117 }, getChannelName()); 118 return null; 119 }); 120 } 121 122 protected String getChannelName() { 123 return namespace + INVALIDATION_CHANNEL; 124 } 125 126 protected void registerNode() { 127 startedDateTime = getCurrentDateTime(); 128 log.info("Registering node: " + nodeId); 129 redisExecutor.execute(jedis -> { 130 String key = getNodeKey(); 131 Pipeline pipe = jedis.pipelined(); 132 pipe.hset(key, "started", startedDateTime); 133 // Use an expiration so we can access info after a shutdown 134 pipe.expire(key, TIMEOUT_REGISTER_SECOND); 135 pipe.sync(); 136 return null; 137 }); 138 } 139 140 protected String getNodeKey() { 141 return namespace + CLUSTER_NODES_KEY + ":" + nodeId; 142 } 143 144 @Override 145 public void close() { 146 log.debug("Closing"); 147 unsubscribeToInvalidationChannel(); 148 // The Jedis pool is already closed when the repository is shutdowned 149 receivedInvals.clear(); 150 } 151 152 protected void unsubscribeToInvalidationChannel() { 153 subscriberThread.interrupt(); 154 } 155 156 157 @Override 158 public Invalidations receiveInvalidations() { 159 Invalidations newInvals = new Invalidations(); 160 Invalidations ret; 161 synchronized (this) { 162 ret = receivedInvals; 163 receivedInvals = newInvals; 164 } 165 return ret; 166 } 167 168 @Override 169 public void sendInvalidations(Invalidations invals) { 170 redisExecutor.execute(jedis -> { 171 RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals); 172 if (log.isTraceEnabled()) { 173 log.trace("Sending invalidations: " + rInvals); 174 } 175 String key = getNodeKey(); 176 177 try { 178 Pipeline pipe = jedis.pipelined(); 179 pipe.publish(getChannelName(), rInvals.serialize()); 180 pipe.hset(key, STARTED_KEY, startedDateTime); 181 pipe.hset(key, LAST_INVAL_KEY, getCurrentDateTime()); 182 pipe.expire(key, TIMEOUT_REGISTER_SECOND); 183 pipe.sync(); 184 return null; 185 } catch (IOException e) { 186 throw new NuxeoException(e); 187 } 188 }); 189 } 190 191 protected String getCurrentDateTime() { 192 return LocalDateTime.now().toString(); 193 } 194}