001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.IOException;
022import java.time.LocalDateTime;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.model.Repository;
032import org.nuxeo.ecm.core.redis.RedisAdmin;
033import org.nuxeo.ecm.core.redis.RedisExecutor;
034import org.nuxeo.ecm.core.storage.dbs.DBSClusterInvalidator;
035import org.nuxeo.ecm.core.storage.dbs.DBSInvalidations;
036import org.nuxeo.runtime.api.Framework;
037
038import redis.clients.jedis.JedisPubSub;
039
040/**
041 * Redis implementation of {@link DBSClusterInvalidator}. Use a single channel pubsub to send invalidations. Use an HSET
042 * to register nodes, only for debug purpose so far.
043 *
044 * @since 8.10
045 * @deprecated since 9.1, use DBSPubSubInvalidator instead
046 */
047@Deprecated
048public class RedisDBSClusterInvalidator implements DBSClusterInvalidator {
049
050    protected static final String PREFIX = "inval";
051
052    // PubSub channel: nuxeo:inval:<repositoryName>:channel
053    protected static final String INVALIDATION_CHANNEL = "channel";
054
055    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
056    protected static final String CLUSTER_NODES_KEY = "nodes";
057
058    // Keep info about a cluster node for one day
059    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
060
061    // Max delay to wait for a channel subscription
062    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;
063
064    protected static final String STARTED_FIELD = "started";
065
066    protected static final String LAST_INVAL_FIELD = "lastInvalSent";
067
068    protected String nodeId;
069
070    protected String repositoryName;
071
072    protected RedisExecutor redisExecutor;
073
074    protected DBSInvalidations receivedInvals;
075
076    protected Thread subscriberThread;
077
078    protected String namespace;
079
080    protected String startedDateTime;
081
082    private static final Log log = LogFactory.getLog(RedisDBSClusterInvalidator.class);
083
084    private CountDownLatch subscribeLatch;
085
086    private String registerSha;
087
088    private String sendSha;
089
090    @Override
091    public void initialize(String nodeId, String repositoryName) {
092        this.nodeId = nodeId;
093        this.repositoryName = repositoryName;
094        redisExecutor = Framework.getService(RedisExecutor.class);
095        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
096        namespace = redisAdmin.namespace(PREFIX, repositoryName);
097        try {
098            registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
099            sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
100        } catch (IOException e) {
101            throw new RuntimeException(e);
102        }
103        receivedInvals = new DBSInvalidations();
104        createSubscriberThread();
105        registerNode();
106    }
107
108    protected void createSubscriberThread() {
109        subscribeLatch = new CountDownLatch(1);
110        String name = "RedisDBSClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
111        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
112        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
113        subscriberThread.setPriority(Thread.NORM_PRIORITY);
114        subscriberThread.start();
115        try {
116            if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
117                log.error("Redis channel subscription timeout after " + TIMEOUT_SUBSCRIBE_SECOND
118                        + "s, continuing but this node may not receive cluster invalidations");
119            }
120        } catch (InterruptedException e) {
121            Thread.currentThread().interrupt();
122            throw new RuntimeException(e);
123        }
124    }
125
126    protected void subscribeToInvalidationChannel() {
127        log.info("Subscribing to channel: " + getChannelName());
128        redisExecutor.subscribe(new JedisPubSub() {
129                @Override
130                public void onSubscribe(String channel, int subscribedChannels) {
131                    super.onSubscribe(channel, subscribedChannels);
132                    if (subscribeLatch != null) {
133                        subscribeLatch.countDown();
134                    }
135                    if (log.isDebugEnabled()) {
136                        log.debug("Subscribed to channel: " + getChannelName());
137                    }
138                }
139
140                @Override
141                public void onMessage(String channel, String message) {
142                    try {
143                        RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, message);
144                        if (log.isTraceEnabled()) {
145                            log.trace("Receive invalidations: " + rInvals);
146                        }
147                        DBSInvalidations invals = rInvals.getInvalidations();
148                        synchronized (RedisDBSClusterInvalidator.this) {
149                            receivedInvals.add(invals);
150                        }
151                    } catch (IllegalArgumentException e) {
152                        log.error("Fail to read message: " + message, e);
153                    }
154                }
155            }, getChannelName());
156    }
157
158    protected String getChannelName() {
159        return namespace + INVALIDATION_CHANNEL;
160    }
161
162    protected void registerNode() {
163        startedDateTime = getCurrentDateTime();
164        List<String> keys = Collections.singletonList(getNodeKey());
165        List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime,
166                Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
167        if (log.isDebugEnabled()) {
168            log.debug("Registering node: " + nodeId);
169        }
170
171        redisExecutor.evalsha(registerSha, keys, args);
172        if (log.isInfoEnabled()) {
173            log.info("Node registered: " + nodeId);
174        }
175    }
176
177    protected String getNodeKey() {
178        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
179    }
180
181    @Override
182    public void close() {
183        log.debug("Closing");
184        unsubscribeToInvalidationChannel();
185        // The Jedis pool is already closed when the repository is shutdowned
186        receivedInvals.clear();
187    }
188
189    protected void unsubscribeToInvalidationChannel() {
190        subscriberThread.interrupt();
191    }
192
193    @Override
194    public DBSInvalidations receiveInvalidations() {
195        DBSInvalidations newInvals = new DBSInvalidations();
196        DBSInvalidations ret;
197        synchronized (this) {
198            ret = receivedInvals;
199            receivedInvals = newInvals;
200        }
201        return ret;
202    }
203
204    @Override
205    public void sendInvalidations(DBSInvalidations invals) {
206        RedisDBSInvalidations rInvals = new RedisDBSInvalidations(nodeId, invals);
207        if (log.isTraceEnabled()) {
208            log.trace("Sending invalidations: " + rInvals);
209        }
210        List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
211        List<String> args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime, LAST_INVAL_FIELD,
212                getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
213
214        redisExecutor.evalsha(sendSha, keys, args);
215        log.trace("invals sent");
216    }
217
218    protected String getCurrentDateTime() {
219        return LocalDateTime.now().toString();
220    }
221}