001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.IOException;
022import java.time.LocalDateTime;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.model.Repository;
032import org.nuxeo.ecm.core.redis.RedisAdmin;
033import org.nuxeo.ecm.core.redis.RedisExecutor;
034import org.nuxeo.ecm.core.storage.dbs.DBSClusterInvalidator;
035import org.nuxeo.ecm.core.storage.dbs.DBSInvalidations;
036import org.nuxeo.runtime.api.Framework;
037
038import redis.clients.jedis.JedisPubSub;
039
040/**
041 * Redis implementation of {@link DBSClusterInvalidator}. Use a single channel pubsub to send invalidations. Use an HSET
042 * to register nodes, only for debug purpose so far.
043 *
044 * @since 8.10
045 */
046public class RedisDBSClusterInvalidator implements DBSClusterInvalidator {
047
048    protected static final String PREFIX = "inval";
049
050    // PubSub channel: nuxeo:inval:<repositoryName>:channel
051    protected static final String INVALIDATION_CHANNEL = "channel";
052
053    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
054    protected static final String CLUSTER_NODES_KEY = "nodes";
055
056    // Keep info about a cluster node for one day
057    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
058
059    // Max delay to wait for a channel subscription
060    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;
061
062    protected static final String STARTED_FIELD = "started";
063
064    protected static final String LAST_INVAL_FIELD = "lastInvalSent";
065
066    protected String nodeId;
067
068    protected String repositoryName;
069
070    protected RedisExecutor redisExecutor;
071
072    protected DBSInvalidations receivedInvals;
073
074    protected Thread subscriberThread;
075
076    protected String namespace;
077
078    protected String startedDateTime;
079
080    private static final Log log = LogFactory.getLog(RedisDBSClusterInvalidator.class);
081
082    private CountDownLatch subscribeLatch;
083
084    private String registerSha;
085
086    private String sendSha;
087
088    @Override
089    public void initialize(String nodeId, Repository repository) {
090        this.nodeId = nodeId;
091        this.repositoryName = repository.getName();
092        redisExecutor = Framework.getService(RedisExecutor.class);
093        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
094        namespace = redisAdmin.namespace(PREFIX, repositoryName);
095        try {
096            registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
097            sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
098        } catch (IOException e) {
099            throw new RuntimeException(e);
100        }
101        receivedInvals = new DBSInvalidations();
102        createSubscriberThread();
103        registerNode();
104    }
105
106    protected void createSubscriberThread() {
107        subscribeLatch = new CountDownLatch(1);
108        String name = "RedisDBSClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
109        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
110        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
111        subscriberThread.setPriority(Thread.NORM_PRIORITY);
112        subscriberThread.start();
113        try {
114            if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
115                log.error("Redis channel subscripion timeout after " + TIMEOUT_SUBSCRIBE_SECOND
116                        + "s, continuing but this node may not receive cluster invalidations");
117            }
118        } catch (InterruptedException e) {
119            Thread.currentThread().interrupt();
120            throw new RuntimeException(e);
121        }
122    }
123
124    protected void subscribeToInvalidationChannel() {
125        log.info("Subscribing to channel: " + getChannelName());
126        redisExecutor.execute(jedis -> {
127            jedis.subscribe(new JedisPubSub() {
128                @Override
129                public void onSubscribe(String channel, int subscribedChannels) {
130                    super.onSubscribe(channel, subscribedChannels);
131                    if (subscribeLatch != null) {
132                        subscribeLatch.countDown();
133                    }
134                    if (log.isDebugEnabled()) {
135                        log.debug("Subscribed to channel: " + getChannelName());
136                    }
137                }
138
139                @Override
140                public void onMessage(String channel, String message) {
141                    try {
142                        RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, message);
143                        if (log.isTraceEnabled()) {
144                            log.trace("Receive invalidations: " + rInvals);
145                        }
146                        DBSInvalidations invals = rInvals.getInvalidations();
147                        synchronized (RedisDBSClusterInvalidator.this) {
148                            receivedInvals.add(invals);
149                        }
150                    } catch (IllegalArgumentException e) {
151                        log.error("Fail to read message: " + message, e);
152                    }
153                }
154            }, getChannelName());
155            return null;
156        });
157    }
158
159    protected String getChannelName() {
160        return namespace + INVALIDATION_CHANNEL;
161    }
162
163    protected void registerNode() {
164        startedDateTime = getCurrentDateTime();
165        List<String> keys = Collections.singletonList(getNodeKey());
166        List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime,
167                Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
168        if (log.isDebugEnabled()) {
169            log.debug("Registering node: " + nodeId);
170        }
171
172        redisExecutor.evalsha(registerSha, keys, args);
173        if (log.isInfoEnabled()) {
174            log.info("Node registered: " + nodeId);
175        }
176    }
177
178    protected String getNodeKey() {
179        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
180    }
181
182    @Override
183    public void close() {
184        log.debug("Closing");
185        unsubscribeToInvalidationChannel();
186        // The Jedis pool is already closed when the repository is shutdowned
187        receivedInvals.clear();
188    }
189
190    protected void unsubscribeToInvalidationChannel() {
191        subscriberThread.interrupt();
192    }
193
194    @Override
195    public DBSInvalidations receiveInvalidations() {
196        DBSInvalidations newInvals = new DBSInvalidations();
197        DBSInvalidations ret;
198        synchronized (this) {
199            ret = receivedInvals;
200            receivedInvals = newInvals;
201        }
202        return ret;
203    }
204
205    @Override
206    public void sendInvalidations(DBSInvalidations invals) {
207        RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, invals);
208        if (log.isTraceEnabled()) {
209            log.trace("Sending invalidations: " + rInvals);
210        }
211        List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
212        List<String> args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, LAST_INVAL_FIELD,
213                getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
214
215        redisExecutor.evalsha(sendSha, keys, args);
216        log.trace("invals sent");
217    }
218
219    protected String getCurrentDateTime() {
220        return LocalDateTime.now().toString();
221    }
222}