001package org.nuxeo.ecm.core.redis.contribs;/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Benoit Delbosc
016 */
017
018import org.apache.commons.logging.Log;
019import org.apache.commons.logging.LogFactory;
020import org.nuxeo.ecm.core.api.NuxeoException;
021import org.nuxeo.ecm.core.redis.RedisAdmin;
022import org.nuxeo.ecm.core.redis.RedisExecutor;
023import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
024import org.nuxeo.ecm.core.storage.sql.Invalidations;
025import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
026import org.nuxeo.runtime.api.Framework;
027import redis.clients.jedis.JedisPubSub;
028import redis.clients.jedis.Pipeline;
029
030import java.io.IOException;
031import java.time.LocalDateTime;
032
033/**
034 * Redis implementation of {@link ClusterInvalidator}.
035 *
036 * Use a single channel pubsub to send invalidations.
037 * Use an HSET to register nodes, only for debug purpose so far.
038 *
039 * @since 7.4
040 */
041public class RedisClusterInvalidator implements ClusterInvalidator {
042
043    protected static final String PREFIX = "inval";
044
045    // PubSub channel: nuxeo:inval:<repositoryName>:channel
046    protected static final String INVALIDATION_CHANNEL = "channel";
047
048    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
049    protected static final String CLUSTER_NODES_KEY = "nodes";
050
051    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
052
053    protected static final String STARTED_KEY = "started";
054
055    protected static final String LAST_INVAL_KEY = "lastInvalSent";
056
057    protected String nodeId;
058
059    protected String repositoryName;
060
061    protected RedisExecutor redisExecutor;
062
063    protected Invalidations receivedInvals;
064
065    protected Thread subscriberThread;
066
067    protected String namespace;
068
069    protected String startedDateTime;
070
071    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);
072
073    @Override
074    public void initialize(String nodeId, RepositoryImpl repository) {
075        this.nodeId = nodeId;
076        this.repositoryName = repository.getName();
077        redisExecutor = Framework.getLocalService(RedisExecutor.class);
078        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
079        namespace = redisAdmin.namespace(PREFIX, repositoryName);
080        receivedInvals = new Invalidations();
081        createSubscriberThread();
082        registerNode();
083    }
084
085    protected void createSubscriberThread() {
086        String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
087        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
088        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
089        subscriberThread.setPriority(Thread.NORM_PRIORITY);
090        subscriberThread.start();
091    }
092
093    protected void subscribeToInvalidationChannel() {
094        log.info("Subscribe to channel: " + getChannelName());
095        redisExecutor.execute(jedis -> {
096            jedis.subscribe(new JedisPubSub() {
097                @Override
098                public void onMessage(String channel, String message) {
099                    try {
100                        RedisInvalidations rInvals = new RedisInvalidations(nodeId, message);
101
102                        if (log.isTraceEnabled()) {
103                            log.trace("Receive invalidations: " + rInvals);
104                        }
105                        receivedInvals.add(rInvals.getInvalidations());
106                    } catch (IllegalArgumentException e) {
107                        log.error("Fail to read message: " + message, e);
108                    }
109                }
110            }, getChannelName());
111            return null;
112        });
113    }
114
115    protected String getChannelName() {
116        return namespace + INVALIDATION_CHANNEL;
117    }
118
119    protected void registerNode() {
120        startedDateTime = getCurrentDateTime();
121        log.info("Registering node: " + nodeId);
122        redisExecutor.execute(jedis -> {
123            String key = getNodeKey();
124            Pipeline pipe = jedis.pipelined();
125            pipe.hset(key, "started", startedDateTime);
126            // Use an expiration so we can access info after a shutdown
127            pipe.expire(key, TIMEOUT_REGISTER_SECOND);
128            pipe.sync();
129            return null;
130        });
131    }
132
133    protected String getNodeKey() {
134        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
135    }
136
137    @Override
138    public void close() {
139        log.debug("Closing");
140        unsubscribeToInvalidationChannel();
141        // The Jedis pool is already closed when the repository is shutdowned
142        receivedInvals.clear();
143    }
144
145    protected void unsubscribeToInvalidationChannel() {
146        subscriberThread.interrupt();
147    }
148
149
150    @Override
151    public Invalidations receiveInvalidations() {
152        Invalidations ret = receivedInvals;
153        receivedInvals = new Invalidations();
154        return ret;
155    }
156
157    @Override
158    public void sendInvalidations(Invalidations invals) {
159        redisExecutor.execute(jedis -> {
160            RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals);
161            if (log.isTraceEnabled()) {
162                log.trace("Sending invalidations: " + rInvals);
163            }
164            String key = getNodeKey();
165
166            try {
167                Pipeline pipe = jedis.pipelined();
168                pipe.publish(getChannelName(), rInvals.serialize());
169                pipe.hset(key, STARTED_KEY, startedDateTime);
170                pipe.hset(key, LAST_INVAL_KEY, getCurrentDateTime());
171                pipe.expire(key, TIMEOUT_REGISTER_SECOND);
172                pipe.sync();
173                return null;
174            } catch (IOException e) {
175                throw new NuxeoException(e);
176            }
177        });
178    }
179
180    protected String getCurrentDateTime() {
181        return LocalDateTime.now().toString();
182    }
183}