001/*
002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.cluster;
020
021import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
022import static org.apache.commons.lang3.StringUtils.isNotBlank;
023import static org.nuxeo.runtime.model.Descriptor.UNIQUE_DESCRIPTOR_ID;
024
025import java.time.Duration;
026import java.time.Instant;
027import java.util.Map;
028import java.util.Random;
029
030import org.apache.logging.log4j.LogManager;
031import org.apache.logging.log4j.Logger;
032import org.nuxeo.runtime.RuntimeServiceException;
033import org.nuxeo.runtime.api.Framework;
034import org.nuxeo.runtime.capabilities.CapabilitiesService;
035import org.nuxeo.runtime.kv.KeyValueService;
036import org.nuxeo.runtime.kv.KeyValueStore;
037import org.nuxeo.runtime.model.ComponentContext;
038import org.nuxeo.runtime.model.DefaultComponent;
039import org.nuxeo.runtime.transaction.TransactionHelper;
040
041/**
042 * Implementation for the Cluster Service.
043 *
044 * @since 11.1
045 */
046public class ClusterServiceImpl extends DefaultComponent implements ClusterService {
047
048    private static final Logger log = LogManager.getLogger(ClusterServiceImpl.class);
049
050    /** Very early as other services depend on us. */
051    public static final int APPLICATION_STARTED_ORDER = -1000;
052
053    public static final String XP_CONFIG = "configuration";
054
055    public static final String CLUSTERING_ENABLED_OLD_PROP = "repository.clustering.enabled";
056
057    public static final String NODE_ID_OLD_PROP = "repository.clustering.id";
058
059    /** @since 11.5 */
060    public static final String CAPABILITY_CLUSTER = "cluster";
061
062    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)
063
064    protected boolean enabled;
065
066    protected String nodeId;
067
068    @Override
069    public int getApplicationStartedOrder() {
070        return APPLICATION_STARTED_ORDER;
071    }
072
073    @Override
074    public void start(ComponentContext context) {
075        ClusterNodeDescriptor descr = getDescriptor(XP_CONFIG, UNIQUE_DESCRIPTOR_ID);
076
077        // enabled
078        Boolean enabledProp = descr == null ? null : descr.getEnabled();
079        if (enabledProp != null) {
080            enabled = enabledProp.booleanValue();
081        } else {
082            // compat with old framework property
083            enabled = Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_OLD_PROP);
084        }
085
086        // node id
087        String id = descr == null ? null : defaultIfBlank(descr.getName(), null);
088        if (id != null) {
089            nodeId = id.trim();
090        } else {
091            // compat with old framework property
092            id = Framework.getProperty(NODE_ID_OLD_PROP);
093            if (isNotBlank(id)) {
094                nodeId = id.trim();
095            } else {
096                // use a random node id
097                long l;
098                do {
099                    l = RANDOM.nextLong();
100                } while (l < 0); // keep a positive value to avoid weird node ids
101                nodeId = String.valueOf(l);
102                if (enabled) {
103                    log.warn("Missing cluster node id configuration, please define it explicitly. "
104                            + "Using random cluster node id instead: {}", nodeId);
105                } else {
106                    log.info("Using random cluster node id: {}", nodeId);
107                }
108            }
109        }
110
111        // capabilities
112        Framework.getService(CapabilitiesService.class)
113                 .registerCapabilities(CAPABILITY_CLUSTER, Map.of("enabled", enabled, "nodeId", nodeId));
114
115        super.start(context);
116    }
117
118    @Override
119    public boolean isEnabled() {
120        return enabled;
121    }
122
123    @Override
124    public String getNodeId() {
125        return nodeId;
126    }
127
128    /** Allows tests to set the node id without a reload. */
129    public void setNodeId(String nodeId) {
130        if (!Framework.isTestModeSet()) {
131            throw new UnsupportedOperationException("test mode only");
132        }
133        this.nodeId = nodeId;
134    }
135
136    @Override
137    public void runAtomically(String key, Duration duration, Duration pollDelay, Runnable runnable) {
138        if (!isEnabled()) {
139            runnable.run();
140            return;
141        }
142        new ClusterLockHelper(getNodeId(), duration, pollDelay).runAtomically(key, runnable);
143    }
144
145    public static class ClusterLockHelper {
146
147        private static final Logger log = LogManager.getLogger(ClusterLockHelper.class);
148
149        public static final String KV_STORE_NAME = "cluster";
150
151        // TTL set on the lock, to make it expire if the process crashes or gets stuck
152        // this is a multiplier of the duration during which we attempt to acquire the lock
153        private static final int TTL_MULTIPLIER = 10;
154
155        protected final String nodeId;
156
157        protected final Duration duration;
158
159        protected final Duration pollDelay;
160
161        protected final KeyValueStore kvStore;
162
163        public ClusterLockHelper(String nodeId, Duration duration, Duration pollDelay) {
164            this.nodeId = nodeId;
165            this.duration = duration;
166            this.pollDelay = pollDelay;
167            kvStore = Framework.getService(KeyValueService.class).getKeyValueStore(KV_STORE_NAME);
168        }
169
170        /**
171         * Runs a {@link Runnable} atomically in a cluster-wide critical section, outside a transaction.
172         */
173        public void runAtomically(String key, Runnable runnable) {
174            runInSeparateTransaction(() -> runAtomicallyInternal(key, runnable));
175        }
176
177        /**
178         * Runs a {@link Runnable} outside the current transaction (committing and finally restarting it if needed).
179         *
180         * @implSpec this is different from {@link TransactionHelper#runWithoutTransaction(Runnable)} because that one,
181         *           in some implementations, may keep the current transaction and start the runnable in a new thread.
182         *           Here we don't want a new thread or a risk of deadlock, so we just commit the original transaction.
183         */
184        protected void runInSeparateTransaction(Runnable runnable) {
185            // check if there is a current transaction, before committing it
186            boolean transaction = TransactionHelper.isTransactionActiveOrMarkedRollback();
187            if (transaction) {
188                TransactionHelper.commitOrRollbackTransaction();
189            }
190            boolean completedAbruptly = true;
191            try {
192                if (transaction) {
193                    TransactionHelper.runInTransaction(runnable);
194                } else {
195                    runnable.run();
196                }
197                completedAbruptly = false;
198            } finally {
199                if (transaction) {
200                    // restart a transaction if there was one originally
201                    try {
202                        TransactionHelper.startTransaction();
203                    } finally {
204                        if (completedAbruptly) {
205                            // mark rollback-only if there was an exception
206                            TransactionHelper.setTransactionRollbackOnly();
207                        }
208                    }
209                }
210            }
211        }
212
213        /**
214         * Runs a {@link Runnable} atomically, in a cluster-wide critical section.
215         */
216        protected void runAtomicallyInternal(String key, Runnable runnable) {
217            String lockInfo = tryLock(key);
218            if (lockInfo != null) {
219                try {
220                    runnable.run();
221                } finally {
222                    unLock(key, lockInfo);
223                }
224            } else {
225                throw new RuntimeServiceException("Failed to acquire lock '" + key + "' after " + duration.toSeconds()
226                        + "s, owner: " + getLock(key));
227            }
228        }
229
230        // try to acquire the lock and fail if it takes too long
231        protected String tryLock(String key) {
232            log.debug("Trying to lock '{}'", key);
233            long deadline = System.nanoTime() + duration.toNanos();
234            long ttl = duration.multipliedBy(TTL_MULTIPLIER).toSeconds();
235            do {
236                // try to acquire the lock
237                String lockInfo = "node=" + nodeId + " time=" + Instant.now();
238                if (kvStore.compareAndSet(key, null, lockInfo, ttl)) {
239                    // lock acquired
240                    log.debug("Lock '{}' acquired after {}ms", () -> key,
241                            () -> (System.nanoTime() - (deadline - duration.toNanos())) / 1_000_000);
242                    return lockInfo;
243                }
244                // wait a bit before retrying
245                log.debug("  Sleeping on busy lock '{}' for {}ms", () -> key, pollDelay::toMillis);
246                try {
247                    Thread.sleep(pollDelay.toMillis());
248                } catch (InterruptedException e) {
249                    Thread.currentThread().interrupt();
250                    throw new RuntimeServiceException(e);
251                }
252            } while (System.nanoTime() < deadline);
253            log.debug("Failed to acquire lock '{}' after {}s", () -> key, duration::toSeconds);
254            return null;
255        }
256
257        protected void unLock(String key, String lockInfo) {
258            log.debug("Unlocking '{}'", key);
259            if (kvStore.compareAndSet(key, lockInfo, null)) {
260                return;
261            }
262            // couldn't remove the lock, it expired an may have been reacquired
263            String current = kvStore.getString(key);
264            if (current == null) {
265                // lock expired but was not reacquired
266                log.warn("Unlocking '{}' but the lock had already expired; "
267                        + "consider increasing the try duration for this lock", key);
268            } else {
269                // lock expired and was reacquired
270                log.error("Failed to unlock '{}', the lock expired and has a new owner: {}; "
271                        + "consider increasing the try duration for this lock", key, getLock(key));
272            }
273        }
274
275        protected String getLock(String key) {
276            return kvStore.getString(key);
277        }
278    }
279
280}