/*
 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import java.io.Serializable;
import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.LockException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService;
import org.nuxeo.runtime.api.Framework;
/**
 * Manager of locks that serializes access to them.
 * <p>
 * The public methods called by the session are {@link #setLock}, {@link #removeLock} and {@link #getLock}. Method
 * {@link #closeLockManager} must be called when done with the lock manager.
 * <p>
 * In cluster mode, changes are executed in a begin/commit so that tests/updates can be atomic.
 * <p>
 * Transaction management can be done by hand because we're dealing with a low-level {@link Mapper} and not something
 * wrapped by a JCA pool.
 */
public class VCSLockManager implements LockManager {

    private static final Log log = LogFactory.getLog(VCSLockManager.class);

    public static final int LOCK_RETRIES = 10;

    public static final long LOCK_SLEEP_DELAY = 1; // 1 ms

    public static final long LOCK_SLEEP_INCREMENT = 50; // add 50 ms each time
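
    /** The repository whose locks are managed. */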
    protected final RepositoryImpl repository;

    /**
     * The mapper to use. Through it we only ever touch the lock table, so there is no need to deal with fulltext or
     * complex saves, and we don't do prefetch.
     */
    protected Mapper mapper;

    /**
     * If clustering is enabled then we have to wrap test/set and test/remove in a transaction.
     */
    protected final boolean clusteringEnabled;

    /**
     * Lock serializing access to the mapper.
     */
    protected final ReentrantLock serializationLock;
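
    /** Cache marker for the absence of a lock, so that "not locked" lookups can be cached too. */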
    protected static final Lock NULL_LOCK = new Lock(null, null);
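
    /** Whether the lock cache is used; true only when clustering is disabled. */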
    protected final boolean caching;

    /**
     * A cache of locks, used only in non-cluster mode, when this lock manager is the only one dealing with locks.
     * <p>
     * Used under {@link #serializationLock}.
     */
    protected final LRUCache<Serializable, Lock> lockCache;

    protected static final int CACHE_SIZE = 100;
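
    /** Simple LRU cache based on an access-ordered {@link LinkedHashMap}, evicting the eldest entry past the maximum size. */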
    protected static class LRUCache<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1L;

        private final int max;

        public LRUCache(int max) {
            super(max, 1.0f, true);
            this.max = max;
        }

        @Override
        protected boolean removeEldestEntry(Entry<K, V> eldest) {
            return size() > max;
        }
    }

    /**
     * Creates a lock manager for the given repository.
     * <p>
     * The mapper is acquired lazily and is only ever used and closed by the lock manager.
     * <p>
     * {@link #closeLockManager} must be called when done with the lock manager.
     */
    public VCSLockManager(String repositoryName) {
        SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class);
        repository = repositoryService.getRepositoryImpl(repositoryName);
        clusteringEnabled = repository.getRepositoryDescriptor().getClusteringEnabled();
        serializationLock = new ReentrantLock();
        caching = !clusteringEnabled;
        lockCache = caching ? new LRUCache<Serializable, Lock>(CACHE_SIZE) : null;
    }

    /**
     * Delay mapper acquisition until the repository has been fully initialized.
     */
    protected Mapper getMapper() {
        if (mapper == null) {
            mapper = repository.newMapper(null, false);
        }
        return mapper;
    }
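
    /** Converts a string document id into the model's internal id representation. */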
    protected Serializable idFromString(String id) {
        return repository.getModel().idFromString(id);
    }
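
    /**
     * Closes the mapper used for lock management; serialized with the other lock operations.
     */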
    @Override
    public void closeLockManager() {
        serializationLock.lock();
        try {
            getMapper().close();
        } finally {
            serializationLock.unlock();
        }
    }
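
    /**
     * Gets the lock on a document, consulting the local cache first when not in cluster mode.
     */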
    @Override
    public Lock getLock(final String id) {
        serializationLock.lock();
        try {
            Lock lock;
            if (caching && (lock = lockCache.get(id)) != null) {
                return lock == NULL_LOCK ? null : lock;
            }
            // no transaction needed, single operation
            lock = getMapper().getLock(idFromString(id));
            if (caching) {
                lockCache.put(id, lock == null ? NULL_LOCK : lock);
            }
            return lock;
        } finally {
            serializationLock.unlock();
        }
    }
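
    /**
     * Sets a lock, retrying up to {@link #LOCK_RETRIES} times with an increasing delay when a concurrent insert is
     * detected, which can happen in cluster mode when two nodes try to lock the same document simultaneously.
     */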
    @Override
    public Lock setLock(String id, Lock lock) {
        // We don't call addSuppressed() on an existing exception
        // because constructing it beforehand when it most likely
        // won't be needed is expensive.
        List<Throwable> suppressed = new ArrayList<>(0);
        long sleepDelay = LOCK_SLEEP_DELAY;
        for (int i = 0; i < LOCK_RETRIES; i++) {
            if (i > 0) {
                log.debug("Retrying lock on " + id + ": try " + (i + 1));
            }
            try {
                return setLockInternal(id, lock);
            } catch (NuxeoException e) {
                suppressed.add(e);
                if (shouldRetry(e)) {
                    // cluster: two simultaneous inserts
                    // retry
                    try {
                        Thread.sleep(sleepDelay);
                    } catch (InterruptedException ie) {
                        // restore interrupted status
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie);
                    }
                    sleepDelay += LOCK_SLEEP_INCREMENT;
                    continue;
                }
                // not something to retry
                NuxeoException exception = new NuxeoException(e);
                for (Throwable t : suppressed) {
                    exception.addSuppressed(t);
                }
                throw exception;
            }
        }
        LockException exception = new LockException("Failed to lock " + id + ", too much concurrency (tried "
                + LOCK_RETRIES + " times)");
        for (Throwable t : suppressed) {
            exception.addSuppressed(t);
        }
        throw exception;
    }

    /**
     * Does the exception mean that we should retry the transaction?
     */
    protected boolean shouldRetry(Exception e) {
        if (e instanceof ConcurrentUpdateException) {
            return true;
        }
        Throwable t = e.getCause();
        if (t instanceof BatchUpdateException && t.getCause() != null) {
            t = t.getCause();
        }
        return t instanceof SQLException && shouldRetry((SQLException) t);
    }
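
    /**
     * Checks the SQLState against the duplicate-key and update-conflict codes of the supported databases.
     */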
    protected boolean shouldRetry(SQLException e) {
        String sqlState = e.getSQLState();
        if ("23000".equals(sqlState)) {
            // MySQL: Duplicate entry ... for key ...
            // Oracle: unique constraint ... violated
            // SQL Server: Violation of PRIMARY KEY constraint
            return true;
        }
        if ("23001".equals(sqlState)) {
            // H2: Unique index or primary key violation
            return true;
        }
        if ("23505".equals(sqlState)) {
            // PostgreSQL: duplicate key value violates unique constraint
            return true;
        }
        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
            // SQL Server: Snapshot isolation transaction aborted due to update conflict
            return true;
        }
        return false;
    }
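
    /**
     * Single attempt at setting the lock; returns the pre-existing lock if the document is already locked, in which
     * case the new lock is not set.
     */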
    protected Lock setLockInternal(String id, Lock lock) {
        serializationLock.lock();
        try {
            Lock oldLock;
            if (caching && (oldLock = lockCache.get(id)) != null && oldLock != NULL_LOCK) {
                return oldLock;
            }
            oldLock = getMapper().setLock(idFromString(id), lock);
            if (caching && oldLock == null) {
                lockCache.put(id, lock == null ? NULL_LOCK : lock);
            }
            return oldLock;
        } finally {
            serializationLock.unlock();
        }
    }
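
    /**
     * Removes the lock if the owner matches (or if the given owner is null); on mismatch the existing lock is kept
     * and is returned with its "failed" flag set.
     */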
    @Override
    public Lock removeLock(final String id, final String owner) {
        serializationLock.lock();
        try {
            Lock oldLock = null;
            if (caching && (oldLock = lockCache.get(id)) == NULL_LOCK) {
                return null;
            }
            if (oldLock != null && !LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
                // existing mismatched lock, flag failure
                oldLock = new Lock(oldLock, true);
            } else {
                if (oldLock == null) {
                    oldLock = getMapper().removeLock(idFromString(id), owner, false);
                } else {
                    // we know the previous lock, we can force
                    // no transaction needed, single operation
                    getMapper().removeLock(idFromString(id), owner, true);
                }
            }
            if (caching) {
                if (oldLock != null && oldLock.getFailed()) {
                    // failed, but we now know the existing lock
                    lockCache.put(id, new Lock(oldLock, false));
                } else {
                    lockCache.put(id, NULL_LOCK);
                }
            }
            return oldLock;
        } finally {
            serializationLock.unlock();
        }
    }
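
    /**
     * Clears the local lock cache; only meaningful when not in cluster mode.
     */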
    @Override
    public void clearLockManagerCaches() {
        serializationLock.lock();
        try {
            if (caching) {
                lockCache.clear();
            }
        } finally {
            serializationLock.unlock();
        }
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + '(' + repository.getName() + ')';
    }

}