001/* 002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.cluster; 020 021import static org.apache.commons.lang3.StringUtils.defaultIfBlank; 022import static org.apache.commons.lang3.StringUtils.isNotBlank; 023import static org.nuxeo.runtime.model.Descriptor.UNIQUE_DESCRIPTOR_ID; 024 025import java.time.Duration; 026import java.time.Instant; 027import java.util.Map; 028import java.util.Random; 029 030import org.apache.logging.log4j.LogManager; 031import org.apache.logging.log4j.Logger; 032import org.nuxeo.runtime.RuntimeServiceException; 033import org.nuxeo.runtime.api.Framework; 034import org.nuxeo.runtime.capabilities.CapabilitiesService; 035import org.nuxeo.runtime.kv.KeyValueService; 036import org.nuxeo.runtime.kv.KeyValueStore; 037import org.nuxeo.runtime.model.ComponentContext; 038import org.nuxeo.runtime.model.DefaultComponent; 039import org.nuxeo.runtime.transaction.TransactionHelper; 040 041/** 042 * Implementation for the Cluster Service. 043 * 044 * @since 11.1 045 */ 046public class ClusterServiceImpl extends DefaultComponent implements ClusterService { 047 048 private static final Logger log = LogManager.getLogger(ClusterServiceImpl.class); 049 050 /** Very early as other services depend on us. */ 051 public static final int APPLICATION_STARTED_ORDER = -1000; 052 053 public static final String XP_CONFIG = "configuration"; 054 055 public static final String CLUSTERING_ENABLED_OLD_PROP = "repository.clustering.enabled"; 056 057 public static final String NODE_ID_OLD_PROP = "repository.clustering.id"; 058 059 /** @since 11.5 */ 060 public static final String CAPABILITY_CLUSTER = "cluster"; 061 062 protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength) 063 064 protected boolean enabled; 065 066 protected String nodeId; 067 068 @Override 069 public int getApplicationStartedOrder() { 070 return APPLICATION_STARTED_ORDER; 071 } 072 073 @Override 074 public void start(ComponentContext context) { 075 ClusterNodeDescriptor descr = getDescriptor(XP_CONFIG, UNIQUE_DESCRIPTOR_ID); 076 077 // enabled 078 Boolean enabledProp = descr == null ? null : descr.getEnabled(); 079 if (enabledProp != null) { 080 enabled = enabledProp.booleanValue(); 081 } else { 082 // compat with old framework property 083 enabled = Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_OLD_PROP); 084 } 085 086 // node id 087 String id = descr == null ? null : defaultIfBlank(descr.getName(), null); 088 if (id != null) { 089 nodeId = id.trim(); 090 } else { 091 // compat with old framework property 092 id = Framework.getProperty(NODE_ID_OLD_PROP); 093 if (isNotBlank(id)) { 094 nodeId = id.trim(); 095 } else { 096 // use a random node id 097 long l; 098 do { 099 l = RANDOM.nextLong(); 100 } while (l < 0); // keep a positive value to avoid weird node ids 101 nodeId = String.valueOf(l); 102 if (enabled) { 103 log.warn("Missing cluster node id configuration, please define it explicitly. " 104 + "Using random cluster node id instead: {}", nodeId); 105 } else { 106 log.info("Using random cluster node id: {}", nodeId); 107 } 108 } 109 } 110 111 // capabilities 112 Framework.getService(CapabilitiesService.class) 113 .registerCapabilities(CAPABILITY_CLUSTER, Map.of("enabled", enabled, "nodeId", nodeId)); 114 115 super.start(context); 116 } 117 118 @Override 119 public boolean isEnabled() { 120 return enabled; 121 } 122 123 @Override 124 public String getNodeId() { 125 return nodeId; 126 } 127 128 /** Allows tests to set the node id without a reload. */ 129 public void setNodeId(String nodeId) { 130 if (!Framework.isTestModeSet()) { 131 throw new UnsupportedOperationException("test mode only"); 132 } 133 this.nodeId = nodeId; 134 } 135 136 @Override 137 public void runAtomically(String key, Duration duration, Duration pollDelay, Runnable runnable) { 138 if (!isEnabled()) { 139 runnable.run(); 140 return; 141 } 142 new ClusterLockHelper(getNodeId(), duration, pollDelay).runAtomically(key, runnable); 143 } 144 145 public static class ClusterLockHelper { 146 147 private static final Logger log = LogManager.getLogger(ClusterLockHelper.class); 148 149 public static final String KV_STORE_NAME = "cluster"; 150 151 // TTL set on the lock, to make it expire if the process crashes or gets stuck 152 // this is a multiplier of the duration during which we attempt to acquire the lock 153 private static final int TTL_MULTIPLIER = 10; 154 155 protected final String nodeId; 156 157 protected final Duration duration; 158 159 protected final Duration pollDelay; 160 161 protected final KeyValueStore kvStore; 162 163 public ClusterLockHelper(String nodeId, Duration duration, Duration pollDelay) { 164 this.nodeId = nodeId; 165 this.duration = duration; 166 this.pollDelay = pollDelay; 167 kvStore = Framework.getService(KeyValueService.class).getKeyValueStore(KV_STORE_NAME); 168 } 169 170 /** 171 * Runs a {@link Runnable} atomically in a cluster-wide critical section, outside a transaction. 172 */ 173 public void runAtomically(String key, Runnable runnable) { 174 runInSeparateTransaction(() -> runAtomicallyInternal(key, runnable)); 175 } 176 177 /** 178 * Runs a {@link Runnable} outside the current transaction (committing and finally restarting it if needed). 179 * 180 * @implSpec this is different from {@link TransactionHelper#runWithoutTransaction(Runnable)} because that one, 181 * in some implementations, may keep the current transaction and start the runnable in a new thread. 182 * Here we don't want a new thread or a risk of deadlock, so we just commit the original transaction. 183 */ 184 protected void runInSeparateTransaction(Runnable runnable) { 185 // check if there is a current transaction, before committing it 186 boolean transaction = TransactionHelper.isTransactionActiveOrMarkedRollback(); 187 if (transaction) { 188 TransactionHelper.commitOrRollbackTransaction(); 189 } 190 boolean completedAbruptly = true; 191 try { 192 if (transaction) { 193 TransactionHelper.runInTransaction(runnable); 194 } else { 195 runnable.run(); 196 } 197 completedAbruptly = false; 198 } finally { 199 if (transaction) { 200 // restart a transaction if there was one originally 201 try { 202 TransactionHelper.startTransaction(); 203 } finally { 204 if (completedAbruptly) { 205 // mark rollback-only if there was an exception 206 TransactionHelper.setTransactionRollbackOnly(); 207 } 208 } 209 } 210 } 211 } 212 213 /** 214 * Runs a {@link Runnable} atomically, in a cluster-wide critical section. 215 */ 216 protected void runAtomicallyInternal(String key, Runnable runnable) { 217 String lockInfo = tryLock(key); 218 if (lockInfo != null) { 219 try { 220 runnable.run(); 221 } finally { 222 unLock(key, lockInfo); 223 } 224 } else { 225 throw new RuntimeServiceException("Failed to acquire lock '" + key + "' after " + duration.toSeconds() 226 + "s, owner: " + getLock(key)); 227 } 228 } 229 230 // try to acquire the lock and fail if it takes too long 231 protected String tryLock(String key) { 232 log.debug("Trying to lock '{}'", key); 233 long deadline = System.nanoTime() + duration.toNanos(); 234 long ttl = duration.multipliedBy(TTL_MULTIPLIER).toSeconds(); 235 do { 236 // try to acquire the lock 237 String lockInfo = "node=" + nodeId + " time=" + Instant.now(); 238 if (kvStore.compareAndSet(key, null, lockInfo, ttl)) { 239 // lock acquired 240 log.debug("Lock '{}' acquired after {}ms", () -> key, 241 () -> (System.nanoTime() - (deadline - duration.toNanos())) / 1_000_000); 242 return lockInfo; 243 } 244 // wait a bit before retrying 245 log.debug(" Sleeping on busy lock '{}' for {}ms", () -> key, pollDelay::toMillis); 246 try { 247 Thread.sleep(pollDelay.toMillis()); 248 } catch (InterruptedException e) { 249 Thread.currentThread().interrupt(); 250 throw new RuntimeServiceException(e); 251 } 252 } while (System.nanoTime() < deadline); 253 log.debug("Failed to acquire lock '{}' after {}s", () -> key, duration::toSeconds); 254 return null; 255 } 256 257 protected void unLock(String key, String lockInfo) { 258 log.debug("Unlocking '{}'", key); 259 if (kvStore.compareAndSet(key, lockInfo, null)) { 260 return; 261 } 262 // couldn't remove the lock, it expired an may have been reacquired 263 String current = kvStore.getString(key); 264 if (current == null) { 265 // lock expired but was not reacquired 266 log.warn("Unlocking '{}' but the lock had already expired; " 267 + "consider increasing the try duration for this lock", key); 268 } else { 269 // lock expired and was reacquired 270 log.error("Failed to unlock '{}', the lock expired and has a new owner: {}; " 271 + "consider increasing the try duration for this lock", key, getLock(key)); 272 } 273 } 274 275 protected String getLock(String key) { 276 return kvStore.getString(key); 277 } 278 } 279 280}