001/* 002 * (C) Copyright 2010-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.io.Serializable; 028import java.sql.Connection; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.util.Map; 033 034import org.apache.commons.io.IOUtils; 035import org.apache.commons.lang3.StringUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.nuxeo.ecm.core.blob.BlobProviderDescriptor; 039import org.nuxeo.ecm.core.blob.binary.Binary; 040import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 041import org.nuxeo.ecm.core.blob.binary.BinaryManager; 042import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 043import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager; 044import org.nuxeo.ecm.core.blob.binary.FileStorage; 045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 047import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 048import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 049import org.nuxeo.runtime.datasource.ConnectionHelper; 050 051/** 052 * A Binary Manager that stores binaries as SQL BLOBs. 053 * <p> 054 * The BLOBs are cached locally on first access for efficiency. 055 * <p> 056 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 057 * if accessed before the stream. 058 */ 059public class SQLBinaryManager extends CachingBinaryManager { 060 061 private static final Log log = LogFactory.getLog(SQLBinaryManager.class); 062 063 public static final String DS_PROP = "datasource"; 064 065 public static final String DS_PREFIX = "datasource="; 066 067 public static final String TABLE_PROP = "table"; 068 069 public static final String TABLE_PREFIX = "table="; 070 071 public static final String CACHE_SIZE_PROP = "cacheSize"; 072 073 public static final String CACHE_SIZE_PREFIX = "cachesize="; 074 075 public static final String DEFAULT_CACHE_SIZE = "10M"; 076 077 public static final String COL_ID = "id"; 078 079 public static final String COL_BIN = "bin"; 080 081 public static final String COL_MARK = "mark"; // for mark & sweep GC 082 083 protected String dataSourceName; 084 085 protected String checkSql; 086 087 protected String putSql; 088 089 protected String getSql; 090 091 protected String gcStartSql; 092 093 protected String gcMarkSql; 094 095 protected String gcStatsSql; 096 097 protected String gcSweepSql; 098 099 protected static boolean disableCheckExisting; // for unit tests 100 101 protected static boolean resetCache; // for unit tests 102 103 @Override 104 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 105 super.initialize(blobProviderId, properties); 106 107 dataSourceName = null; 108 String tableName = null; 109 String cacheSizeStr = null; 110 String key = properties.get(BinaryManager.PROP_KEY); 111 key = StringUtils.defaultIfBlank(key, ""); 112 for (String part : key.split(",")) { 113 if (part.startsWith(DS_PREFIX)) { 114 dataSourceName = part.substring(DS_PREFIX.length()).trim(); 115 } 116 if (part.startsWith(TABLE_PREFIX)) { 117 tableName = part.substring(TABLE_PREFIX.length()).trim(); 118 } 119 if (part.startsWith(CACHE_SIZE_PREFIX)) { 120 cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim(); 121 } 122 } 123 if (StringUtils.isBlank(dataSourceName)) { 124 dataSourceName = properties.get(DS_PROP); 125 if (StringUtils.isBlank(dataSourceName)) { 126 throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration"); 127 } 128 } 129 if (StringUtils.isBlank(tableName)) { 130 tableName = properties.get(TABLE_PROP); 131 if (StringUtils.isBlank(tableName)) { 132 throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration"); 133 } 134 } 135 if (StringUtils.isBlank(cacheSizeStr)) { 136 cacheSizeStr = properties.get(CACHE_SIZE_PROP); 137 if (StringUtils.isBlank(cacheSizeStr)) { 138 cacheSizeStr = DEFAULT_CACHE_SIZE; 139 } 140 } 141 if (StringUtils.isNotBlank(properties.get(BlobProviderDescriptor.NAMESPACE))) { 142 throw new RuntimeException("Namespaces not implemented"); 143 } 144 145 // create the SQL statements used 146 createSql(tableName); 147 148 // create file cache 149 initializeCache(cacheSizeStr, new SQLFileStorage()); 150 createGarbageCollector(); 151 } 152 153 protected void createGarbageCollector() { 154 garbageCollector = new SQLBinaryGarbageCollector(this); 155 } 156 157 protected void createSql(String tableName) throws IOException { 158 Dialect dialect = getDialect(); 159 Database database = new Database(dialect); 160 Table table = database.addTable(tableName); 161 ColumnType dummytype = ColumnType.STRING; 162 Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null); 163 Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null); 164 Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null); 165 166 checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName()); 167 putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(), 168 idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName()); 169 getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(), 170 idCol.getQuotedName()); 171 172 gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName()); 173 gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(), 174 idCol.getQuotedName()); 175 gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(), 176 binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName()); 177 gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName()); 178 } 179 180 protected Dialect getDialect() throws IOException { 181 Connection connection = null; 182 try { 183 connection = getConnection(); 184 return Dialect.createDialect(connection, null); 185 } catch (SQLException e) { 186 throw new IOException(e); 187 } finally { 188 if (connection != null) { 189 try { 190 connection.close(); 191 } catch (SQLException e) { 192 log.error(e, e); 193 } 194 } 195 } 196 } 197 198 protected Connection getConnection() throws SQLException { 199 return ConnectionHelper.getConnection(dataSourceName); 200 } 201 202 protected static void logSQL(String sql, Serializable... values) { 203 if (!log.isTraceEnabled()) { 204 return; 205 } 206 StringBuilder buf = new StringBuilder(); 207 int start = 0; 208 for (Serializable v : values) { 209 int index = sql.indexOf('?', start); 210 if (index == -1) { 211 // mismatch between number of ? and number of values 212 break; 213 } 214 buf.append(sql, start, index); 215 buf.append(loggedValue(v)); 216 start = index + 1; 217 } 218 buf.append(sql, start, sql.length()); 219 log.trace("(bin) SQL: " + buf.toString()); 220 } 221 222 protected static String loggedValue(Serializable value) { 223 if (value == null) { 224 return "NULL"; 225 } 226 if (value instanceof String) { 227 String v = (String) value; 228 return "'" + v.replace("'", "''") + "'"; 229 } 230 return value.toString(); 231 } 232 233 protected static boolean isDuplicateKeyException(SQLException e) { 234 String sqlState = e.getSQLState(); 235 if ("23000".equals(sqlState)) { 236 // MySQL: Duplicate entry ... for key ... 237 // Oracle: unique constraint ... violated 238 // SQL Server: Violation of PRIMARY KEY constraint 239 return true; 240 } 241 if ("23001".equals(sqlState)) { 242 // H2: Unique index or primary key violation 243 return true; 244 } 245 if ("23505".equals(sqlState)) { 246 // H2: Unique index or primary key violation 247 // PostgreSQL: duplicate key value violates unique constraint 248 return true; 249 } 250 if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) { 251 // SQL Server: Snapshot isolation transaction aborted due to update 252 // conflict 253 return true; 254 } 255 return false; 256 } 257 258 @Override 259 public Binary getBinary(String digest) { 260 if (resetCache) { 261 // for unit tests 262 resetCache = false; 263 fileCache.clear(); 264 } 265 return super.getBinary(digest); 266 } 267 268 @Override 269 public Binary getBinary(InputStream in) throws IOException { 270 if (resetCache) { 271 // for unit tests 272 resetCache = false; 273 fileCache.clear(); 274 } 275 return super.getBinary(in); 276 } 277 278 public class SQLFileStorage implements FileStorage { 279 280 @Override 281 public void storeFile(String digest, File file) throws IOException { 282 Connection connection = null; 283 try { 284 connection = getConnection(); 285 boolean existing; 286 if (disableCheckExisting) { 287 // for unit tests 288 existing = false; 289 } else { 290 logSQL(checkSql, digest); 291 try (PreparedStatement ps = connection.prepareStatement(checkSql)) { 292 ps.setString(1, digest); 293 try (ResultSet rs = ps.executeQuery()) { 294 existing = rs.next(); 295 } 296 } 297 } 298 if (!existing) { 299 // insert new blob 300 logSQL(putSql, digest, "somebinary", Boolean.TRUE); 301 try (PreparedStatement ps = connection.prepareStatement(putSql)) { 302 ps.setString(1, digest); 303 // needs dbcp 1.4: 304 // ps.setBlob(2, new FileInputStream(file), file.length()); 305 try (FileInputStream tmpis = new FileInputStream(file)) { 306 ps.setBinaryStream(2, tmpis, (int) file.length()); 307 ps.setBoolean(3, true); // mark new additions for GC 308 try { 309 ps.execute(); 310 } catch (SQLException e) { 311 if (!isDuplicateKeyException(e)) { 312 throw e; 313 } 314 } 315 } 316 } 317 } 318 } catch (SQLException e) { 319 throw new IOException(e); 320 } finally { 321 if (connection != null) { 322 try { 323 connection.close(); 324 } catch (SQLException e) { 325 log.error(e, e); 326 } 327 } 328 } 329 } 330 331 @Override 332 public boolean fetchFile(String digest, File tmp) throws IOException { 333 Connection connection = null; 334 try { 335 connection = getConnection(); 336 logSQL(getSql, digest); 337 try (PreparedStatement ps = connection.prepareStatement(getSql)) { 338 ps.setString(1, digest); 339 try (ResultSet rs = ps.executeQuery()) { 340 if (!rs.next()) { 341 log.error("Unknown binary: " + digest); 342 return false; 343 } 344 try (InputStream in = rs.getBinaryStream(1)) { 345 if (in == null) { 346 log.error("Missing binary: " + digest); 347 return false; 348 } 349 // store in file 350 try (OutputStream out = new FileOutputStream(tmp)) { 351 IOUtils.copy(in, out); 352 } 353 } 354 } 355 } 356 return true; 357 } catch (SQLException e) { 358 throw new IOException(e); 359 } finally { 360 if (connection != null) { 361 try { 362 connection.close(); 363 } catch (SQLException e) { 364 log.error(e, e); 365 } 366 } 367 } 368 } 369 } 370 371 public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector { 372 373 protected final SQLBinaryManager binaryManager; 374 375 protected volatile long startTime; 376 377 protected BinaryManagerStatus status; 378 379 public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) { 380 this.binaryManager = binaryManager; 381 } 382 383 @Override 384 public String getId() { 385 return "datasource:" + binaryManager.dataSourceName; 386 } 387 388 @Override 389 public BinaryManagerStatus getStatus() { 390 return status; 391 } 392 393 @Override 394 public boolean isInProgress() { 395 // volatile as this is designed to be called from another thread 396 return startTime != 0; 397 } 398 399 @Override 400 public void start() { 401 if (startTime != 0) { 402 throw new RuntimeException("Already started"); 403 } 404 startTime = System.currentTimeMillis(); 405 status = new BinaryManagerStatus(); 406 407 Connection connection = null; 408 PreparedStatement ps = null; 409 try { 410 connection = binaryManager.getConnection(); 411 logSQL(binaryManager.gcStartSql, Boolean.FALSE); 412 ps = connection.prepareStatement(binaryManager.gcStartSql); 413 ps.setBoolean(1, false); // clear marks 414 int n = ps.executeUpdate(); 415 logSQL(" -> ? rows", Long.valueOf(n)); 416 } catch (SQLException e) { 417 throw new RuntimeException(e); 418 } finally { 419 if (ps != null) { 420 try { 421 ps.close(); 422 } catch (SQLException e) { 423 log.error(e, e); 424 } 425 } 426 if (connection != null) { 427 try { 428 connection.close(); 429 } catch (SQLException e) { 430 log.error(e, e); 431 } 432 } 433 } 434 } 435 436 @Override 437 public void mark(String digest) { 438 Connection connection = null; 439 PreparedStatement ps = null; 440 try { 441 connection = binaryManager.getConnection(); 442 logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest); 443 ps = connection.prepareStatement(binaryManager.gcMarkSql); 444 ps.setBoolean(1, true); // mark 445 ps.setString(2, digest); 446 ps.execute(); 447 } catch (SQLException e) { 448 throw new RuntimeException(e); 449 } finally { 450 if (ps != null) { 451 try { 452 ps.close(); 453 } catch (SQLException e) { 454 log.error(e, e); 455 } 456 } 457 if (connection != null) { 458 try { 459 connection.close(); 460 } catch (SQLException e) { 461 log.error(e, e); 462 } 463 } 464 } 465 } 466 467 @Override 468 public void stop(boolean delete) { 469 if (startTime == 0) { 470 throw new RuntimeException("Not started"); 471 } 472 473 Connection connection = null; 474 PreparedStatement ps = null; 475 try { 476 connection = binaryManager.getConnection(); 477 // stats 478 logSQL(binaryManager.gcStatsSql, Boolean.TRUE); 479 ps = connection.prepareStatement(binaryManager.gcStatsSql); 480 ps.setBoolean(1, true); // marked 481 try (ResultSet rs = ps.executeQuery()) { 482 rs.next(); 483 status.numBinaries = rs.getLong(1); 484 status.sizeBinaries = rs.getLong(2); 485 } 486 logSQL(" -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries)); 487 logSQL(binaryManager.gcStatsSql, Boolean.FALSE); 488 ps.setBoolean(1, false); // unmarked 489 try (ResultSet rs = ps.executeQuery()) { 490 rs.next(); 491 status.numBinariesGC = rs.getLong(1); 492 status.sizeBinariesGC = rs.getLong(2); 493 } 494 logSQL(" -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC)); 495 if (delete) { 496 // sweep 497 ps.close(); 498 logSQL(binaryManager.gcSweepSql, Boolean.FALSE); 499 ps = connection.prepareStatement(binaryManager.gcSweepSql); 500 ps.setBoolean(1, false); // sweep unmarked 501 int n = ps.executeUpdate(); 502 logSQL(" -> ? rows", Long.valueOf(n)); 503 } 504 } catch (SQLException e) { 505 throw new RuntimeException(e); 506 } finally { 507 if (ps != null) { 508 try { 509 ps.close(); 510 } catch (SQLException e) { 511 log.error(e, e); 512 } 513 } 514 if (connection != null) { 515 try { 516 connection.close(); 517 } catch (SQLException e) { 518 log.error(e, e); 519 } 520 } 521 } 522 523 status.gcDuration = System.currentTimeMillis() - startTime; 524 startTime = 0; 525 } 526 } 527 528}