001/* 002 * (C) Copyright 2006-2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import static org.nuxeo.ecm.core.storage.sql.Model.LOCK_CREATED_KEY; 022import static org.nuxeo.ecm.core.storage.sql.Model.LOCK_OWNER_KEY; 023import static org.nuxeo.ecm.core.storage.sql.Model.LOCK_TABLE_NAME; 024import static org.nuxeo.ecm.core.storage.sql.Model.MAIN_KEY; 025 026import java.io.Serializable; 027import java.sql.BatchUpdateException; 028import java.sql.Connection; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.util.ArrayList; 033import java.util.Calendar; 034import java.util.List; 035 036import org.apache.logging.log4j.LogManager; 037import org.apache.logging.log4j.Logger; 038import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 039import org.nuxeo.ecm.core.api.Lock; 040import org.nuxeo.ecm.core.api.LockException; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.api.lock.LockManager; 043import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo; 044import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 046import org.nuxeo.runtime.datasource.ConnectionHelper; 047 048/** 049 * Manager of locks stored in the repository SQL database. 050 */ 051public class VCSLockManager implements LockManager { 052 053 private static final Logger log = LogManager.getLogger(VCSLockManager.class); 054 055 public static final int LOCK_RETRIES = 10; 056 057 public static final long LOCK_SLEEP_DELAY = 1; // 1 ms 058 059 public static final long LOCK_SLEEP_INCREMENT = 50; // add 50 ms each time 060 061 protected final String dataSourceName; 062 063 protected final Model model; 064 065 protected final SQLInfo sqlInfo; 066 067 /** 068 * Creates a lock manager for the given repository. 069 * <p> 070 * {@link #closeLockManager()} must be called when done with the lock manager. 071 * 072 * @since 9.3 073 */ 074 public VCSLockManager(RepositoryImpl repository) { 075 dataSourceName = "repository_" + repository.getName(); 076 model = repository.getModel(); 077 sqlInfo = repository.getSQLInfo(); 078 } 079 080 protected Connection getConnection() throws SQLException { 081 // open connection in noSharing mode 082 return ConnectionHelper.getConnection(dataSourceName, true); 083 } 084 085 protected Serializable idFromString(String id) { 086 return model.idFromString(id); 087 } 088 089 @Override 090 public Lock getLock(final String id) { 091 return readLock(idFromString(id)); 092 } 093 094 @Override 095 public Lock setLock(String id, Lock lock) { 096 // We don't call addSuppressed() on an existing exception 097 // because constructing it beforehand when it most likely 098 // won't be needed is expensive. 099 List<Throwable> suppressed = new ArrayList<>(0); 100 long sleepDelay = LOCK_SLEEP_DELAY; 101 for (int i = 0; i < LOCK_RETRIES; i++) { 102 if (i > 0) { 103 log.debug("Retrying lock on {}: try {}", id, i + 1); 104 } 105 try { 106 return writeLock(idFromString(id), lock); 107 } catch (NuxeoException e) { 108 suppressed.add(e); 109 if (shouldRetry(e)) { 110 // cluster: two simultaneous inserts 111 // retry 112 try { 113 Thread.sleep(sleepDelay); 114 } catch (InterruptedException ie) { 115 Thread.currentThread().interrupt(); 116 throw new RuntimeException(ie); 117 } 118 sleepDelay += LOCK_SLEEP_INCREMENT; 119 continue; 120 } 121 // not something to retry 122 NuxeoException exception = new NuxeoException(e); 123 for (Throwable t : suppressed) { 124 exception.addSuppressed(t); 125 } 126 throw exception; 127 } 128 } 129 LockException exception = new LockException( 130 "Failed to lock " + id + ", too much concurrency (tried " + LOCK_RETRIES + " times)"); 131 for (Throwable t : suppressed) { 132 exception.addSuppressed(t); 133 } 134 throw exception; 135 } 136 137 protected void checkConcurrentUpdate(Throwable e) { 138 if (sqlInfo.dialect.isConcurrentUpdateException(e)) { 139 log.debug(e, e); 140 // don't keep the original message, as it may reveal database-level info 141 throw new ConcurrentUpdateException("Concurrent update", e); 142 } 143 } 144 145 /** 146 * Does the exception mean that we should retry the transaction? 147 */ 148 protected boolean shouldRetry(Exception e) { 149 if (e instanceof ConcurrentUpdateException) { 150 return true; 151 } 152 Throwable t = e.getCause(); 153 if (t instanceof BatchUpdateException && t.getCause() != null) { 154 t = t.getCause(); 155 } 156 return t instanceof SQLException && shouldRetry((SQLException) t); 157 } 158 159 protected boolean shouldRetry(SQLException e) { 160 String sqlState = e.getSQLState(); 161 if ("23000".equals(sqlState)) { 162 // MySQL: Duplicate entry ... for key ... 163 // Oracle: unique constraint ... violated 164 // SQL Server: Violation of PRIMARY KEY constraint 165 return true; 166 } 167 if ("23001".equals(sqlState)) { 168 // H2: Unique index or primary key violation 169 return true; 170 } 171 if ("23505".equals(sqlState)) { 172 // PostgreSQL: duplicate key value violates unique constraint 173 return true; 174 } 175 if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) { 176 // SQL Server: Snapshot isolation transaction aborted due to update 177 // conflict 178 return true; 179 } 180 return false; 181 } 182 183 @Override 184 public Lock removeLock(String id, String owner) { 185 return deleteLock(idFromString(id), owner); 186 } 187 188 /* 189 * ----- JDBC ----- 190 */ 191 192 protected Lock readLock(Serializable id) { 193 try (Connection connection = getConnection()) { 194 return readLock0(connection, id); 195 } catch (SQLException e) { 196 checkConcurrentUpdate(e); 197 throw new NuxeoException(e); 198 } 199 } 200 201 protected Lock readLock0(Connection connection, Serializable id) throws SQLException { 202 SQLInfoSelect select = sqlInfo.selectFragmentById.get(LOCK_TABLE_NAME); 203 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 204 for (Column column : select.whereColumns) { 205 String key = column.getKey(); 206 if (MAIN_KEY.equals(key)) { 207 column.setToPreparedStatement(ps, 1, id); 208 } else { 209 throw new NuxeoException(key); 210 } 211 } 212 log.trace("SQL: {} id={}", select.sql, id); 213 try (ResultSet rs = ps.executeQuery()) { 214 if (!rs.next()) { 215 log.trace("SQL: -> null"); 216 return null; 217 } 218 String owner = null; 219 Calendar created = null; 220 int i = 1; 221 for (Column column : select.whatColumns) { 222 String key = column.getKey(); 223 Serializable value = column.getFromResultSet(rs, i++); 224 if (LOCK_OWNER_KEY.equals(key)) { 225 owner = (String) value; 226 } else if (LOCK_CREATED_KEY.equals(key)) { 227 created = (Calendar) value; 228 } else { 229 throw new NuxeoException(key); 230 } 231 } 232 log.trace("SQL: -> {}", owner); 233 return new Lock(owner, created); 234 } 235 } 236 } 237 238 protected Lock writeLock(Serializable id, Lock lock) { 239 try (Connection connection = getConnection()) { 240 return writeLock(connection, id, lock); 241 } catch (SQLException e) { 242 checkConcurrentUpdate(e); 243 throw new NuxeoException(e); 244 } 245 } 246 247 protected Lock writeLock(Connection connection, Serializable id, Lock lock) throws SQLException { 248 try { 249 writeLock0(connection, id, lock); 250 return null; 251 } catch (SQLException e) { 252 if (!sqlInfo.dialect.isConcurrentUpdateException(e)) { 253 throw e; 254 } 255 log.trace("SQL: -> duplicate"); 256 } 257 // lock already exists, try to read it 258 Lock oldLock = readLock0(connection, id); 259 if (oldLock != null) { 260 // there indeed was another lock, return it 261 return oldLock; 262 } 263 // we attempted a write that failed, but when reading there's nothing 264 // because a concurrent transaction already removed it 265 // however we have to return an old lock per our contract 266 // retry at a higher level 267 throw new ConcurrentUpdateException("Concurrent update"); 268 } 269 270 protected void writeLock0(Connection connection, Serializable id, Lock lock) throws SQLException { 271 String sql = sqlInfo.getInsertSql(LOCK_TABLE_NAME); 272 try (PreparedStatement ps = connection.prepareStatement(sql)) { 273 int i = 1; 274 for (Column column : sqlInfo.getInsertColumns(LOCK_TABLE_NAME)) { 275 String key = column.getKey(); 276 Serializable value; 277 if (MAIN_KEY.equals(key)) { 278 value = id; 279 } else if (LOCK_OWNER_KEY.equals(key)) { 280 value = lock.getOwner(); 281 } else if (LOCK_CREATED_KEY.equals(key)) { 282 value = lock.getCreated(); 283 } else { 284 throw new NuxeoException(key); 285 } 286 column.setToPreparedStatement(ps, i++, value); 287 } 288 log.trace("SQL: {} id={} owner={}", () -> sql, () -> id, lock::getOwner); 289 ps.execute(); 290 } 291 } 292 293 protected Lock deleteLock(Serializable id, String owner) { 294 try (Connection connection = getConnection()) { 295 return deleteLock(connection, id, owner); 296 } catch (SQLException e) { 297 checkConcurrentUpdate(e); 298 throw new NuxeoException(e); 299 } 300 } 301 302 protected Lock deleteLock(Connection connection, Serializable id, String owner) throws SQLException { 303 log.trace("SQL: tx begin"); 304 connection.setAutoCommit(false); 305 try { 306 Lock oldLock = readLock0(connection, id); 307 if (owner != null) { 308 if (oldLock == null) { 309 // not locked, nothing to do 310 return null; 311 } 312 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 313 // existing mismatched lock, flag failure 314 return new Lock(oldLock, true); 315 } 316 } 317 if (oldLock != null) { 318 deleteLock0(connection, id); 319 } 320 return oldLock; 321 } finally { 322 try { 323 log.trace("SQL: tx commit"); 324 connection.commit(); 325 } finally { 326 connection.setAutoCommit(true); 327 } 328 } 329 } 330 331 protected void deleteLock0(Connection connection, Serializable id) throws SQLException { 332 String sql = sqlInfo.getDeleteSql(LOCK_TABLE_NAME, 1); 333 try (PreparedStatement ps = connection.prepareStatement(sql)) { 334 log.trace("SQL: {} id={}", sql, id); 335 sqlInfo.dialect.setId(ps, 1, id); 336 ps.executeUpdate(); 337 } 338 } 339 340 @Override 341 public String toString() { 342 return getClass().getSimpleName() + '(' + dataSourceName + ')'; 343 } 344 345}