001/* 002 * (C) Copyright 2010-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.io.Serializable; 028import java.sql.Connection; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.util.Map; 033 034import org.apache.commons.io.IOUtils; 035import org.apache.commons.lang3.StringUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.nuxeo.ecm.core.blob.binary.Binary; 039import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 040import org.nuxeo.ecm.core.blob.binary.BinaryManager; 041import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 042import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager; 043import org.nuxeo.ecm.core.blob.binary.FileStorage; 044import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 047import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 048import org.nuxeo.runtime.datasource.ConnectionHelper; 049 050/** 051 * A Binary Manager that stores binaries as SQL BLOBs. 052 * <p> 053 * The BLOBs are cached locally on first access for efficiency. 054 * <p> 055 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 056 * if accessed before the stream. 057 */ 058public class SQLBinaryManager extends CachingBinaryManager { 059 060 private static final Log log = LogFactory.getLog(SQLBinaryManager.class); 061 062 public static final String DS_PROP = "datasource"; 063 064 public static final String DS_PREFIX = "datasource="; 065 066 public static final String TABLE_PROP = "table"; 067 068 public static final String TABLE_PREFIX = "table="; 069 070 public static final String CACHE_SIZE_PROP = "cacheSize"; 071 072 public static final String CACHE_SIZE_PREFIX = "cachesize="; 073 074 public static final String DEFAULT_CACHE_SIZE = "10M"; 075 076 public static final String COL_ID = "id"; 077 078 public static final String COL_BIN = "bin"; 079 080 public static final String COL_MARK = "mark"; // for mark & sweep GC 081 082 protected String dataSourceName; 083 084 protected String checkSql; 085 086 protected String putSql; 087 088 protected String getSql; 089 090 protected String gcStartSql; 091 092 protected String gcMarkSql; 093 094 protected String gcStatsSql; 095 096 protected String gcSweepSql; 097 098 protected static boolean disableCheckExisting; // for unit tests 099 100 protected static boolean resetCache; // for unit tests 101 102 @Override 103 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 104 super.initialize(blobProviderId, properties); 105 106 dataSourceName = null; 107 String tableName = null; 108 String cacheSizeStr = null; 109 String key = properties.get(BinaryManager.PROP_KEY); 110 key = StringUtils.defaultIfBlank(key, ""); 111 for (String part : key.split(",")) { 112 if (part.startsWith(DS_PREFIX)) { 113 dataSourceName = part.substring(DS_PREFIX.length()).trim(); 114 } 115 if (part.startsWith(TABLE_PREFIX)) { 116 tableName = part.substring(TABLE_PREFIX.length()).trim(); 117 } 118 if (part.startsWith(CACHE_SIZE_PREFIX)) { 119 cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim(); 120 } 121 } 122 if (StringUtils.isBlank(dataSourceName)) { 123 dataSourceName = properties.get(DS_PROP); 124 if (StringUtils.isBlank(dataSourceName)) { 125 throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration"); 126 } 127 } 128 if (StringUtils.isBlank(tableName)) { 129 tableName = properties.get(TABLE_PROP); 130 if (StringUtils.isBlank(tableName)) { 131 throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration"); 132 } 133 } 134 if (StringUtils.isBlank(cacheSizeStr)) { 135 cacheSizeStr = properties.get(CACHE_SIZE_PROP); 136 if (StringUtils.isBlank(cacheSizeStr)) { 137 cacheSizeStr = DEFAULT_CACHE_SIZE; 138 } 139 } 140 141 // create the SQL statements used 142 createSql(tableName); 143 144 // create file cache 145 initializeCache(cacheSizeStr, new SQLFileStorage()); 146 createGarbageCollector(); 147 } 148 149 protected void createGarbageCollector() { 150 garbageCollector = new SQLBinaryGarbageCollector(this); 151 } 152 153 protected void createSql(String tableName) throws IOException { 154 Dialect dialect = getDialect(); 155 Database database = new Database(dialect); 156 Table table = database.addTable(tableName); 157 ColumnType dummytype = ColumnType.STRING; 158 Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null); 159 Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null); 160 Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null); 161 162 checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName()); 163 putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(), 164 idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName()); 165 getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(), 166 idCol.getQuotedName()); 167 168 gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName()); 169 gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(), 170 idCol.getQuotedName()); 171 gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(), 172 binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName()); 173 gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName()); 174 } 175 176 protected Dialect getDialect() throws IOException { 177 Connection connection = null; 178 try { 179 connection = getConnection(); 180 return Dialect.createDialect(connection, null); 181 } catch (SQLException e) { 182 throw new IOException(e); 183 } finally { 184 if (connection != null) { 185 try { 186 connection.close(); 187 } catch (SQLException e) { 188 log.error(e, e); 189 } 190 } 191 } 192 } 193 194 protected Connection getConnection() throws SQLException { 195 return ConnectionHelper.getConnection(dataSourceName); 196 } 197 198 protected static void logSQL(String sql, Serializable... values) { 199 if (!log.isTraceEnabled()) { 200 return; 201 } 202 StringBuilder buf = new StringBuilder(); 203 int start = 0; 204 for (Serializable v : values) { 205 int index = sql.indexOf('?', start); 206 if (index == -1) { 207 // mismatch between number of ? and number of values 208 break; 209 } 210 buf.append(sql, start, index); 211 buf.append(loggedValue(v)); 212 start = index + 1; 213 } 214 buf.append(sql, start, sql.length()); 215 log.trace("(bin) SQL: " + buf.toString()); 216 } 217 218 protected static String loggedValue(Serializable value) { 219 if (value == null) { 220 return "NULL"; 221 } 222 if (value instanceof String) { 223 String v = (String) value; 224 return "'" + v.replace("'", "''") + "'"; 225 } 226 return value.toString(); 227 } 228 229 protected static boolean isDuplicateKeyException(SQLException e) { 230 String sqlState = e.getSQLState(); 231 if ("23000".equals(sqlState)) { 232 // MySQL: Duplicate entry ... for key ... 233 // Oracle: unique constraint ... violated 234 // SQL Server: Violation of PRIMARY KEY constraint 235 return true; 236 } 237 if ("23001".equals(sqlState)) { 238 // H2: Unique index or primary key violation 239 return true; 240 } 241 if ("23505".equals(sqlState)) { 242 // H2: Unique index or primary key violation 243 // PostgreSQL: duplicate key value violates unique constraint 244 return true; 245 } 246 if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) { 247 // SQL Server: Snapshot isolation transaction aborted due to update 248 // conflict 249 return true; 250 } 251 return false; 252 } 253 254 @Override 255 public Binary getBinary(String digest) { 256 if (resetCache) { 257 // for unit tests 258 resetCache = false; 259 fileCache.clear(); 260 } 261 return super.getBinary(digest); 262 } 263 264 @Override 265 public Binary getBinary(InputStream in) throws IOException { 266 if (resetCache) { 267 // for unit tests 268 resetCache = false; 269 fileCache.clear(); 270 } 271 return super.getBinary(in); 272 } 273 274 public class SQLFileStorage implements FileStorage { 275 276 @Override 277 public void storeFile(String digest, File file) throws IOException { 278 Connection connection = null; 279 try { 280 connection = getConnection(); 281 boolean existing; 282 if (disableCheckExisting) { 283 // for unit tests 284 existing = false; 285 } else { 286 logSQL(checkSql, digest); 287 try (PreparedStatement ps = connection.prepareStatement(checkSql)) { 288 ps.setString(1, digest); 289 try (ResultSet rs = ps.executeQuery()) { 290 existing = rs.next(); 291 } 292 } 293 } 294 if (!existing) { 295 // insert new blob 296 logSQL(putSql, digest, "somebinary", Boolean.TRUE); 297 try (PreparedStatement ps = connection.prepareStatement(putSql)) { 298 ps.setString(1, digest); 299 // needs dbcp 1.4: 300 // ps.setBlob(2, new FileInputStream(file), file.length()); 301 try (FileInputStream tmpis = new FileInputStream(file)) { 302 ps.setBinaryStream(2, tmpis, (int) file.length()); 303 ps.setBoolean(3, true); // mark new additions for GC 304 try { 305 ps.execute(); 306 } catch (SQLException e) { 307 if (!isDuplicateKeyException(e)) { 308 throw e; 309 } 310 } 311 } 312 } 313 } 314 } catch (SQLException e) { 315 throw new IOException(e); 316 } finally { 317 if (connection != null) { 318 try { 319 connection.close(); 320 } catch (SQLException e) { 321 log.error(e, e); 322 } 323 } 324 } 325 } 326 327 @Override 328 public boolean fetchFile(String digest, File tmp) throws IOException { 329 Connection connection = null; 330 try { 331 connection = getConnection(); 332 logSQL(getSql, digest); 333 try (PreparedStatement ps = connection.prepareStatement(getSql)) { 334 ps.setString(1, digest); 335 try (ResultSet rs = ps.executeQuery()) { 336 if (!rs.next()) { 337 log.error("Unknown binary: " + digest); 338 return false; 339 } 340 try (InputStream in = rs.getBinaryStream(1)) { 341 if (in == null) { 342 log.error("Missing binary: " + digest); 343 return false; 344 } 345 // store in file 346 try (OutputStream out = new FileOutputStream(tmp)) { 347 IOUtils.copy(in, out); 348 } 349 } 350 } 351 } 352 return true; 353 } catch (SQLException e) { 354 throw new IOException(e); 355 } finally { 356 if (connection != null) { 357 try { 358 connection.close(); 359 } catch (SQLException e) { 360 log.error(e, e); 361 } 362 } 363 } 364 } 365 } 366 367 public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector { 368 369 protected final SQLBinaryManager binaryManager; 370 371 protected volatile long startTime; 372 373 protected BinaryManagerStatus status; 374 375 public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) { 376 this.binaryManager = binaryManager; 377 } 378 379 @Override 380 public String getId() { 381 return "datasource:" + binaryManager.dataSourceName; 382 } 383 384 @Override 385 public BinaryManagerStatus getStatus() { 386 return status; 387 } 388 389 @Override 390 public boolean isInProgress() { 391 // volatile as this is designed to be called from another thread 392 return startTime != 0; 393 } 394 395 @Override 396 public void start() { 397 if (startTime != 0) { 398 throw new RuntimeException("Already started"); 399 } 400 startTime = System.currentTimeMillis(); 401 status = new BinaryManagerStatus(); 402 403 Connection connection = null; 404 PreparedStatement ps = null; 405 try { 406 connection = binaryManager.getConnection(); 407 logSQL(binaryManager.gcStartSql, Boolean.FALSE); 408 ps = connection.prepareStatement(binaryManager.gcStartSql); 409 ps.setBoolean(1, false); // clear marks 410 int n = ps.executeUpdate(); 411 logSQL(" -> ? rows", Long.valueOf(n)); 412 } catch (SQLException e) { 413 throw new RuntimeException(e); 414 } finally { 415 if (ps != null) { 416 try { 417 ps.close(); 418 } catch (SQLException e) { 419 log.error(e, e); 420 } 421 } 422 if (connection != null) { 423 try { 424 connection.close(); 425 } catch (SQLException e) { 426 log.error(e, e); 427 } 428 } 429 } 430 } 431 432 @Override 433 public void mark(String digest) { 434 Connection connection = null; 435 PreparedStatement ps = null; 436 try { 437 connection = binaryManager.getConnection(); 438 logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest); 439 ps = connection.prepareStatement(binaryManager.gcMarkSql); 440 ps.setBoolean(1, true); // mark 441 ps.setString(2, digest); 442 ps.execute(); 443 } catch (SQLException e) { 444 throw new RuntimeException(e); 445 } finally { 446 if (ps != null) { 447 try { 448 ps.close(); 449 } catch (SQLException e) { 450 log.error(e, e); 451 } 452 } 453 if (connection != null) { 454 try { 455 connection.close(); 456 } catch (SQLException e) { 457 log.error(e, e); 458 } 459 } 460 } 461 } 462 463 @Override 464 public void stop(boolean delete) { 465 if (startTime == 0) { 466 throw new RuntimeException("Not started"); 467 } 468 469 Connection connection = null; 470 PreparedStatement ps = null; 471 try { 472 connection = binaryManager.getConnection(); 473 // stats 474 logSQL(binaryManager.gcStatsSql, Boolean.TRUE); 475 ps = connection.prepareStatement(binaryManager.gcStatsSql); 476 ps.setBoolean(1, true); // marked 477 try (ResultSet rs = ps.executeQuery()) { 478 rs.next(); 479 status.numBinaries = rs.getLong(1); 480 status.sizeBinaries = rs.getLong(2); 481 } 482 logSQL(" -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries)); 483 logSQL(binaryManager.gcStatsSql, Boolean.FALSE); 484 ps.setBoolean(1, false); // unmarked 485 try (ResultSet rs = ps.executeQuery()) { 486 rs.next(); 487 status.numBinariesGC = rs.getLong(1); 488 status.sizeBinariesGC = rs.getLong(2); 489 } 490 logSQL(" -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC)); 491 if (delete) { 492 // sweep 493 ps.close(); 494 logSQL(binaryManager.gcSweepSql, Boolean.FALSE); 495 ps = connection.prepareStatement(binaryManager.gcSweepSql); 496 ps.setBoolean(1, false); // sweep unmarked 497 int n = ps.executeUpdate(); 498 logSQL(" -> ? rows", Long.valueOf(n)); 499 } 500 } catch (SQLException e) { 501 throw new RuntimeException(e); 502 } finally { 503 if (ps != null) { 504 try { 505 ps.close(); 506 } catch (SQLException e) { 507 log.error(e, e); 508 } 509 } 510 if (connection != null) { 511 try { 512 connection.close(); 513 } catch (SQLException e) { 514 log.error(e, e); 515 } 516 } 517 } 518 519 status.gcDuration = System.currentTimeMillis() - startTime; 520 startTime = 0; 521 } 522 } 523 524}