001/*
002 * (C) Copyright 2006-2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import static org.nuxeo.ecm.core.storage.sql.Model.LOCK_CREATED_KEY;
022import static org.nuxeo.ecm.core.storage.sql.Model.LOCK_OWNER_KEY;
023import static org.nuxeo.ecm.core.storage.sql.Model.LOCK_TABLE_NAME;
024import static org.nuxeo.ecm.core.storage.sql.Model.MAIN_KEY;
025
026import java.io.Serializable;
027import java.sql.BatchUpdateException;
028import java.sql.Connection;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.util.ArrayList;
033import java.util.Calendar;
034import java.util.List;
035
036import org.apache.logging.log4j.LogManager;
037import org.apache.logging.log4j.Logger;
038import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
039import org.nuxeo.ecm.core.api.Lock;
040import org.nuxeo.ecm.core.api.LockException;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.api.lock.LockManager;
043import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo;
044import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
046import org.nuxeo.runtime.datasource.ConnectionHelper;
047
048/**
049 * Manager of locks stored in the repository SQL database.
050 */
051public class VCSLockManager implements LockManager {
052
053    private static final Logger log = LogManager.getLogger(VCSLockManager.class);
054
055    public static final int LOCK_RETRIES = 10;
056
057    public static final long LOCK_SLEEP_DELAY = 1; // 1 ms
058
059    public static final long LOCK_SLEEP_INCREMENT = 50; // add 50 ms each time
060
061    protected final String dataSourceName;
062
063    protected final Model model;
064
065    protected final SQLInfo sqlInfo;
066
067    /**
068     * Creates a lock manager for the given repository.
069     * <p>
070     * {@link #closeLockManager()} must be called when done with the lock manager.
071     *
072     * @since 9.3
073     */
074    public VCSLockManager(RepositoryImpl repository) {
075        dataSourceName = "repository_" + repository.getName();
076        model = repository.getModel();
077        sqlInfo = repository.getSQLInfo();
078    }
079
080    protected Connection getConnection() throws SQLException {
081        // open connection in noSharing mode
082        return ConnectionHelper.getConnection(dataSourceName, true);
083    }
084
085    protected Serializable idFromString(String id) {
086        return model.idFromString(id);
087    }
088
089    @Override
090    public Lock getLock(final String id) {
091        return readLock(idFromString(id));
092    }
093
094    @Override
095    public Lock setLock(String id, Lock lock) {
096        // We don't call addSuppressed() on an existing exception
097        // because constructing it beforehand when it most likely
098        // won't be needed is expensive.
099        List<Throwable> suppressed = new ArrayList<>(0);
100        long sleepDelay = LOCK_SLEEP_DELAY;
101        for (int i = 0; i < LOCK_RETRIES; i++) {
102            if (i > 0) {
103                log.debug("Retrying lock on {}: try {}", id, i + 1);
104            }
105            try {
106                return writeLock(idFromString(id), lock);
107            } catch (NuxeoException e) {
108                suppressed.add(e);
109                if (shouldRetry(e)) {
110                    // cluster: two simultaneous inserts
111                    // retry
112                    try {
113                        Thread.sleep(sleepDelay);
114                    } catch (InterruptedException ie) {
115                        Thread.currentThread().interrupt();
116                        throw new RuntimeException(ie);
117                    }
118                    sleepDelay += LOCK_SLEEP_INCREMENT;
119                    continue;
120                }
121                // not something to retry
122                NuxeoException exception = new NuxeoException(e);
123                for (Throwable t : suppressed) {
124                    exception.addSuppressed(t);
125                }
126                throw exception;
127            }
128        }
129        LockException exception = new LockException(
130                "Failed to lock " + id + ", too much concurrency (tried " + LOCK_RETRIES + " times)");
131        for (Throwable t : suppressed) {
132            exception.addSuppressed(t);
133        }
134        throw exception;
135    }
136
137    protected void checkConcurrentUpdate(Throwable e) {
138        if (sqlInfo.dialect.isConcurrentUpdateException(e)) {
139            log.debug(e, e);
140            // don't keep the original message, as it may reveal database-level info
141            throw new ConcurrentUpdateException("Concurrent update", e);
142        }
143    }
144
145    /**
146     * Does the exception mean that we should retry the transaction?
147     */
148    protected boolean shouldRetry(Exception e) {
149        if (e instanceof ConcurrentUpdateException) {
150            return true;
151        }
152        Throwable t = e.getCause();
153        if (t instanceof BatchUpdateException && t.getCause() != null) {
154            t = t.getCause();
155        }
156        return t instanceof SQLException && shouldRetry((SQLException) t);
157    }
158
159    protected boolean shouldRetry(SQLException e) {
160        String sqlState = e.getSQLState();
161        if ("23000".equals(sqlState)) {
162            // MySQL: Duplicate entry ... for key ...
163            // Oracle: unique constraint ... violated
164            // SQL Server: Violation of PRIMARY KEY constraint
165            return true;
166        }
167        if ("23001".equals(sqlState)) {
168            // H2: Unique index or primary key violation
169            return true;
170        }
171        if ("23505".equals(sqlState)) {
172            // PostgreSQL: duplicate key value violates unique constraint
173            return true;
174        }
175        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
176            // SQL Server: Snapshot isolation transaction aborted due to update
177            // conflict
178            return true;
179        }
180        return false;
181    }
182
183    @Override
184    public Lock removeLock(String id, String owner) {
185        return deleteLock(idFromString(id), owner);
186    }
187
188    /*
189     * ----- JDBC -----
190     */
191
192    protected Lock readLock(Serializable id) {
193        try (Connection connection = getConnection()) {
194            return readLock0(connection, id);
195        } catch (SQLException e) {
196            checkConcurrentUpdate(e);
197            throw new NuxeoException(e);
198        }
199    }
200
201    protected Lock readLock0(Connection connection, Serializable id) throws SQLException {
202        SQLInfoSelect select = sqlInfo.selectFragmentById.get(LOCK_TABLE_NAME);
203        try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
204            for (Column column : select.whereColumns) {
205                String key = column.getKey();
206                if (MAIN_KEY.equals(key)) {
207                    column.setToPreparedStatement(ps, 1, id);
208                } else {
209                    throw new NuxeoException(key);
210                }
211            }
212            log.trace("SQL: {} id={}", select.sql, id);
213            try (ResultSet rs = ps.executeQuery()) {
214                if (!rs.next()) {
215                    log.trace("SQL:   -> null");
216                    return null;
217                }
218                String owner = null;
219                Calendar created = null;
220                int i = 1;
221                for (Column column : select.whatColumns) {
222                    String key = column.getKey();
223                    Serializable value = column.getFromResultSet(rs, i++);
224                    if (LOCK_OWNER_KEY.equals(key)) {
225                        owner = (String) value;
226                    } else if (LOCK_CREATED_KEY.equals(key)) {
227                        created = (Calendar) value;
228                    } else {
229                        throw new NuxeoException(key);
230                    }
231                }
232                log.trace("SQL:   -> {}", owner);
233                return new Lock(owner, created);
234            }
235        }
236    }
237
238    protected Lock writeLock(Serializable id, Lock lock) {
239        try (Connection connection = getConnection()) {
240            return writeLock(connection, id, lock);
241        } catch (SQLException e) {
242            checkConcurrentUpdate(e);
243            throw new NuxeoException(e);
244        }
245    }
246
247    protected Lock writeLock(Connection connection, Serializable id, Lock lock) throws SQLException {
248        try {
249            writeLock0(connection, id, lock);
250            return null;
251        } catch (SQLException e) {
252            if (!sqlInfo.dialect.isConcurrentUpdateException(e)) {
253                throw e;
254            }
255            log.trace("SQL:   -> duplicate");
256        }
257        // lock already exists, try to read it
258        Lock oldLock = readLock0(connection, id);
259        if (oldLock != null) {
260            // there indeed was another lock, return it
261            return oldLock;
262        }
263        // we attempted a write that failed, but when reading there's nothing
264        // because a concurrent transaction already removed it
265        // however we have to return an old lock per our contract
266        // retry at a higher level
267        throw new ConcurrentUpdateException("Concurrent update");
268    }
269
270    protected void writeLock0(Connection connection, Serializable id, Lock lock) throws SQLException {
271        String sql = sqlInfo.getInsertSql(LOCK_TABLE_NAME);
272        try (PreparedStatement ps = connection.prepareStatement(sql)) {
273            int i = 1;
274            for (Column column : sqlInfo.getInsertColumns(LOCK_TABLE_NAME)) {
275                String key = column.getKey();
276                Serializable value;
277                if (MAIN_KEY.equals(key)) {
278                    value = id;
279                } else if (LOCK_OWNER_KEY.equals(key)) {
280                    value = lock.getOwner();
281                } else if (LOCK_CREATED_KEY.equals(key)) {
282                    value = lock.getCreated();
283                } else {
284                    throw new NuxeoException(key);
285                }
286                column.setToPreparedStatement(ps, i++, value);
287            }
288            log.trace("SQL: {} id={} owner={}", () -> sql, () -> id, lock::getOwner);
289            ps.execute();
290        }
291    }
292
293    protected Lock deleteLock(Serializable id, String owner) {
294        try (Connection connection = getConnection()) {
295            return deleteLock(connection, id, owner);
296        } catch (SQLException e) {
297            checkConcurrentUpdate(e);
298            throw new NuxeoException(e);
299        }
300    }
301
302    protected Lock deleteLock(Connection connection, Serializable id, String owner) throws SQLException {
303        log.trace("SQL: tx begin");
304        connection.setAutoCommit(false);
305        try {
306            Lock oldLock = readLock0(connection, id);
307            if (owner != null) {
308                if (oldLock == null) {
309                    // not locked, nothing to do
310                    return null;
311                }
312                if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
313                    // existing mismatched lock, flag failure
314                    return new Lock(oldLock, true);
315                }
316            }
317            if (oldLock != null) {
318                deleteLock0(connection, id);
319            }
320            return oldLock;
321        } finally {
322            try {
323                log.trace("SQL: tx commit");
324                connection.commit();
325            } finally {
326                connection.setAutoCommit(true);
327            }
328        }
329    }
330
331    protected void deleteLock0(Connection connection, Serializable id) throws SQLException {
332        String sql = sqlInfo.getDeleteSql(LOCK_TABLE_NAME, 1);
333        try (PreparedStatement ps = connection.prepareStatement(sql)) {
334            log.trace("SQL: {} id={}", sql, id);
335            sqlInfo.dialect.setId(ps, 1, id);
336            ps.executeUpdate();
337        }
338    }
339
340    @Override
341    public String toString() {
342        return getClass().getSimpleName() + '(' + dataSourceName + ')';
343    }
344
345}