/*
 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import java.io.Serializable;
import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.LockException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService;
import org.nuxeo.runtime.api.Framework;

/**
 * Manager of locks that serializes access to them.
 * <p>
 * The public methods called by the session are {@link #setLock}, {@link #removeLock} and {@link #getLock}. Method
 * {@link #closeLockManager} must be called when done with the lock manager.
 * <p>
 * In cluster mode, changes are executed inside a begin/commit pair so that test-and-set and test-and-remove sequences
 * are atomic.
 * <p>
 * Transaction management can be done by hand because we deal with a low-level {@link Mapper} and not something
 * wrapped by a JCA pool.
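 * <p>
 * For illustration only, a minimal usage sketch (the repository name {@code "default"}, the document id and the lock
 * owner are placeholders):
 *
 * <pre>{@code
 * LockManager lockManager = new VCSLockManager("default");
 * Lock oldLock = lockManager.setLock(docId, new Lock("bob", Calendar.getInstance()));
 * Lock lock = lockManager.getLock(docId); // owned by "bob" if oldLock was null
 * lockManager.removeLock(docId, "bob");
 * lockManager.closeLockManager();
 * }</pre>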
 */
public class VCSLockManager implements LockManager {

    private static final Log log = LogFactory.getLog(VCSLockManager.class);

    public static final int LOCK_RETRIES = 10;

    public static final long LOCK_SLEEP_DELAY = 1; // 1 ms

    public static final long LOCK_SLEEP_INCREMENT = 50; // add 50 ms each time

    protected final RepositoryImpl repository;

    /**
     * The mapper to use. Through this mapper we only ever touch the lock table, so there is no need to deal with
     * fulltext or complex saves, and no prefetching is done.
     */
    protected Mapper mapper;

    /**
     * If clustering is enabled then we have to wrap test/set and test/remove in a transaction.
     */
    protected final boolean clusteringEnabled;

    /**
     * Lock serializing access to the mapper.
     */
    protected final ReentrantLock serializationLock;

    /** Marker cached in {@link #lockCache} to represent the absence of a lock. */
    protected static final Lock NULL_LOCK = new Lock(null, null);

    protected final boolean caching;

    /**
     * A cache of locks, used only in non-cluster mode, when this lock manager is the only one dealing with locks.
     * <p>
     * Used under {@link #serializationLock}.
     */
    protected final LRUCache<Serializable, Lock> lockCache;

    protected static final int CACHE_SIZE = 100;

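    /**
     * Minimal LRU cache based on a {@link LinkedHashMap} in access order: once the size exceeds {@code max}, the
     * least recently accessed entry is evicted. As an illustrative sketch of the behavior (keys, values and capacity
     * are arbitrary):
     *
     * <pre>{@code
     * LRUCache<String, Lock> cache = new LRUCache<>(2);
     * cache.put("a", lockA);
     * cache.put("b", lockB);
     * cache.get("a");        // "a" becomes the most recently used entry
     * cache.put("c", lockC); // evicts "b", the least recently used entry
     * }</pre>
     */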
    protected static class LRUCache<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1L;

        private final int max;

        public LRUCache(int max) {
            super(max, 1.0f, true);
            this.max = max;
        }

        @Override
        protected boolean removeEldestEntry(Entry<K, V> eldest) {
            return size() > max;
        }
    }

    /**
     * Creates a lock manager for the given repository.
     * <p>
     * From then on, the mapper is used and closed only by the lock manager.
     * <p>
     * {@link #closeLockManager} must be called when done with the lock manager.
     */
    public VCSLockManager(String repositoryName) {
        this(Framework.getService(SQLRepositoryService.class).getRepositoryImpl(repositoryName));
    }

    /**
     * Creates a lock manager for the given repository.
     * <p>
     * From then on, the mapper is used and closed only by the lock manager.
     * <p>
     * {@link #closeLockManager} must be called when done with the lock manager.
     *
     * @since 9.3
     */
    public VCSLockManager(RepositoryImpl repository) {
        this.repository = repository;
        clusteringEnabled = repository.getRepositoryDescriptor().getClusteringEnabled();
        serializationLock = new ReentrantLock();
        caching = !clusteringEnabled;
        lockCache = caching ? new LRUCache<>(CACHE_SIZE) : null;
    }

    /**
     * Delays mapper acquisition until the repository has been fully initialized.
     * <p>
     * Always called with {@link #serializationLock} held, so the lazy initialization is safe.
     */
    protected Mapper getMapper() {
        if (mapper == null) {
            mapper = repository.newMapper(null, false);
        }
        return mapper;
    }

    protected Serializable idFromString(String id) {
        return repository.getModel().idFromString(id);
    }

    @Override
    public void closeLockManager() {
        serializationLock.lock();
        try {
            if (mapper != null) {
                mapper.close();
            }
        } finally {
            serializationLock.unlock();
        }
    }

    @Override
    public Lock getLock(final String id) {
        serializationLock.lock();
        try {
            Lock lock;
            if (caching && (lock = lockCache.get(id)) != null) {
                return lock == NULL_LOCK ? null : lock;
            }
            // no transaction needed, single operation
            lock = getMapper().getLock(idFromString(id));
            if (caching) {
                lockCache.put(id, lock == null ? NULL_LOCK : lock);
            }
            return lock;
        } finally {
            serializationLock.unlock();
        }
    }

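    /**
     * Sets the lock, retrying when a concurrent insert is detected (two simultaneous lock attempts, typically from
     * two cluster nodes).
     * <p>
     * At most {@link #LOCK_RETRIES} attempts are made, sleeping {@link #LOCK_SLEEP_DELAY} ms before the first retry
     * and {@link #LOCK_SLEEP_INCREMENT} ms more before each subsequent one; non-retryable failures are rethrown
     * immediately.
     */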
    @Override
    public Lock setLock(String id, Lock lock) {
        // We don't call addSuppressed() on an existing exception
        // because constructing it beforehand when it most likely
        // won't be needed is expensive.
        List<Throwable> suppressed = new ArrayList<>(0);
        long sleepDelay = LOCK_SLEEP_DELAY;
        for (int i = 0; i < LOCK_RETRIES; i++) {
            if (i > 0) {
                log.debug("Retrying lock on " + id + ": try " + (i + 1));
            }
            try {
                return setLockInternal(id, lock);
            } catch (NuxeoException e) {
                suppressed.add(e);
                if (shouldRetry(e)) {
                    // cluster: two simultaneous inserts
                    // retry
                    try {
                        Thread.sleep(sleepDelay);
                    } catch (InterruptedException ie) {
                        // restore interrupted status
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie);
                    }
                    sleepDelay += LOCK_SLEEP_INCREMENT;
                    continue;
                }
                // not something to retry
                NuxeoException exception = new NuxeoException(e);
                for (Throwable t : suppressed) {
                    exception.addSuppressed(t);
                }
                throw exception;
            }
        }
        LockException exception = new LockException("Failed to lock " + id + ", too much concurrency (tried "
                + LOCK_RETRIES + " times)");
        for (Throwable t : suppressed) {
            exception.addSuppressed(t);
        }
        throw exception;
    }

    /**
     * Does the exception mean that we should retry the transaction?
     */
    protected boolean shouldRetry(Exception e) {
        if (e instanceof ConcurrentUpdateException) {
            return true;
        }
        Throwable t = e.getCause();
        if (t instanceof BatchUpdateException && t.getCause() != null) {
            t = t.getCause();
        }
        return t instanceof SQLException && shouldRetry((SQLException) t);
    }

    protected boolean shouldRetry(SQLException e) {
        String sqlState = e.getSQLState();
        if ("23000".equals(sqlState)) {
            // MySQL: Duplicate entry ... for key ...
            // Oracle: unique constraint ... violated
            // SQL Server: Violation of PRIMARY KEY constraint
            return true;
        }
        if ("23001".equals(sqlState)) {
            // H2: Unique index or primary key violation
            return true;
        }
        if ("23505".equals(sqlState)) {
            // PostgreSQL: duplicate key value violates unique constraint
            return true;
        }
        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
            // SQL Server: Snapshot isolation transaction aborted due to update
            // conflict
            return true;
        }
        return false;
    }

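    /**
     * Does a single, non-retried attempt at setting the lock, consulting the cache first when enabled.
     *
     * @return the existing lock if the document was already locked, or {@code null} if the lock was set
     */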
    protected Lock setLockInternal(String id, Lock lock) {
        serializationLock.lock();
        try {
            Lock oldLock;
            if (caching && (oldLock = lockCache.get(id)) != null && oldLock != NULL_LOCK) {
                return oldLock;
            }
            oldLock = getMapper().setLock(idFromString(id), lock);
            if (caching && oldLock == null) {
                lockCache.put(id, lock == null ? NULL_LOCK : lock);
            }
            return oldLock;
        } finally {
            serializationLock.unlock();
        }
    }

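    /**
     * Removes the lock if it exists and can be removed by the given owner.
     * <p>
     * If the existing lock is owned by someone else, it is left in place and a copy of it with the failed flag set is
     * returned.
     */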
    @Override
    public Lock removeLock(final String id, final String owner) {
        serializationLock.lock();
        try {
            Lock oldLock = null;
            if (caching && (oldLock = lockCache.get(id)) == NULL_LOCK) {
                return null;
            }
            if (oldLock != null && !LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
                // existing mismatched lock, flag failure
                oldLock = new Lock(oldLock, true);
            } else {
                if (oldLock == null) {
                    oldLock = getMapper().removeLock(idFromString(id), owner, false);
                } else {
                    // we know the previous lock, we can force
                    // no transaction needed, single operation
                    getMapper().removeLock(idFromString(id), owner, true);
                }
            }
            if (caching) {
                if (oldLock != null && oldLock.getFailed()) {
                    // failed, but we now know the existing lock
                    lockCache.put(id, new Lock(oldLock, false));
                } else {
                    lockCache.put(id, NULL_LOCK);
                }
            }
            return oldLock;
        } finally {
            serializationLock.unlock();
        }
    }

    @Override
    public void clearLockManagerCaches() {
        serializationLock.lock();
        try {
            if (caching) {
                lockCache.clear();
            }
        } finally {
            serializationLock.unlock();
        }
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + '(' + repository.getName() + ')';
    }

}