001package org.nuxeo.ecm.core.redis.contribs;/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Benoit Delbosc
016 */
017
018import java.io.IOException;
019import java.time.LocalDateTime;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.api.NuxeoException;
024import org.nuxeo.ecm.core.redis.RedisAdmin;
025import org.nuxeo.ecm.core.redis.RedisExecutor;
026import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
027import org.nuxeo.ecm.core.storage.sql.Invalidations;
028import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
029import org.nuxeo.runtime.api.Framework;
030
031import redis.clients.jedis.JedisPubSub;
032import redis.clients.jedis.Pipeline;
033
034/**
035 * Redis implementation of {@link ClusterInvalidator}.
036 *
037 * Use a single channel pubsub to send invalidations.
038 * Use an HSET to register nodes, only for debug purpose so far.
039 *
040 * @since 7.4
041 */
042public class RedisClusterInvalidator implements ClusterInvalidator {
043
044    protected static final String PREFIX = "inval";
045
046    // PubSub channel: nuxeo:inval:<repositoryName>:channel
047    protected static final String INVALIDATION_CHANNEL = "channel";
048
049    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
050    protected static final String CLUSTER_NODES_KEY = "nodes";
051
052    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
053
054    protected static final String STARTED_KEY = "started";
055
056    protected static final String LAST_INVAL_KEY = "lastInvalSent";
057
058    protected String nodeId;
059
060    protected String repositoryName;
061
062    protected RedisExecutor redisExecutor;
063
064    protected Invalidations receivedInvals;
065
066    protected Thread subscriberThread;
067
068    protected String namespace;
069
070    protected String startedDateTime;
071
072    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);
073
074    @Override
075    public void initialize(String nodeId, RepositoryImpl repository) {
076        this.nodeId = nodeId;
077        this.repositoryName = repository.getName();
078        redisExecutor = Framework.getLocalService(RedisExecutor.class);
079        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
080        namespace = redisAdmin.namespace(PREFIX, repositoryName);
081        receivedInvals = new Invalidations();
082        createSubscriberThread();
083        registerNode();
084    }
085
086    protected void createSubscriberThread() {
087        String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
088        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
089        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
090        subscriberThread.setPriority(Thread.NORM_PRIORITY);
091        subscriberThread.start();
092    }
093
094    protected void subscribeToInvalidationChannel() {
095        log.info("Subscribe to channel: " + getChannelName());
096        redisExecutor.execute(jedis -> {
097            jedis.subscribe(new JedisPubSub() {
098                @Override
099                public void onMessage(String channel, String message) {
100                    try {
101                        RedisInvalidations rInvals = new RedisInvalidations(nodeId, message);
102
103                        if (log.isTraceEnabled()) {
104                            log.trace("Receive invalidations: " + rInvals);
105                        }
106                        Invalidations invals = rInvals.getInvalidations();
107                        synchronized (RedisClusterInvalidator.this) {
108                            receivedInvals.add(invals);
109                        }
110                    } catch (IllegalArgumentException e) {
111                        log.error("Fail to read message: " + message, e);
112                    }
113                }
114            }, getChannelName());
115            return null;
116        });
117    }
118
119    protected String getChannelName() {
120        return namespace + INVALIDATION_CHANNEL;
121    }
122
123    protected void registerNode() {
124        startedDateTime = getCurrentDateTime();
125        log.info("Registering node: " + nodeId);
126        redisExecutor.execute(jedis -> {
127            String key = getNodeKey();
128            Pipeline pipe = jedis.pipelined();
129            pipe.hset(key, "started", startedDateTime);
130            // Use an expiration so we can access info after a shutdown
131            pipe.expire(key, TIMEOUT_REGISTER_SECOND);
132            pipe.sync();
133            return null;
134        });
135    }
136
137    protected String getNodeKey() {
138        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
139    }
140
141    @Override
142    public void close() {
143        log.debug("Closing");
144        unsubscribeToInvalidationChannel();
145        // The Jedis pool is already closed when the repository is shutdowned
146        receivedInvals.clear();
147    }
148
149    protected void unsubscribeToInvalidationChannel() {
150        subscriberThread.interrupt();
151    }
152
153
154    @Override
155    public Invalidations receiveInvalidations() {
156        Invalidations newInvals = new Invalidations();
157        Invalidations ret;
158        synchronized (this) {
159            ret = receivedInvals;
160            receivedInvals = newInvals;
161        }
162        return ret;
163    }
164
165    @Override
166    public void sendInvalidations(Invalidations invals) {
167        redisExecutor.execute(jedis -> {
168            RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals);
169            if (log.isTraceEnabled()) {
170                log.trace("Sending invalidations: " + rInvals);
171            }
172            String key = getNodeKey();
173
174            try {
175                Pipeline pipe = jedis.pipelined();
176                pipe.publish(getChannelName(), rInvals.serialize());
177                pipe.hset(key, STARTED_KEY, startedDateTime);
178                pipe.hset(key, LAST_INVAL_KEY, getCurrentDateTime());
179                pipe.expire(key, TIMEOUT_REGISTER_SECOND);
180                pipe.sync();
181                return null;
182            } catch (IOException e) {
183                throw new NuxeoException(e);
184            }
185        });
186    }
187
188    protected String getCurrentDateTime() {
189        return LocalDateTime.now().toString();
190    }
191}