001/* 002 * (C) Copyright 2010-2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.core.storage.sql; 018 019import java.io.File; 020import java.io.FileInputStream; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.io.Serializable; 026import java.sql.Connection; 027import java.sql.PreparedStatement; 028import java.sql.ResultSet; 029import java.sql.SQLException; 030import java.util.Map; 031 032import javax.naming.NamingException; 033import javax.sql.DataSource; 034 035import org.apache.commons.io.IOUtils; 036import org.apache.commons.lang.StringUtils; 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.nuxeo.ecm.core.blob.binary.Binary; 040import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 041import org.nuxeo.ecm.core.blob.binary.BinaryManager; 042import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 043import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager; 044import org.nuxeo.ecm.core.blob.binary.FileStorage; 045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 047import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 048import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 049import org.nuxeo.runtime.datasource.ConnectionHelper; 050import org.nuxeo.runtime.datasource.DataSourceHelper; 051 052/** 053 * A Binary Manager that stores binaries as SQL BLOBs. 054 * <p> 055 * The BLOBs are cached locally on first access for efficiency. 056 * <p> 057 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 058 * if accessed before the stream. 059 */ 060public class SQLBinaryManager extends CachingBinaryManager { 061 062 private static final Log log = LogFactory.getLog(SQLBinaryManager.class); 063 064 public static final String DS_PROP = "datasource"; 065 066 public static final String DS_PREFIX = "datasource="; 067 068 public static final String TABLE_PROP = "table"; 069 070 public static final String TABLE_PREFIX = "table="; 071 072 public static final String CACHE_SIZE_PROP = "cacheSize"; 073 074 public static final String CACHE_SIZE_PREFIX = "cachesize="; 075 076 public static final String DEFAULT_CACHE_SIZE = "10M"; 077 078 public static final String COL_ID = "id"; 079 080 public static final String COL_BIN = "bin"; 081 082 public static final String COL_MARK = "mark"; // for mark & sweep GC 083 084 protected String dataSourceName; 085 086 protected String checkSql; 087 088 protected String putSql; 089 090 protected String getSql; 091 092 protected String getLengthSql; 093 094 protected String gcStartSql; 095 096 protected String gcMarkSql; 097 098 protected String gcStatsSql; 099 100 protected String gcSweepSql; 101 102 protected static boolean disableCheckExisting; // for unit tests 103 104 protected static boolean resetCache; // for unit tests 105 106 @Override 107 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 108 super.initialize(blobProviderId, properties); 109 110 dataSourceName = null; 111 String tableName = null; 112 String cacheSizeStr = null; 113 String key = properties.get(BinaryManager.PROP_KEY); 114 key = StringUtils.defaultIfBlank(key, ""); 115 for (String part : key.split(",")) { 116 if (part.startsWith(DS_PREFIX)) { 117 dataSourceName = part.substring(DS_PREFIX.length()).trim(); 118 } 119 if (part.startsWith(TABLE_PREFIX)) { 120 tableName = part.substring(TABLE_PREFIX.length()).trim(); 121 } 122 if (part.startsWith(CACHE_SIZE_PREFIX)) { 123 cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim(); 124 } 125 } 126 if (StringUtils.isBlank(dataSourceName)) { 127 dataSourceName = properties.get(DS_PROP); 128 if (StringUtils.isBlank(dataSourceName)) { 129 throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration"); 130 } 131 } 132 if (StringUtils.isBlank(tableName)) { 133 tableName = properties.get(TABLE_PROP); 134 if (StringUtils.isBlank(tableName)) { 135 throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration"); 136 } 137 } 138 if (StringUtils.isBlank(cacheSizeStr)) { 139 cacheSizeStr = properties.get(CACHE_SIZE_PROP); 140 if (StringUtils.isBlank(cacheSizeStr)) { 141 cacheSizeStr = DEFAULT_CACHE_SIZE; 142 } 143 } 144 145 // create the SQL statements used 146 createSql(tableName); 147 148 // create file cache 149 initializeCache(cacheSizeStr, new SQLFileStorage()); 150 createGarbageCollector(); 151 } 152 153 protected void createGarbageCollector() { 154 garbageCollector = new SQLBinaryGarbageCollector(this); 155 } 156 157 protected void createSql(String tableName) throws IOException { 158 Dialect dialect = getDialect(); 159 Database database = new Database(dialect); 160 Table table = database.addTable(tableName); 161 ColumnType dummytype = ColumnType.STRING; 162 Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null); 163 Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null); 164 Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null); 165 166 checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName()); 167 putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(), 168 idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName()); 169 getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(), 170 idCol.getQuotedName()); 171 getLengthSql = String.format("SELECT %s(%s) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(), 172 binCol.getQuotedName(), table.getQuotedName(), idCol.getQuotedName()); 173 174 gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName()); 175 gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(), 176 idCol.getQuotedName()); 177 gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(), 178 binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName()); 179 gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName()); 180 } 181 182 protected Dialect getDialect() throws IOException { 183 Connection connection = null; 184 try { 185 connection = getConnection(); 186 return Dialect.createDialect(connection, null); 187 } catch (SQLException e) { 188 throw new IOException(e); 189 } finally { 190 if (connection != null) { 191 try { 192 connection.close(); 193 } catch (SQLException e) { 194 log.error(e, e); 195 } 196 } 197 } 198 } 199 200 protected Connection getConnection() throws SQLException { 201 return ConnectionHelper.getConnection(dataSourceName); 202 } 203 204 protected static void logSQL(String sql, Serializable... values) { 205 if (!log.isTraceEnabled()) { 206 return; 207 } 208 StringBuilder buf = new StringBuilder(); 209 int start = 0; 210 for (Serializable v : values) { 211 int index = sql.indexOf('?', start); 212 if (index == -1) { 213 // mismatch between number of ? and number of values 214 break; 215 } 216 buf.append(sql, start, index); 217 buf.append(loggedValue(v)); 218 start = index + 1; 219 } 220 buf.append(sql, start, sql.length()); 221 log.trace("(bin) SQL: " + buf.toString()); 222 } 223 224 protected static String loggedValue(Serializable value) { 225 if (value == null) { 226 return "NULL"; 227 } 228 if (value instanceof String) { 229 String v = (String) value; 230 return "'" + v.replace("'", "''") + "'"; 231 } 232 return value.toString(); 233 } 234 235 protected static boolean isDuplicateKeyException(SQLException e) { 236 String sqlState = e.getSQLState(); 237 if ("23000".equals(sqlState)) { 238 // MySQL: Duplicate entry ... for key ... 239 // Oracle: unique constraint ... violated 240 // SQL Server: Violation of PRIMARY KEY constraint 241 return true; 242 } 243 if ("23001".equals(sqlState)) { 244 // H2: Unique index or primary key violation 245 return true; 246 } 247 if ("23505".equals(sqlState)) { 248 // H2: Unique index or primary key violation 249 // PostgreSQL: duplicate key value violates unique constraint 250 return true; 251 } 252 if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) { 253 // SQL Server: Snapshot isolation transaction aborted due to update 254 // conflict 255 return true; 256 } 257 return false; 258 } 259 260 @Override 261 public Binary getBinary(String digest) { 262 if (resetCache) { 263 // for unit tests 264 resetCache = false; 265 fileCache.clear(); 266 } 267 return super.getBinary(digest); 268 } 269 270 @Override 271 public Binary getBinary(InputStream in) throws IOException { 272 if (resetCache) { 273 // for unit tests 274 resetCache = false; 275 fileCache.clear(); 276 } 277 return super.getBinary(in); 278 } 279 280 public class SQLFileStorage implements FileStorage { 281 282 @Override 283 public void storeFile(String digest, File file) throws IOException { 284 Connection connection = null; 285 try { 286 connection = getConnection(); 287 boolean existing; 288 if (disableCheckExisting) { 289 // for unit tests 290 existing = false; 291 } else { 292 logSQL(checkSql, digest); 293 PreparedStatement ps = connection.prepareStatement(checkSql); 294 ps.setString(1, digest); 295 ResultSet rs = ps.executeQuery(); 296 existing = rs.next(); 297 ps.close(); 298 } 299 if (!existing) { 300 // insert new blob 301 logSQL(putSql, digest, "somebinary", Boolean.TRUE); 302 PreparedStatement ps = connection.prepareStatement(putSql); 303 ps.setString(1, digest); 304 // needs dbcp 1.4: 305 // ps.setBlob(2, new FileInputStream(file), file.length()); 306 FileInputStream tmpis = new FileInputStream(file); 307 try { 308 ps.setBinaryStream(2, tmpis, (int) file.length()); 309 ps.setBoolean(3, true); // mark new additions for GC 310 try { 311 ps.execute(); 312 } catch (SQLException e) { 313 if (!isDuplicateKeyException(e)) { 314 throw e; 315 } 316 } 317 } finally { 318 IOUtils.closeQuietly(tmpis); 319 } 320 ps.close(); 321 } 322 } catch (SQLException e) { 323 throw new IOException(e); 324 } finally { 325 if (connection != null) { 326 try { 327 connection.close(); 328 } catch (SQLException e) { 329 log.error(e, e); 330 } 331 } 332 } 333 } 334 335 @Override 336 public boolean fetchFile(String digest, File tmp) throws IOException { 337 Connection connection = null; 338 try { 339 connection = getConnection(); 340 logSQL(getSql, digest); 341 PreparedStatement ps = connection.prepareStatement(getSql); 342 ps.setString(1, digest); 343 ResultSet rs = ps.executeQuery(); 344 if (!rs.next()) { 345 log.error("Unknown binary: " + digest); 346 return false; 347 } 348 InputStream in = rs.getBinaryStream(1); 349 OutputStream out = null; 350 try { 351 if (in == null) { 352 log.error("Missing binary: " + digest); 353 return false; 354 } 355 // store in file 356 out = new FileOutputStream(tmp); 357 IOUtils.copy(in, out); 358 } finally { 359 IOUtils.closeQuietly(in); 360 IOUtils.closeQuietly(out); 361 } 362 return true; 363 } catch (SQLException e) { 364 throw new IOException(e); 365 } finally { 366 if (connection != null) { 367 try { 368 connection.close(); 369 } catch (SQLException e) { 370 log.error(e, e); 371 } 372 } 373 } 374 } 375 376 @Override 377 public Long fetchLength(String digest) throws IOException { 378 Connection connection = null; 379 try { 380 connection = getConnection(); 381 logSQL(getLengthSql, digest); 382 PreparedStatement ps = connection.prepareStatement(getLengthSql); 383 ps.setString(1, digest); 384 ResultSet rs = ps.executeQuery(); 385 if (!rs.next()) { 386 log.error("Unknown binary: " + digest); 387 return null; 388 } 389 return Long.valueOf(rs.getLong(1)); 390 } catch (SQLException e) { 391 throw new IOException(e); 392 } finally { 393 if (connection != null) { 394 try { 395 connection.close(); 396 } catch (SQLException e) { 397 log.error(e, e); 398 } 399 } 400 } 401 } 402 } 403 404 public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector { 405 406 protected final SQLBinaryManager binaryManager; 407 408 protected volatile long startTime; 409 410 protected BinaryManagerStatus status; 411 412 public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) { 413 this.binaryManager = binaryManager; 414 } 415 416 @Override 417 public String getId() { 418 return "datasource:" + binaryManager.dataSourceName; 419 } 420 421 @Override 422 public BinaryManagerStatus getStatus() { 423 return status; 424 } 425 426 @Override 427 public boolean isInProgress() { 428 // volatile as this is designed to be called from another thread 429 return startTime != 0; 430 } 431 432 @Override 433 public void start() { 434 if (startTime != 0) { 435 throw new RuntimeException("Already started"); 436 } 437 startTime = System.currentTimeMillis(); 438 status = new BinaryManagerStatus(); 439 440 Connection connection = null; 441 PreparedStatement ps = null; 442 try { 443 connection = binaryManager.getConnection(); 444 logSQL(binaryManager.gcStartSql, Boolean.FALSE); 445 ps = connection.prepareStatement(binaryManager.gcStartSql); 446 ps.setBoolean(1, false); // clear marks 447 int n = ps.executeUpdate(); 448 logSQL(" -> ? rows", Long.valueOf(n)); 449 } catch (SQLException e) { 450 throw new RuntimeException(e); 451 } finally { 452 if (ps != null) { 453 try { 454 ps.close(); 455 } catch (SQLException e) { 456 log.error(e, e); 457 } 458 } 459 if (connection != null) { 460 try { 461 connection.close(); 462 } catch (SQLException e) { 463 log.error(e, e); 464 } 465 } 466 } 467 } 468 469 @Override 470 public void mark(String digest) { 471 Connection connection = null; 472 PreparedStatement ps = null; 473 try { 474 connection = binaryManager.getConnection(); 475 logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest); 476 ps = connection.prepareStatement(binaryManager.gcMarkSql); 477 ps.setBoolean(1, true); // mark 478 ps.setString(2, digest); 479 ps.execute(); 480 } catch (SQLException e) { 481 throw new RuntimeException(e); 482 } finally { 483 if (ps != null) { 484 try { 485 ps.close(); 486 } catch (SQLException e) { 487 log.error(e, e); 488 } 489 } 490 if (connection != null) { 491 try { 492 connection.close(); 493 } catch (SQLException e) { 494 log.error(e, e); 495 } 496 } 497 } 498 } 499 500 @Override 501 public void stop(boolean delete) { 502 if (startTime == 0) { 503 throw new RuntimeException("Not started"); 504 } 505 506 Connection connection = null; 507 PreparedStatement ps = null; 508 try { 509 connection = binaryManager.getConnection(); 510 // stats 511 logSQL(binaryManager.gcStatsSql, Boolean.TRUE); 512 ps = connection.prepareStatement(binaryManager.gcStatsSql); 513 ps.setBoolean(1, true); // marked 514 ResultSet rs = ps.executeQuery(); 515 rs.next(); 516 status.numBinaries = rs.getLong(1); 517 status.sizeBinaries = rs.getLong(2); 518 logSQL(" -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries)); 519 logSQL(binaryManager.gcStatsSql, Boolean.FALSE); 520 ps.setBoolean(1, false); // unmarked 521 rs = ps.executeQuery(); 522 rs.next(); 523 status.numBinariesGC = rs.getLong(1); 524 status.sizeBinariesGC = rs.getLong(2); 525 logSQL(" -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC)); 526 if (delete) { 527 // sweep 528 ps.close(); 529 logSQL(binaryManager.gcSweepSql, Boolean.FALSE); 530 ps = connection.prepareStatement(binaryManager.gcSweepSql); 531 ps.setBoolean(1, false); // sweep unmarked 532 int n = ps.executeUpdate(); 533 logSQL(" -> ? rows", Long.valueOf(n)); 534 } 535 } catch (SQLException e) { 536 throw new RuntimeException(e); 537 } finally { 538 if (ps != null) { 539 try { 540 ps.close(); 541 } catch (SQLException e) { 542 log.error(e, e); 543 } 544 } 545 if (connection != null) { 546 try { 547 connection.close(); 548 } catch (SQLException e) { 549 log.error(e, e); 550 } 551 } 552 } 553 554 status.gcDuration = System.currentTimeMillis() - startTime; 555 startTime = 0; 556 } 557 } 558 559}