001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.api.NuxeoException;
024import org.nuxeo.ecm.core.redis.RedisAdmin;
025import org.nuxeo.ecm.core.redis.RedisExecutor;
026import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
027import org.nuxeo.ecm.core.storage.sql.Invalidations;
028import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
029import org.nuxeo.runtime.api.Framework;
030import redis.clients.jedis.JedisPubSub;
031
032import java.io.IOException;
033import java.time.LocalDateTime;
034import java.util.Arrays;
035import java.util.List;
036import java.util.concurrent.CountDownLatch;
037import java.util.concurrent.TimeUnit;
038
039/**
040 * Redis implementation of {@link ClusterInvalidator}.
041 *
042 * Use a single channel pubsub to send invalidations.
043 * Use an HSET to register nodes, only for debug purpose so far.
044 *
045 * @since 7.4
046 */
047public class RedisClusterInvalidator implements ClusterInvalidator {
048
049    protected static final String PREFIX = "inval";
050
051    // PubSub channel: nuxeo:inval:<repositoryName>:channel
052    protected static final String INVALIDATION_CHANNEL = "channel";
053
054    // Node HSET key: nuxeo:inval:<repositoryName>:nodes:<nodeId>
055    protected static final String CLUSTER_NODES_KEY = "nodes";
056
057    // Keep info about a cluster node for one day
058    protected static final int TIMEOUT_REGISTER_SECOND = 24 * 3600;
059
060    // Max delay to wait for a channel subscription
061    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;
062
063    protected static final String STARTED_FIELD = "started";
064
065    protected static final String LAST_INVAL_FIELD = "lastInvalSent";
066
067    protected String nodeId;
068
069    protected String repositoryName;
070
071    protected RedisExecutor redisExecutor;
072
073    protected Invalidations receivedInvals;
074
075    protected Thread subscriberThread;
076
077    protected String namespace;
078
079    protected String startedDateTime;
080
081    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);
082
083    private CountDownLatch subscribeLatch;
084
085    private String registerSha;
086    private String sendSha;
087
088    @Override
089    public void initialize(String nodeId, RepositoryImpl repository) {
090        this.nodeId = nodeId;
091        this.repositoryName = repository.getName();
092        redisExecutor = Framework.getLocalService(RedisExecutor.class);
093        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
094        namespace = redisAdmin.namespace(PREFIX, repositoryName);
095        try {
096            registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
097            sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
098        } catch (IOException e) {
099            throw new RuntimeException(e);
100        }
101        receivedInvals = new Invalidations();
102        createSubscriberThread();
103        registerNode();
104    }
105
106    protected void createSubscriberThread() {
107        subscribeLatch = new CountDownLatch(1);
108        String name = "RedisClusterInvalidatorSubscriber:" + repositoryName + ":" + nodeId;
109        subscriberThread = new Thread(this::subscribeToInvalidationChannel, name);
110        subscriberThread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught error on thread " + t.getName(), e));
111        subscriberThread.setPriority(Thread.NORM_PRIORITY);
112        subscriberThread.start();
113        try {
114            if (!subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
115                log.error("Redis channel subscripion timeout after " + TIMEOUT_SUBSCRIBE_SECOND +
116                        "s, continuing but this node may not receive cluster invalidations");
117            }
118        } catch (InterruptedException e) {
119            Thread.currentThread().interrupt();
120            throw new RuntimeException(e);
121        }
122    }
123
124    protected void subscribeToInvalidationChannel() {
125        log.info("Subscribing to channel: " + getChannelName());
126        redisExecutor.execute(jedis -> {
127            jedis.subscribe(new JedisPubSub() {
128                @Override
129                public void onSubscribe(String channel, int subscribedChannels) {
130                    super.onSubscribe(channel, subscribedChannels);
131                    if (subscribeLatch != null) {
132                        subscribeLatch.countDown();
133                    }
134                    log.debug("Subscribed to channel: " + getChannelName());
135                }
136
137                @Override
138                public void onMessage(String channel, String message) {
139                    try {
140                        RedisInvalidations rInvals = new RedisInvalidations(nodeId, message);
141                        if (log.isTraceEnabled()) {
142                            log.trace("Receive invalidations: " + rInvals);
143                        }
144                        Invalidations invals = rInvals.getInvalidations();
145                        synchronized (RedisClusterInvalidator.this) {
146                            receivedInvals.add(invals);
147                        }
148                    } catch (IllegalArgumentException e) {
149                        log.error("Fail to read message: " + message, e);
150                    }
151                }
152            }, getChannelName());
153            return null;
154        });
155    }
156
157    protected String getChannelName() {
158        return namespace + INVALIDATION_CHANNEL;
159    }
160
161    protected void registerNode() {
162        final String startedDateTime = getCurrentDateTime();
163        final List<String> keys = Arrays.asList(getNodeKey());
164        final List<String> args = Arrays.asList(STARTED_FIELD, startedDateTime,
165                Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
166        log.debug("Registering node: " + nodeId);
167
168        redisExecutor.execute(jedis -> {
169            jedis.evalsha(registerSha, keys, args);
170            log.info("Node registered: " + nodeId);
171            return null;
172        });
173    }
174
175    protected String getNodeKey() {
176        return namespace + CLUSTER_NODES_KEY + ":" + nodeId;
177    }
178
179    @Override
180    public void close() {
181        log.debug("Closing");
182        unsubscribeToInvalidationChannel();
183        // The Jedis pool is already closed when the repository is shutdowned
184        receivedInvals.clear();
185    }
186
187    protected void unsubscribeToInvalidationChannel() {
188        subscriberThread.interrupt();
189    }
190
191
192    @Override
193    public Invalidations receiveInvalidations() {
194        Invalidations newInvals = new Invalidations();
195        Invalidations ret;
196        synchronized (this) {
197            ret = receivedInvals;
198            receivedInvals = newInvals;
199        }
200        return ret;
201    }
202
203    @Override
204    public void sendInvalidations(Invalidations invals) {
205        final String startedDateTime = getCurrentDateTime();
206        RedisInvalidations rInvals = new RedisInvalidations(nodeId, invals);
207        if (log.isTraceEnabled()) {
208            log.trace("Sending invalidations: " + rInvals);
209        }
210        final List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
211        final List<String> args;
212        try {
213            args = Arrays.asList(rInvals.serialize(), STARTED_FIELD, startedDateTime,
214                    LAST_INVAL_FIELD, getCurrentDateTime(), Integer.valueOf(TIMEOUT_REGISTER_SECOND).toString());
215        } catch (IOException e) {
216            throw new NuxeoException(e);
217        }
218
219        redisExecutor.execute(jedis -> {
220            jedis.evalsha(sendSha, keys, args);
221            if (log.isTraceEnabled()) {
222                log.trace("invals sent");
223            }
224            return null;
225        });
226    }
227
228    protected String getCurrentDateTime() {
229        return LocalDateTime.now().toString();
230    }
231}