001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.IOException;
022import java.time.LocalDateTime;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.api.NuxeoException;
027import org.nuxeo.ecm.core.redis.RedisAdmin;
028import org.nuxeo.ecm.core.redis.RedisExecutor;
029import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
030import org.nuxeo.ecm.core.storage.sql.Invalidations;
031import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
032import org.nuxeo.runtime.api.Framework;
033
034import redis.clients.jedis.JedisPubSub;
035import redis.clients.jedis.Pipeline;
036
037/**
038 * Redis implementation of {@link ClusterInvalidator}.
039 *
040 * Use a single channel pubsub to send invalidations.
041 * Use an HSET to register nodes, only for debug purpose so far.
042 *
043 * @since 7.4
044 */
045public class RedisClusterInvalidator implements ClusterInvalidator {
046
047    protected static final String PREFIX = "inval";
048
049    // PubSub channel: nuxeo:inval:<repositoryName>:channel
050    protected static final String INVALIDATION_CHANNEL = "channel";
051
052    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
053    protected static final String CLUSTER_NODES_KEY = "nodes";
054
055    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
056
057    protected static final String STARTED_KEY = "started";
058
059    protected static final String LAST_INVAL_KEY = "lastInvalSent";
060
061    protected String nodeId;
062
063    protected String repositoryName;
064
065    protected RedisExecutor redisExecutor;
066
067    protected Invalidations receivedInvals;
068
069    protected Thread subscriberThread;
070
071    protected String namespace;
072
073    protected String startedDateTime;
074
075    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);
076
077    @Override
078    public void initialize(String nodeId, RepositoryImpl repository) {
079        this.nodeId = nodeId;
080        this.repositoryName = repository.getName();
081        redisExecutor = Framework.getLocalService(RedisExecutor.class);
082        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
083        namespace = redisAdmin.namespace(PREFIX, repositoryName);
084        receivedInvals = new Invalidations();
085        createSubscriberThread();
086        registerNode();
087    }
088
089    protected void createSubscriberThread() {
090        String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
091        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
092        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
093        subscriberThread.setPriority(Thread.NORM_PRIORITY);
094        subscriberThread.start();
095    }
096
097    protected void subscribeToInvalidationChannel() {
098        log.info("Subscribe to channel: " + getChannelName());
099        redisExecutor.execute(jedis -> {
100            jedis.subscribe(new JedisPubSub() {
101                @Override
102                public void onMessage(String channel, String message) {
103                    try {
104                        RedisInvalidations rInvals = new RedisInvalidations(nodeId, message);
105
106                        if (log.isTraceEnabled()) {
107                            log.trace("Receive invalidations: " + rInvals);
108                        }
109                        Invalidations invals = rInvals.getInvalidations();
110                        synchronized (RedisClusterInvalidator.this) {
111                            receivedInvals.add(invals);
112                        }
113                    } catch (IllegalArgumentException e) {
114                        log.error("Fail to read message: " + message, e);
115                    }
116                }
117            }, getChannelName());
118            return null;
119        });
120    }
121
122    protected String getChannelName() {
123        return namespace + INVALIDATION_CHANNEL;
124    }
125
126    protected void registerNode() {
127        startedDateTime = getCurrentDateTime();
128        log.info("Registering node: " + nodeId);
129        redisExecutor.execute(jedis -> {
130            String key = getNodeKey();
131            Pipeline pipe = jedis.pipelined();
132            pipe.hset(key, "started", startedDateTime);
133            // Use an expiration so we can access info after a shutdown
134            pipe.expire(key, TIMEOUT_REGISTER_SECOND);
135            pipe.sync();
136            return null;
137        });
138    }
139
140    protected String getNodeKey() {
141        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
142    }
143
144    @Override
145    public void close() {
146        log.debug("Closing");
147        unsubscribeToInvalidationChannel();
148        // The Jedis pool is already closed when the repository is shutdowned
149        receivedInvals.clear();
150    }
151
152    protected void unsubscribeToInvalidationChannel() {
153        subscriberThread.interrupt();
154    }
155
156
157    @Override
158    public Invalidations receiveInvalidations() {
159        Invalidations newInvals = new Invalidations();
160        Invalidations ret;
161        synchronized (this) {
162            ret = receivedInvals;
163            receivedInvals = newInvals;
164        }
165        return ret;
166    }
167
168    @Override
169    public void sendInvalidations(Invalidations invals) {
170        redisExecutor.execute(jedis -> {
171            RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals);
172            if (log.isTraceEnabled()) {
173                log.trace("Sending invalidations: " + rInvals);
174            }
175            String key = getNodeKey();
176
177            try {
178                Pipeline pipe = jedis.pipelined();
179                pipe.publish(getChannelName(), rInvals.serialize());
180                pipe.hset(key, STARTED_KEY, startedDateTime);
181                pipe.hset(key, LAST_INVAL_KEY, getCurrentDateTime());
182                pipe.expire(key, TIMEOUT_REGISTER_SECOND);
183                pipe.sync();
184                return null;
185            } catch (IOException e) {
186                throw new NuxeoException(e);
187            }
188        });
189    }
190
191    protected String getCurrentDateTime() {
192        return LocalDateTime.now().toString();
193    }
194}