001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.IOException;
022import java.time.LocalDateTime;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.api.NuxeoException;
032import org.nuxeo.ecm.core.redis.RedisAdmin;
033import org.nuxeo.ecm.core.redis.RedisExecutor;
034import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
035import org.nuxeo.ecm.core.storage.sql.Invalidations;
036import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
037import org.nuxeo.runtime.api.Framework;
038
039import redis.clients.jedis.JedisPubSub;
040
041/**
042 * Redis implementation of {@link ClusterInvalidator}. Use a single channel pubsub to send invalidations. Use an HSET to
043 * register nodes, only for debug purpose so far.
044 *
045 * @since 7.4
046 */
047public class RedisClusterInvalidator implements ClusterInvalidator {
048
049    protected static final String PREFIX = "inval";
050
051    // PubSub channel: nuxeo:inval:<repositoryName>:channel
052    protected static final String INVALIDATION_CHANNEL = "channel";
053
054    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
055    protected static final String CLUSTER_NODES_KEY = "nodes";
056
057    // Keep info about a cluster node for one day
058    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
059
060    // Max delay to wait for a channel subscription
061    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;
062
063    protected static final String STARTED_FIELD = "started";
064
065    protected static final String LAST_INVAL_FIELD = "lastInvalSent";
066
067    protected String nodeId;
068
069    protected String repositoryName;
070
071    protected RedisExecutor redisExecutor;
072
073    protected Invalidations receivedInvals;
074
075    protected Thread subscriberThread;
076
077    protected String namespace;
078
079    protected String startedDateTime;
080
081    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);
082
083    private CountDownLatch subscribeLatch;
084
085    private String registerSha;
086
087    private String sendSha;
088
089    @Override
090    public void initialize(String nodeId, RepositoryImpl repository) {
091        this.nodeId = nodeId;
092        this.repositoryName = repository.getName();
093        redisExecutor = Framework.getLocalService(RedisExecutor.class);
094        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
095        namespace = redisAdmin.namespace(PREFIX, repositoryName);
096        try {
097            registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
098            sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
099        } catch (IOException e) {
100            throw new RuntimeException(e);
101        }
102        receivedInvals = new Invalidations();
103        createSubscriberThread();
104        registerNode();
105    }
106
107    protected void createSubscriberThread() {
108        subscribeLatch = new CountDownLatch(1);
109        String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
110        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
111        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
112        subscriberThread.setPriority(Thread.NORM_PRIORITY);
113        subscriberThread.start();
114        try {
115            if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
116                log.error("Redis channel subscripion timeout after " + TIMEOUT_SUBSCRIBE_SECOND
117                        + "s, continuing but this node may not receive cluster invalidations");
118            }
119        } catch (InterruptedException e) {
120            Thread.currentThread().interrupt();
121            throw new RuntimeException(e);
122        }
123    }
124
125    protected void subscribeToInvalidationChannel() {
126        log.info("Subscribing to channel: " + getChannelName());
127        redisExecutor.execute(jedis -> {
128            jedis.subscribe(new JedisPubSub() {
129                @Override
130                public void onSubscribe(String channel, int subscribedChannels) {
131                    super.onSubscribe(channel, subscribedChannels);
132                    if (subscribeLatch != null) {
133                        subscribeLatch.countDown();
134                    }
135                    if (log.isDebugEnabled()) {
136                        log.debug("Subscribed to channel: " + getChannelName());
137                    }
138                }
139
140                @Override
141                public void onMessage(String channel, String message) {
142                    try {
143                        RedisInvalidations rInvals = new RedisInvalidations(nodeId, message);
144                        if (log.isTraceEnabled()) {
145                            log.trace("Receive invalidations: " + rInvals);
146                        }
147                        Invalidations invals = rInvals.getInvalidations();
148                        synchronized (RedisClusterInvalidator.this) {
149                            receivedInvals.add(invals);
150                        }
151                    } catch (IllegalArgumentException e) {
152                        log.error("Fail to read message: " + message, e);
153                    }
154                }
155            }, getChannelName());
156            return null;
157        });
158    }
159
160    protected String getChannelName() {
161        return namespace + INVALIDATION_CHANNEL;
162    }
163
164    protected void registerNode() {
165        startedDateTime = getCurrentDateTime();
166        List<String> keys = Collections.singletonList(getNodeKey());
167        List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime,
168                Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
169        if (log.isDebugEnabled()) {
170            log.debug("Registering node: " + nodeId);
171        }
172
173        redisExecutor.evalsha(registerSha, keys, args);
174        if (log.isInfoEnabled()) {
175            log.info("Node registered: " + nodeId);
176        }
177    }
178
179    protected String getNodeKey() {
180        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
181    }
182
183    @Override
184    public void close() {
185        log.debug("Closing");
186        unsubscribeToInvalidationChannel();
187        // The Jedis pool is already closed when the repository is shutdowned
188        receivedInvals.clear();
189    }
190
191    protected void unsubscribeToInvalidationChannel() {
192        subscriberThread.interrupt();
193    }
194
195    @Override
196    public Invalidations receiveInvalidations() {
197        Invalidations newInvals = new Invalidations();
198        Invalidations ret;
199        synchronized (this) {
200            ret = receivedInvals;
201            receivedInvals = newInvals;
202        }
203        return ret;
204    }
205
206    @Override
207    public void sendInvalidations(Invalidations invals) {
208        RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals);
209        if (log.isTraceEnabled()) {
210            log.trace("Sending invalidations: " + rInvals);
211        }
212        List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
213        List<String> args;
214        try {
215            args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, LAST_INVAL_FIELD,
216                    getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
217        } catch (IOException e) {
218            throw new NuxeoException(e);
219        }
220
221        redisExecutor.evalsha(sendSha, keys, args);
222        log.trace("invals sent");
223    }
224
225    protected String getCurrentDateTime() {
226        return LocalDateTime.now().toString();
227    }
228}