001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.IOException;
022import java.time.LocalDateTime;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.api.NuxeoException;
032import org.nuxeo.ecm.core.redis.RedisAdmin;
033import org.nuxeo.ecm.core.redis.RedisExecutor;
034import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
035import org.nuxeo.ecm.core.storage.sql.Invalidations;
036import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
037import org.nuxeo.runtime.api.Framework;
038import redis.clients.jedis.JedisPubSub;
039
040/**
041 * Redis implementation of {@link ClusterInvalidator}. Use a single channel pubsub to send invalidations. Use an HSET to
042 * register nodes, only for debug purpose so far.
043 *
044 * @since 7.4
045 * @deprecated since 9.1, use VCSPubSubInvalidator instead
046 */
047@Deprecated
048public class RedisClusterInvalidator implements ClusterInvalidator {
049
050    protected static final String PREFIX = "inval";
051
052    // PubSub channel: nuxeo:inval:<repositoryName>:channel
053    protected static final String INVALIDATION_CHANNEL = "channel";
054
055    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
056    protected static final String CLUSTER_NODES_KEY = "nodes";
057
058    // Keep info about a cluster node for one day
059    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
060
061    // Max delay to wait for a channel subscription
062    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;
063
064    protected static final String STARTED_FIELD = "started";
065
066    protected static final String LAST_INVAL_FIELD = "lastInvalSent";
067
068    protected String nodeId;
069
070    protected String repositoryName;
071
072    protected RedisExecutor redisExecutor;
073
074    protected Invalidations receivedInvals;
075
076    protected Thread subscriberThread;
077
078    protected String namespace;
079
080    protected String startedDateTime;
081
082    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);
083
084    private CountDownLatch subscribeLatch;
085
086    private String registerSha;
087
088    private String sendSha;
089
090    @Override
091    public void initialize(String nodeId, RepositoryImpl repository) {
092        this.nodeId = nodeId;
093        this.repositoryName = repository.getName();
094        redisExecutor = Framework.getLocalService(RedisExecutor.class);
095        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
096        namespace = redisAdmin.namespace(PREFIX, repositoryName);
097        try {
098            registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
099            sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
100        } catch (IOException e) {
101            throw new RuntimeException(e);
102        }
103        receivedInvals = new Invalidations();
104        createSubscriberThread();
105        registerNode();
106    }
107
108    protected void createSubscriberThread() {
109        subscribeLatch = new CountDownLatch(1);
110        String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
111        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
112        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
113        subscriberThread.setPriority(Thread.NORM_PRIORITY);
114        subscriberThread.start();
115        try {
116            if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
117                log.error("Redis channel subscription timeout after " + TIMEOUT_SUBSCRIBE_SECOND
118                        + "s, continuing but this node may not receive cluster invalidations");
119            }
120        } catch (InterruptedException e) {
121            Thread.currentThread().interrupt();
122            throw new RuntimeException(e);
123        }
124    }
125
126    protected void subscribeToInvalidationChannel() {
127        log.info("Subscribing to channel: " + getChannelName());
128        redisExecutor.subscribe(new JedisPubSub() {
129                @Override
130                public void onSubscribe(String channel, int subscribedChannels) {
131                    super.onSubscribe(channel, subscribedChannels);
132                    if (subscribeLatch != null) {
133                        subscribeLatch.countDown();
134                    }
135                    if (log.isDebugEnabled()) {
136                        log.debug("Subscribed to channel: " + getChannelName());
137                    }
138                }
139
140                @Override
141                public void onMessage(String channel, String message) {
142                    try {
143                        RedisInvalidations rInvals = new RedisInvalidations(nodeId, message);
144                        if (log.isTraceEnabled()) {
145                            log.trace("Receive invalidations: " + rInvals);
146                        }
147                        Invalidations invals = rInvals.getInvalidations();
148                        synchronized (RedisClusterInvalidator.this) {
149                            receivedInvals.add(invals);
150                        }
151                    } catch (IllegalArgumentException e) {
152                        // don't log error because of unit tests
153                        // this subscriber is not correctly unregistered when shutting down
154                        // and later unit tests send messages in unexpected format to it
155                        log.debug("Fail to read message: " + message, e);
156                    }
157                }
158            }, getChannelName());
159    }
160
161    protected String getChannelName() {
162        return namespace + INVALIDATION_CHANNEL;
163    }
164
165    protected void registerNode() {
166        startedDateTime = getCurrentDateTime();
167        List<String> keys = Collections.singletonList(getNodeKey());
168        List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime,
169                Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
170        if (log.isDebugEnabled()) {
171            log.debug("Registering node: " + nodeId);
172        }
173
174        redisExecutor.evalsha(registerSha, keys, args);
175        if (log.isInfoEnabled()) {
176            log.info("Node registered: " + nodeId);
177        }
178    }
179
180    protected String getNodeKey() {
181        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
182    }
183
184    @Override
185    public void close() {
186        log.debug("Closing");
187        unsubscribeToInvalidationChannel();
188        // The Jedis pool is already closed when the repository is shutdowned
189        receivedInvals.clear();
190    }
191
192    protected void unsubscribeToInvalidationChannel() {
193        subscriberThread.interrupt();
194    }
195
196    @Override
197    public Invalidations receiveInvalidations() {
198        Invalidations newInvals = new Invalidations();
199        Invalidations ret;
200        synchronized (this) {
201            ret = receivedInvals;
202            receivedInvals = newInvals;
203        }
204        return ret;
205    }
206
207    @Override
208    public void sendInvalidations(Invalidations invals) {
209        RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals);
210        if (log.isTraceEnabled()) {
211            log.trace("Sending invalidations: " + rInvals);
212        }
213        List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
214        List<String> args;
215        try {
216            args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, LAST_INVAL_FIELD,
217                    getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
218        } catch (IOException e) {
219            throw new NuxeoException(e);
220        }
221
222        redisExecutor.evalsha(sendSha, keys, args);
223        log.trace("invals sent");
224    }
225
226    protected String getCurrentDateTime() {
227        return LocalDateTime.now().toString();
228    }
229}