001/* 002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 */ 012package org.nuxeo.ecm.core.storage.sql; 013 014import java.io.Serializable; 015import java.sql.BatchUpdateException; 016import java.sql.SQLException; 017import java.util.ArrayList; 018import java.util.LinkedHashMap; 019import java.util.List; 020import java.util.Map.Entry; 021import java.util.concurrent.locks.ReentrantLock; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 026import org.nuxeo.ecm.core.api.Lock; 027import org.nuxeo.ecm.core.api.LockException; 028import org.nuxeo.ecm.core.api.NuxeoException; 029import org.nuxeo.ecm.core.model.LockManager; 030import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService; 031import org.nuxeo.runtime.api.Framework; 032 033/** 034 * Manager of locks that serializes access to them. 035 * <p> 036 * The public methods called by the session are {@link #setLock}, {@link #removeLock} and {@link #getLock}. Method 037 * {@link #shutdown} must be called when done with the lock manager. 038 * <p> 039 * In cluster mode, changes are executed in a begin/commit so that tests/updates can be atomic. 040 * <p> 041 * Transaction management can be done by hand because we're dealing with a low-level {@link Mapper} and not something 042 * wrapped by a JCA pool. 043 */ 044public class VCSLockManager implements LockManager { 045 046 private static final Log log = LogFactory.getLog(VCSLockManager.class); 047 048 public static final int LOCK_RETRIES = 10; 049 050 public static final long LOCK_SLEEP_DELAY = 1; // 1 ms 051 052 public static final long LOCK_SLEEP_INCREMENT = 50; // add 50 ms each time 053 054 protected final RepositoryImpl repository; 055 056 /** 057 * The mapper to use. In this mapper we only ever touch the lock table, so no need to deal with fulltext and complex 058 * saves, and we don't do prefetch. 059 */ 060 protected Mapper mapper; 061 062 /** 063 * If clustering is enabled then we have to wrap test/set and test/remove in a transaction. 064 */ 065 protected final boolean clusteringEnabled; 066 067 /** 068 * Lock serializing access to the mapper. 069 */ 070 protected final ReentrantLock serializationLock; 071 072 protected static final Lock NULL_LOCK = new Lock(null, null); 073 074 protected final boolean caching; 075 076 /** 077 * A cache of locks, used only in non-cluster mode, when this lock manager is the only one dealing with locks. 078 * <p> 079 * Used under {@link #serializationLock}. 080 */ 081 protected final LRUCache<Serializable, Lock> lockCache; 082 083 protected static final int CACHE_SIZE = 100; 084 085 protected static class LRUCache<K, V> extends LinkedHashMap<K, V> { 086 private static final long serialVersionUID = 1L; 087 088 private final int max; 089 090 public LRUCache(int max) { 091 super(max, 1.0f, true); 092 this.max = max; 093 } 094 095 @Override 096 protected boolean removeEldestEntry(Entry<K, V> eldest) { 097 return size() > max; 098 } 099 } 100 101 /** 102 * Creates a lock manager for the given repository. 103 * <p> 104 * The mapper will from then on be only used and closed by the lock manager. 105 * <p> 106 * {@link #close} must be called when done with the lock manager. 107 */ 108 public VCSLockManager(String repositoryName) { 109 SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class); 110 repository = repositoryService.getRepositoryImpl(repositoryName); 111 clusteringEnabled = repository.getRepositoryDescriptor().getClusteringEnabled(); 112 serializationLock = new ReentrantLock(); 113 caching = !clusteringEnabled; 114 lockCache = caching ? new LRUCache<Serializable, Lock>(CACHE_SIZE) : null; 115 } 116 117 /** 118 * Delay mapper acquisition until the repository has been fully initialized. 119 */ 120 protected Mapper getMapper() { 121 if (mapper == null) { 122 mapper = repository.newMapper(null, false); 123 } 124 return mapper; 125 } 126 127 protected Serializable idFromString(String id) { 128 return repository.getModel().idFromString(id); 129 } 130 131 @Override 132 public void closeLockManager() { 133 serializationLock.lock(); 134 try { 135 getMapper().close(); 136 } finally { 137 serializationLock.unlock(); 138 } 139 } 140 141 @Override 142 public Lock getLock(final String id) { 143 serializationLock.lock(); 144 try { 145 Lock lock; 146 if (caching && (lock = lockCache.get(id)) != null) { 147 return lock == NULL_LOCK ? null : lock; 148 } 149 // no transaction needed, single operation 150 lock = getMapper().getLock(idFromString(id)); 151 if (caching) { 152 lockCache.put(id, lock == null ? NULL_LOCK : lock); 153 } 154 return lock; 155 } finally { 156 serializationLock.unlock(); 157 } 158 } 159 160 @Override 161 public Lock setLock(String id, Lock lock) { 162 // We don't call addSuppressed() on an existing exception 163 // because constructing it beforehand when it most likely 164 // won't be needed is expensive. 165 List<Throwable> suppressed = new ArrayList<>(0); 166 long sleepDelay = LOCK_SLEEP_DELAY; 167 for (int i = 0; i < LOCK_RETRIES; i++) { 168 if (i > 0) { 169 log.debug("Retrying lock on " + id + ": try " + (i + 1)); 170 } 171 try { 172 return setLockInternal(id, lock); 173 } catch (NuxeoException e) { 174 suppressed.add(e); 175 if (shouldRetry(e)) { 176 // cluster: two simultaneous inserts 177 // retry 178 try { 179 Thread.sleep(sleepDelay); 180 } catch (InterruptedException ie) { 181 // restore interrupted status 182 Thread.currentThread().interrupt(); 183 throw new RuntimeException(ie); 184 } 185 sleepDelay += LOCK_SLEEP_INCREMENT; 186 continue; 187 } 188 // not something to retry 189 NuxeoException exception = new NuxeoException(e); 190 for (Throwable t : suppressed) { 191 exception.addSuppressed(t); 192 } 193 throw exception; 194 } 195 } 196 LockException exception = new LockException("Failed to lock " + id + ", too much concurrency (tried " 197 + LOCK_RETRIES + " times)"); 198 for (Throwable t : suppressed) { 199 exception.addSuppressed(t); 200 } 201 throw exception; 202 } 203 204 /** 205 * Does the exception mean that we should retry the transaction? 206 */ 207 protected boolean shouldRetry(Exception e) { 208 if (e instanceof ConcurrentUpdateException) { 209 return true; 210 } 211 Throwable t = e.getCause(); 212 if (t instanceof BatchUpdateException && t.getCause() != null) { 213 t = t.getCause(); 214 } 215 return t instanceof SQLException && shouldRetry((SQLException) t); 216 } 217 218 protected boolean shouldRetry(SQLException e) { 219 String sqlState = e.getSQLState(); 220 if ("23000".equals(sqlState)) { 221 // MySQL: Duplicate entry ... for key ... 222 // Oracle: unique constraint ... violated 223 // SQL Server: Violation of PRIMARY KEY constraint 224 return true; 225 } 226 if ("23001".equals(sqlState)) { 227 // H2: Unique index or primary key violation 228 return true; 229 } 230 if ("23505".equals(sqlState)) { 231 // PostgreSQL: duplicate key value violates unique constraint 232 return true; 233 } 234 if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) { 235 // SQL Server: Snapshot isolation transaction aborted due to update 236 // conflict 237 return true; 238 } 239 return false; 240 } 241 242 protected Lock setLockInternal(String id, Lock lock) { 243 serializationLock.lock(); 244 try { 245 Lock oldLock; 246 if (caching && (oldLock = lockCache.get(id)) != null && oldLock != NULL_LOCK) { 247 return oldLock; 248 } 249 oldLock = getMapper().setLock(idFromString(id), lock); 250 if (caching && oldLock == null) { 251 lockCache.put(id, lock == null ? NULL_LOCK : lock); 252 } 253 return oldLock; 254 } finally { 255 serializationLock.unlock(); 256 } 257 } 258 259 @Override 260 public Lock removeLock(final String id, final String owner) { 261 serializationLock.lock(); 262 try { 263 Lock oldLock = null; 264 if (caching && (oldLock = lockCache.get(id)) == NULL_LOCK) { 265 return null; 266 } 267 if (oldLock != null && !LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 268 // existing mismatched lock, flag failure 269 oldLock = new Lock(oldLock, true); 270 } else { 271 if (oldLock == null) { 272 oldLock = getMapper().removeLock(idFromString(id), owner, false); 273 } else { 274 // we know the previous lock, we can force 275 // no transaction needed, single operation 276 getMapper().removeLock(idFromString(id), owner, true); 277 } 278 } 279 if (caching) { 280 if (oldLock != null && oldLock.getFailed()) { 281 // failed, but we now know the existing lock 282 lockCache.put(id, new Lock(oldLock, false)); 283 } else { 284 lockCache.put(id, NULL_LOCK); 285 } 286 } 287 return oldLock; 288 } finally { 289 serializationLock.unlock(); 290 } 291 } 292 293 @Override 294 public void clearLockManagerCaches() { 295 serializationLock.lock(); 296 try { 297 if (caching) { 298 lockCache.clear(); 299 } 300 } finally { 301 serializationLock.unlock(); 302 } 303 } 304 305 @Override 306 public String toString() { 307 return getClass().getSimpleName() + '(' + repository.getName() + ')'; 308 } 309 310}