001/* 002 * (C) Copyright 2010-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.io.Serializable; 028import java.sql.Connection; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.util.Map; 033 034import javax.naming.NamingException; 035import javax.sql.DataSource; 036 037import org.apache.commons.io.IOUtils; 038import org.apache.commons.lang.StringUtils; 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.ecm.core.blob.binary.Binary; 042import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 043import org.nuxeo.ecm.core.blob.binary.BinaryManager; 044import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 045import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager; 046import org.nuxeo.ecm.core.blob.binary.FileStorage; 047import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 048import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 049import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 050import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 051import org.nuxeo.runtime.datasource.ConnectionHelper; 052import org.nuxeo.runtime.datasource.DataSourceHelper; 053 054/** 055 * A Binary Manager that stores binaries as SQL BLOBs. 056 * <p> 057 * The BLOBs are cached locally on first access for efficiency. 058 * <p> 059 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file 060 * if accessed before the stream. 061 */ 062public class SQLBinaryManager extends CachingBinaryManager { 063 064 private static final Log log = LogFactory.getLog(SQLBinaryManager.class); 065 066 public static final String DS_PROP = "datasource"; 067 068 public static final String DS_PREFIX = "datasource="; 069 070 public static final String TABLE_PROP = "table"; 071 072 public static final String TABLE_PREFIX = "table="; 073 074 public static final String CACHE_SIZE_PROP = "cacheSize"; 075 076 public static final String CACHE_SIZE_PREFIX = "cachesize="; 077 078 public static final String DEFAULT_CACHE_SIZE = "10M"; 079 080 public static final String COL_ID = "id"; 081 082 public static final String COL_BIN = "bin"; 083 084 public static final String COL_MARK = "mark"; // for mark & sweep GC 085 086 protected String dataSourceName; 087 088 protected String checkSql; 089 090 protected String putSql; 091 092 protected String getSql; 093 094 protected String getLengthSql; 095 096 protected String gcStartSql; 097 098 protected String gcMarkSql; 099 100 protected String gcStatsSql; 101 102 protected String gcSweepSql; 103 104 protected static boolean disableCheckExisting; // for unit tests 105 106 protected static boolean resetCache; // for unit tests 107 108 @Override 109 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 110 super.initialize(blobProviderId, properties); 111 112 dataSourceName = null; 113 String tableName = null; 114 String cacheSizeStr = null; 115 String key = properties.get(BinaryManager.PROP_KEY); 116 key = StringUtils.defaultIfBlank(key, ""); 117 for (String part : key.split(",")) { 118 if (part.startsWith(DS_PREFIX)) { 119 dataSourceName = part.substring(DS_PREFIX.length()).trim(); 120 } 121 if (part.startsWith(TABLE_PREFIX)) { 122 tableName = part.substring(TABLE_PREFIX.length()).trim(); 123 } 124 if (part.startsWith(CACHE_SIZE_PREFIX)) { 125 cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim(); 126 } 127 } 128 if (StringUtils.isBlank(dataSourceName)) { 129 dataSourceName = properties.get(DS_PROP); 130 if (StringUtils.isBlank(dataSourceName)) { 131 throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration"); 132 } 133 } 134 if (StringUtils.isBlank(tableName)) { 135 tableName = properties.get(TABLE_PROP); 136 if (StringUtils.isBlank(tableName)) { 137 throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration"); 138 } 139 } 140 if (StringUtils.isBlank(cacheSizeStr)) { 141 cacheSizeStr = properties.get(CACHE_SIZE_PROP); 142 if (StringUtils.isBlank(cacheSizeStr)) { 143 cacheSizeStr = DEFAULT_CACHE_SIZE; 144 } 145 } 146 147 // create the SQL statements used 148 createSql(tableName); 149 150 // create file cache 151 initializeCache(cacheSizeStr, new SQLFileStorage()); 152 createGarbageCollector(); 153 } 154 155 protected void createGarbageCollector() { 156 garbageCollector = new SQLBinaryGarbageCollector(this); 157 } 158 159 protected void createSql(String tableName) throws IOException { 160 Dialect dialect = getDialect(); 161 Database database = new Database(dialect); 162 Table table = database.addTable(tableName); 163 ColumnType dummytype = ColumnType.STRING; 164 Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null); 165 Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null); 166 Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null); 167 168 checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName()); 169 putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(), 170 idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName()); 171 getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(), 172 idCol.getQuotedName()); 173 getLengthSql = String.format("SELECT %s(%s) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(), 174 binCol.getQuotedName(), table.getQuotedName(), idCol.getQuotedName()); 175 176 gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName()); 177 gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(), 178 idCol.getQuotedName()); 179 gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(), 180 binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName()); 181 gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName()); 182 } 183 184 protected Dialect getDialect() throws IOException { 185 Connection connection = null; 186 try { 187 connection = getConnection(); 188 return Dialect.createDialect(connection, null); 189 } catch (SQLException e) { 190 throw new IOException(e); 191 } finally { 192 if (connection != null) { 193 try { 194 connection.close(); 195 } catch (SQLException e) { 196 log.error(e, e); 197 } 198 } 199 } 200 } 201 202 protected Connection getConnection() throws SQLException { 203 return ConnectionHelper.getConnection(dataSourceName); 204 } 205 206 protected static void logSQL(String sql, Serializable... values) { 207 if (!log.isTraceEnabled()) { 208 return; 209 } 210 StringBuilder buf = new StringBuilder(); 211 int start = 0; 212 for (Serializable v : values) { 213 int index = sql.indexOf('?', start); 214 if (index == -1) { 215 // mismatch between number of ? and number of values 216 break; 217 } 218 buf.append(sql, start, index); 219 buf.append(loggedValue(v)); 220 start = index + 1; 221 } 222 buf.append(sql, start, sql.length()); 223 log.trace("(bin) SQL: " + buf.toString()); 224 } 225 226 protected static String loggedValue(Serializable value) { 227 if (value == null) { 228 return "NULL"; 229 } 230 if (value instanceof String) { 231 String v = (String) value; 232 return "'" + v.replace("'", "''") + "'"; 233 } 234 return value.toString(); 235 } 236 237 protected static boolean isDuplicateKeyException(SQLException e) { 238 String sqlState = e.getSQLState(); 239 if ("23000".equals(sqlState)) { 240 // MySQL: Duplicate entry ... for key ... 241 // Oracle: unique constraint ... violated 242 // SQL Server: Violation of PRIMARY KEY constraint 243 return true; 244 } 245 if ("23001".equals(sqlState)) { 246 // H2: Unique index or primary key violation 247 return true; 248 } 249 if ("23505".equals(sqlState)) { 250 // H2: Unique index or primary key violation 251 // PostgreSQL: duplicate key value violates unique constraint 252 return true; 253 } 254 if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) { 255 // SQL Server: Snapshot isolation transaction aborted due to update 256 // conflict 257 return true; 258 } 259 return false; 260 } 261 262 @Override 263 public Binary getBinary(String digest) { 264 if (resetCache) { 265 // for unit tests 266 resetCache = false; 267 fileCache.clear(); 268 } 269 return super.getBinary(digest); 270 } 271 272 @Override 273 public Binary getBinary(InputStream in) throws IOException { 274 if (resetCache) { 275 // for unit tests 276 resetCache = false; 277 fileCache.clear(); 278 } 279 return super.getBinary(in); 280 } 281 282 public class SQLFileStorage implements FileStorage { 283 284 @Override 285 public void storeFile(String digest, File file) throws IOException { 286 Connection connection = null; 287 try { 288 connection = getConnection(); 289 boolean existing; 290 if (disableCheckExisting) { 291 // for unit tests 292 existing = false; 293 } else { 294 logSQL(checkSql, digest); 295 PreparedStatement ps = connection.prepareStatement(checkSql); 296 ps.setString(1, digest); 297 ResultSet rs = ps.executeQuery(); 298 existing = rs.next(); 299 ps.close(); 300 } 301 if (!existing) { 302 // insert new blob 303 logSQL(putSql, digest, "somebinary", Boolean.TRUE); 304 PreparedStatement ps = connection.prepareStatement(putSql); 305 ps.setString(1, digest); 306 // needs dbcp 1.4: 307 // ps.setBlob(2, new FileInputStream(file), file.length()); 308 FileInputStream tmpis = new FileInputStream(file); 309 try { 310 ps.setBinaryStream(2, tmpis, (int) file.length()); 311 ps.setBoolean(3, true); // mark new additions for GC 312 try { 313 ps.execute(); 314 } catch (SQLException e) { 315 if (!isDuplicateKeyException(e)) { 316 throw e; 317 } 318 } 319 } finally { 320 IOUtils.closeQuietly(tmpis); 321 } 322 ps.close(); 323 } 324 } catch (SQLException e) { 325 throw new IOException(e); 326 } finally { 327 if (connection != null) { 328 try { 329 connection.close(); 330 } catch (SQLException e) { 331 log.error(e, e); 332 } 333 } 334 } 335 } 336 337 @Override 338 public boolean fetchFile(String digest, File tmp) throws IOException { 339 Connection connection = null; 340 try { 341 connection = getConnection(); 342 logSQL(getSql, digest); 343 PreparedStatement ps = connection.prepareStatement(getSql); 344 ps.setString(1, digest); 345 ResultSet rs = ps.executeQuery(); 346 if (!rs.next()) { 347 log.error("Unknown binary: " + digest); 348 return false; 349 } 350 InputStream in = rs.getBinaryStream(1); 351 OutputStream out = null; 352 try { 353 if (in == null) { 354 log.error("Missing binary: " + digest); 355 return false; 356 } 357 // store in file 358 out = new FileOutputStream(tmp); 359 IOUtils.copy(in, out); 360 } finally { 361 IOUtils.closeQuietly(in); 362 IOUtils.closeQuietly(out); 363 } 364 return true; 365 } catch (SQLException e) { 366 throw new IOException(e); 367 } finally { 368 if (connection != null) { 369 try { 370 connection.close(); 371 } catch (SQLException e) { 372 log.error(e, e); 373 } 374 } 375 } 376 } 377 378 @Override 379 public Long fetchLength(String digest) throws IOException { 380 Connection connection = null; 381 try { 382 connection = getConnection(); 383 logSQL(getLengthSql, digest); 384 PreparedStatement ps = connection.prepareStatement(getLengthSql); 385 ps.setString(1, digest); 386 ResultSet rs = ps.executeQuery(); 387 if (!rs.next()) { 388 log.error("Unknown binary: " + digest); 389 return null; 390 } 391 return Long.valueOf(rs.getLong(1)); 392 } catch (SQLException e) { 393 throw new IOException(e); 394 } finally { 395 if (connection != null) { 396 try { 397 connection.close(); 398 } catch (SQLException e) { 399 log.error(e, e); 400 } 401 } 402 } 403 } 404 } 405 406 public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector { 407 408 protected final SQLBinaryManager binaryManager; 409 410 protected volatile long startTime; 411 412 protected BinaryManagerStatus status; 413 414 public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) { 415 this.binaryManager = binaryManager; 416 } 417 418 @Override 419 public String getId() { 420 return "datasource:" + binaryManager.dataSourceName; 421 } 422 423 @Override 424 public BinaryManagerStatus getStatus() { 425 return status; 426 } 427 428 @Override 429 public boolean isInProgress() { 430 // volatile as this is designed to be called from another thread 431 return startTime != 0; 432 } 433 434 @Override 435 public void start() { 436 if (startTime != 0) { 437 throw new RuntimeException("Already started"); 438 } 439 startTime = System.currentTimeMillis(); 440 status = new BinaryManagerStatus(); 441 442 Connection connection = null; 443 PreparedStatement ps = null; 444 try { 445 connection = binaryManager.getConnection(); 446 logSQL(binaryManager.gcStartSql, Boolean.FALSE); 447 ps = connection.prepareStatement(binaryManager.gcStartSql); 448 ps.setBoolean(1, false); // clear marks 449 int n = ps.executeUpdate(); 450 logSQL(" -> ? rows", Long.valueOf(n)); 451 } catch (SQLException e) { 452 throw new RuntimeException(e); 453 } finally { 454 if (ps != null) { 455 try { 456 ps.close(); 457 } catch (SQLException e) { 458 log.error(e, e); 459 } 460 } 461 if (connection != null) { 462 try { 463 connection.close(); 464 } catch (SQLException e) { 465 log.error(e, e); 466 } 467 } 468 } 469 } 470 471 @Override 472 public void mark(String digest) { 473 Connection connection = null; 474 PreparedStatement ps = null; 475 try { 476 connection = binaryManager.getConnection(); 477 logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest); 478 ps = connection.prepareStatement(binaryManager.gcMarkSql); 479 ps.setBoolean(1, true); // mark 480 ps.setString(2, digest); 481 ps.execute(); 482 } catch (SQLException e) { 483 throw new RuntimeException(e); 484 } finally { 485 if (ps != null) { 486 try { 487 ps.close(); 488 } catch (SQLException e) { 489 log.error(e, e); 490 } 491 } 492 if (connection != null) { 493 try { 494 connection.close(); 495 } catch (SQLException e) { 496 log.error(e, e); 497 } 498 } 499 } 500 } 501 502 @Override 503 public void stop(boolean delete) { 504 if (startTime == 0) { 505 throw new RuntimeException("Not started"); 506 } 507 508 Connection connection = null; 509 PreparedStatement ps = null; 510 try { 511 connection = binaryManager.getConnection(); 512 // stats 513 logSQL(binaryManager.gcStatsSql, Boolean.TRUE); 514 ps = connection.prepareStatement(binaryManager.gcStatsSql); 515 ps.setBoolean(1, true); // marked 516 ResultSet rs = ps.executeQuery(); 517 rs.next(); 518 status.numBinaries = rs.getLong(1); 519 status.sizeBinaries = rs.getLong(2); 520 logSQL(" -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries)); 521 logSQL(binaryManager.gcStatsSql, Boolean.FALSE); 522 ps.setBoolean(1, false); // unmarked 523 rs = ps.executeQuery(); 524 rs.next(); 525 status.numBinariesGC = rs.getLong(1); 526 status.sizeBinariesGC = rs.getLong(2); 527 logSQL(" -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC)); 528 if (delete) { 529 // sweep 530 ps.close(); 531 logSQL(binaryManager.gcSweepSql, Boolean.FALSE); 532 ps = connection.prepareStatement(binaryManager.gcSweepSql); 533 ps.setBoolean(1, false); // sweep unmarked 534 int n = ps.executeUpdate(); 535 logSQL(" -> ? rows", Long.valueOf(n)); 536 } 537 } catch (SQLException e) { 538 throw new RuntimeException(e); 539 } finally { 540 if (ps != null) { 541 try { 542 ps.close(); 543 } catch (SQLException e) { 544 log.error(e, e); 545 } 546 } 547 if (connection != null) { 548 try { 549 connection.close(); 550 } catch (SQLException e) { 551 log.error(e, e); 552 } 553 } 554 } 555 556 status.gcDuration = System.currentTimeMillis() - startTime; 557 startTime = 0; 558 } 559 } 560 561}