001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.io.Serializable; 022import java.sql.BatchUpdateException; 023import java.sql.SQLException; 024import java.util.ArrayList; 025import java.util.LinkedHashMap; 026import java.util.List; 027import java.util.Map.Entry; 028import java.util.concurrent.locks.ReentrantLock; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 033import org.nuxeo.ecm.core.api.Lock; 034import org.nuxeo.ecm.core.api.LockException; 035import org.nuxeo.ecm.core.api.NuxeoException; 036import org.nuxeo.ecm.core.model.LockManager; 037import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService; 038import org.nuxeo.runtime.api.Framework; 039 040/** 041 * Manager of locks that serializes access to them. 042 * <p> 043 * The public methods called by the session are {@link #setLock}, {@link #removeLock} and {@link #getLock}. Method 044 * {@link #shutdown} must be called when done with the lock manager. 045 * <p> 046 * In cluster mode, changes are executed in a begin/commit so that tests/updates can be atomic. 047 * <p> 048 * Transaction management can be done by hand because we're dealing with a low-level {@link Mapper} and not something 049 * wrapped by a JCA pool. 050 */ 051public class VCSLockManager implements LockManager { 052 053 private static final Log log = LogFactory.getLog(VCSLockManager.class); 054 055 public static final int LOCK_RETRIES = 10; 056 057 public static final long LOCK_SLEEP_DELAY = 1; // 1 ms 058 059 public static final long LOCK_SLEEP_INCREMENT = 50; // add 50 ms each time 060 061 protected final RepositoryImpl repository; 062 063 /** 064 * The mapper to use. In this mapper we only ever touch the lock table, so no need to deal with fulltext and complex 065 * saves, and we don't do prefetch. 066 */ 067 protected Mapper mapper; 068 069 /** 070 * If clustering is enabled then we have to wrap test/set and test/remove in a transaction. 071 */ 072 protected final boolean clusteringEnabled; 073 074 /** 075 * Lock serializing access to the mapper. 076 */ 077 protected final ReentrantLock serializationLock; 078 079 protected static final Lock NULL_LOCK = new Lock(null, null); 080 081 protected final boolean caching; 082 083 /** 084 * A cache of locks, used only in non-cluster mode, when this lock manager is the only one dealing with locks. 085 * <p> 086 * Used under {@link #serializationLock}. 087 */ 088 protected final LRUCache<Serializable, Lock> lockCache; 089 090 protected static final int CACHE_SIZE = 100; 091 092 protected static class LRUCache<K, V> extends LinkedHashMap<K, V> { 093 private static final long serialVersionUID = 1L; 094 095 private final int max; 096 097 public LRUCache(int max) { 098 super(max, 1.0f, true); 099 this.max = max; 100 } 101 102 @Override 103 protected boolean removeEldestEntry(Entry<K, V> eldest) { 104 return size() > max; 105 } 106 } 107 108 /** 109 * Creates a lock manager for the given repository. 110 * <p> 111 * The mapper will from then on be only used and closed by the lock manager. 112 * <p> 113 * {@link #close} must be called when done with the lock manager. 114 */ 115 public VCSLockManager(String repositoryName) { 116 this(Framework.getService(SQLRepositoryService.class).getRepositoryImpl(repositoryName)); 117 } 118 119 /** 120 * Creates a lock manager for the given repository. 121 * <p> 122 * The mapper will from then on be only used and closed by the lock manager. 123 * <p> 124 * {@link #close} must be called when done with the lock manager. 125 * 126 * @since 9.3 127 */ 128 public VCSLockManager(RepositoryImpl repository) { 129 this.repository = repository; 130 clusteringEnabled = repository.getRepositoryDescriptor().getClusteringEnabled(); 131 serializationLock = new ReentrantLock(); 132 caching = !clusteringEnabled; 133 lockCache = caching ? new LRUCache<Serializable, Lock>(CACHE_SIZE) : null; 134 } 135 136 /** 137 * Delay mapper acquisition until the repository has been fully initialized. 138 */ 139 protected Mapper getMapper() { 140 if (mapper == null) { 141 mapper = repository.newMapper(null, false); 142 } 143 return mapper; 144 } 145 146 protected Serializable idFromString(String id) { 147 return repository.getModel().idFromString(id); 148 } 149 150 @Override 151 public void closeLockManager() { 152 serializationLock.lock(); 153 try { 154 if (mapper != null) { 155 getMapper().close(); 156 } 157 } finally { 158 serializationLock.unlock(); 159 } 160 } 161 162 @Override 163 public Lock getLock(final String id) { 164 serializationLock.lock(); 165 try { 166 Lock lock; 167 if (caching && (lock = lockCache.get(id)) != null) { 168 return lock == NULL_LOCK ? null : lock; 169 } 170 // no transaction needed, single operation 171 lock = getMapper().getLock(idFromString(id)); 172 if (caching) { 173 lockCache.put(id, lock == null ? NULL_LOCK : lock); 174 } 175 return lock; 176 } finally { 177 serializationLock.unlock(); 178 } 179 } 180 181 @Override 182 public Lock setLock(String id, Lock lock) { 183 // We don't call addSuppressed() on an existing exception 184 // because constructing it beforehand when it most likely 185 // won't be needed is expensive. 186 List<Throwable> suppressed = new ArrayList<>(0); 187 long sleepDelay = LOCK_SLEEP_DELAY; 188 for (int i = 0; i < LOCK_RETRIES; i++) { 189 if (i > 0) { 190 log.debug("Retrying lock on " + id + ": try " + (i + 1)); 191 } 192 try { 193 return setLockInternal(id, lock); 194 } catch (NuxeoException e) { 195 suppressed.add(e); 196 if (shouldRetry(e)) { 197 // cluster: two simultaneous inserts 198 // retry 199 try { 200 Thread.sleep(sleepDelay); 201 } catch (InterruptedException ie) { 202 Thread.currentThread().interrupt(); 203 throw new RuntimeException(ie); 204 } 205 sleepDelay += LOCK_SLEEP_INCREMENT; 206 continue; 207 } 208 // not something to retry 209 NuxeoException exception = new NuxeoException(e); 210 for (Throwable t : suppressed) { 211 exception.addSuppressed(t); 212 } 213 throw exception; 214 } 215 } 216 LockException exception = new LockException("Failed to lock " + id + ", too much concurrency (tried " 217 + LOCK_RETRIES + " times)"); 218 for (Throwable t : suppressed) { 219 exception.addSuppressed(t); 220 } 221 throw exception; 222 } 223 224 /** 225 * Does the exception mean that we should retry the transaction? 226 */ 227 protected boolean shouldRetry(Exception e) { 228 if (e instanceof ConcurrentUpdateException) { 229 return true; 230 } 231 Throwable t = e.getCause(); 232 if (t instanceof BatchUpdateException && t.getCause() != null) { 233 t = t.getCause(); 234 } 235 return t instanceof SQLException && shouldRetry((SQLException) t); 236 } 237 238 protected boolean shouldRetry(SQLException e) { 239 String sqlState = e.getSQLState(); 240 if ("23000".equals(sqlState)) { 241 // MySQL: Duplicate entry ... for key ... 242 // Oracle: unique constraint ... violated 243 // SQL Server: Violation of PRIMARY KEY constraint 244 return true; 245 } 246 if ("23001".equals(sqlState)) { 247 // H2: Unique index or primary key violation 248 return true; 249 } 250 if ("23505".equals(sqlState)) { 251 // PostgreSQL: duplicate key value violates unique constraint 252 return true; 253 } 254 if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) { 255 // SQL Server: Snapshot isolation transaction aborted due to update 256 // conflict 257 return true; 258 } 259 return false; 260 } 261 262 protected Lock setLockInternal(String id, Lock lock) { 263 serializationLock.lock(); 264 try { 265 Lock oldLock; 266 if (caching && (oldLock = lockCache.get(id)) != null && oldLock != NULL_LOCK) { 267 return oldLock; 268 } 269 oldLock = getMapper().setLock(idFromString(id), lock); 270 if (caching && oldLock == null) { 271 lockCache.put(id, lock == null ? NULL_LOCK : lock); 272 } 273 return oldLock; 274 } finally { 275 serializationLock.unlock(); 276 } 277 } 278 279 @Override 280 public Lock removeLock(final String id, final String owner) { 281 serializationLock.lock(); 282 try { 283 Lock oldLock = null; 284 if (caching && (oldLock = lockCache.get(id)) == NULL_LOCK) { 285 return null; 286 } 287 if (oldLock != null && !LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 288 // existing mismatched lock, flag failure 289 oldLock = new Lock(oldLock, true); 290 } else { 291 if (oldLock == null) { 292 oldLock = getMapper().removeLock(idFromString(id), owner, false); 293 } else { 294 // we know the previous lock, we can force 295 // no transaction needed, single operation 296 getMapper().removeLock(idFromString(id), owner, true); 297 } 298 } 299 if (caching) { 300 if (oldLock != null && oldLock.getFailed()) { 301 // failed, but we now know the existing lock 302 lockCache.put(id, new Lock(oldLock, false)); 303 } else { 304 lockCache.put(id, NULL_LOCK); 305 } 306 } 307 return oldLock; 308 } finally { 309 serializationLock.unlock(); 310 } 311 } 312 313 @Override 314 public void clearLockManagerCaches() { 315 serializationLock.lock(); 316 try { 317 if (caching) { 318 lockCache.clear(); 319 } 320 } finally { 321 serializationLock.unlock(); 322 } 323 } 324 325 @Override 326 public String toString() { 327 return getClass().getSimpleName() + '(' + repository.getName() + ')'; 328 } 329 330}