/*
 * (C) Copyright 2010-2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.blob.binary.Binary;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.BinaryManager;
import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
import org.nuxeo.ecm.core.blob.binary.FileStorage;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
import org.nuxeo.runtime.datasource.ConnectionHelper;

/**
 * A Binary Manager that stores binaries as SQL BLOBs.
 * <p>
 * The BLOBs are cached locally on first access for efficiency.
 * <p>
 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
 * if accessed before the stream.
 */
public class SQLBinaryManager extends CachingBinaryManager {

    private static final Log log = LogFactory.getLog(SQLBinaryManager.class);

    /** Name of the property holding the datasource name. */
    public static final String DS_PROP = "datasource";

    /** Prefix of the datasource entry inside the comma-separated {@link BinaryManager#PROP_KEY} value. */
    public static final String DS_PREFIX = "datasource=";

    /** Name of the property holding the table name. */
    public static final String TABLE_PROP = "table";

    /** Prefix of the table entry inside the comma-separated {@link BinaryManager#PROP_KEY} value. */
    public static final String TABLE_PREFIX = "table=";

    /** Name of the property holding the cache size. */
    public static final String CACHE_SIZE_PROP = "cacheSize";

    /** Prefix of the cache size entry inside the comma-separated {@link BinaryManager#PROP_KEY} value. */
    public static final String CACHE_SIZE_PREFIX = "cachesize=";

    /** Cache size used when none is configured. */
    public static final String DEFAULT_CACHE_SIZE = "10M";

    public static final String COL_ID = "id";

    public static final String COL_BIN = "bin";

    public static final String COL_MARK = "mark"; // for mark & sweep GC

    protected String dataSourceName;

    protected String checkSql;

    protected String putSql;

    protected String getSql;

    protected String gcStartSql;

    protected String gcMarkSql;

    protected String gcStatsSql;

    protected String gcSweepSql;

    protected static boolean disableCheckExisting; // for unit tests

    protected static boolean resetCache; // for unit tests

    /**
     * Reads the datasource name, table name and cache size from the configuration, then builds the SQL statements,
     * the file cache and the garbage collector.
     * <p>
     * Each setting is first looked up in the comma-separated value of {@link BinaryManager#PROP_KEY} (entries of the
     * form {@code datasource=...}, {@code table=...}, {@code cachesize=...}), then in the dedicated property
     * ({@link #DS_PROP}, {@link #TABLE_PROP}, {@link #CACHE_SIZE_PROP}).
     *
     * @param blobProviderId the blob provider id, passed to the superclass
     * @param properties the configuration properties
     * @throws IOException if the SQL dialect cannot be determined
     * @throws RuntimeException if the datasource or the table is not configured
     */
    @Override
    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
        super.initialize(blobProviderId, properties);

        dataSourceName = null;
        String tableName = null;
        String cacheSizeStr = null;
        String key = properties.get(BinaryManager.PROP_KEY);
        key = StringUtils.defaultIfBlank(key, "");
        for (String part : key.split(",")) {
            if (part.startsWith(DS_PREFIX)) {
                dataSourceName = part.substring(DS_PREFIX.length()).trim();
            }
            if (part.startsWith(TABLE_PREFIX)) {
                tableName = part.substring(TABLE_PREFIX.length()).trim();
            }
            if (part.startsWith(CACHE_SIZE_PREFIX)) {
                cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim();
            }
        }
        if (StringUtils.isBlank(dataSourceName)) {
            dataSourceName = properties.get(DS_PROP);
            if (StringUtils.isBlank(dataSourceName)) {
                throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration");
            }
        }
        if (StringUtils.isBlank(tableName)) {
            tableName = properties.get(TABLE_PROP);
            if (StringUtils.isBlank(tableName)) {
                throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration");
            }
        }
        if (StringUtils.isBlank(cacheSizeStr)) {
            cacheSizeStr = properties.get(CACHE_SIZE_PROP);
            if (StringUtils.isBlank(cacheSizeStr)) {
                cacheSizeStr = DEFAULT_CACHE_SIZE;
            }
        }

        // create the SQL statements used
        createSql(tableName);

        // create file cache
        initializeCache(cacheSizeStr, new SQLFileStorage());
        createGarbageCollector();
    }

    /** Installs the SQL-based mark & sweep garbage collector. */
    protected void createGarbageCollector() {
        garbageCollector = new SQLBinaryGarbageCollector(this);
    }

    /**
     * Builds all the SQL statements used by this manager, with table and column names quoted for the dialect of the
     * configured datasource.
     *
     * @param tableName the name of the table holding the binaries
     * @throws IOException if the SQL dialect cannot be determined
     */
    protected void createSql(String tableName) throws IOException {
        Dialect dialect = getDialect();
        Database database = new Database(dialect);
        Table table = database.addTable(tableName);
        // the column type is irrelevant here, the columns are only used to get quoted names
        ColumnType dummytype = ColumnType.STRING;
        Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null);
        Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null);
        Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null);

        checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName());
        putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(),
                idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName());
        getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(),
                idCol.getQuotedName());

        gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName());
        gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(),
                idCol.getQuotedName());
        gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
                binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName());
        gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName());
    }

    /**
     * Creates the SQL dialect from a temporary connection to the configured datasource.
     *
     * @return the dialect
     * @throws IOException if the connection or the dialect creation fails with a {@link SQLException}
     */
    protected Dialect getDialect() throws IOException {
        Connection connection = null;
        try {
            connection = getConnection();
            return Dialect.createDialect(connection, null);
        } catch (SQLException e) {
            throw new IOException(e);
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Gets a new connection to the configured datasource. The caller is responsible for closing it.
     *
     * @return the connection
     * @throws SQLException if the connection cannot be obtained
     */
    protected Connection getConnection() throws SQLException {
        return ConnectionHelper.getConnection(dataSourceName);
    }

    /**
     * Closes a statement if it is not {@code null}, logging any failure instead of propagating it so that it can
     * safely be called from a {@code finally} block.
     */
    private static void closeStatement(PreparedStatement ps) {
        if (ps == null) {
            return;
        }
        try {
            ps.close();
        } catch (SQLException e) {
            log.error(e, e);
        }
    }

    /**
     * Closes a connection if it is not {@code null}, logging any failure instead of propagating it so that it can
     * safely be called from a {@code finally} block.
     */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            log.error(e, e);
        }
    }

    /**
     * Logs a SQL statement at trace level, with each {@code ?} placeholder replaced in order by the corresponding
     * value. Does nothing if trace logging is disabled.
     *
     * @param sql the SQL statement (or any text containing {@code ?} placeholders)
     * @param values the values to substitute for the placeholders
     */
    protected static void logSQL(String sql, Serializable... values) {
        if (!log.isTraceEnabled()) {
            return;
        }
        StringBuilder buf = new StringBuilder();
        int start = 0;
        for (Serializable v : values) {
            int index = sql.indexOf('?', start);
            if (index == -1) {
                // mismatch between number of ? and number of values
                break;
            }
            buf.append(sql, start, index);
            buf.append(loggedValue(v));
            start = index + 1;
        }
        buf.append(sql, start, sql.length());
        log.trace("(bin) SQL: " + buf.toString());
    }

    /**
     * Formats a value for {@link #logSQL}: {@code NULL} for {@code null}, a single-quoted literal with embedded
     * quotes doubled for a string, {@code toString()} otherwise.
     */
    protected static String loggedValue(Serializable value) {
        if (value == null) {
            return "NULL";
        }
        if (value instanceof String) {
            String v = (String) value;
            return "'" + v.replace("'", "''") + "'";
        }
        return value.toString();
    }

    /**
     * Checks whether an exception has one of the SQL states treated as a duplicate key (or equivalent concurrent
     * update conflict).
     *
     * @param e the exception
     * @return {@code true} if the SQL state is one of the recognized ones
     */
    protected static boolean isDuplicateKeyException(SQLException e) {
        String sqlState = e.getSQLState();
        if ("23000".equals(sqlState)) {
            // MySQL: Duplicate entry ... for key ...
            // Oracle: unique constraint ... violated
            // SQL Server: Violation of PRIMARY KEY constraint
            return true;
        }
        if ("23001".equals(sqlState)) {
            // H2: Unique index or primary key violation
            return true;
        }
        if ("23505".equals(sqlState)) {
            // H2: Unique index or primary key violation
            // PostgreSQL: duplicate key value violates unique constraint
            return true;
        }
        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
            // SQL Server: Snapshot isolation transaction aborted due to update
            // conflict
            return true;
        }
        return false;
    }

    /** Clears the file cache once if a unit test requested it through {@link #resetCache}. */
    private void resetCacheIfRequested() {
        if (resetCache) {
            // for unit tests
            resetCache = false;
            fileCache.clear();
        }
    }

    @Override
    public Binary getBinary(String digest) {
        resetCacheIfRequested();
        return super.getBinary(digest);
    }

    @Override
    public Binary getBinary(InputStream in) throws IOException {
        resetCacheIfRequested();
        return super.getBinary(in);
    }

    /**
     * File storage that reads and writes the binaries as BLOBs in the configured SQL table.
     */
    public class SQLFileStorage implements FileStorage {

        /**
         * Stores the file as a BLOB under the given digest, unless a row for this digest already exists.
         *
         * @param digest the digest identifying the binary
         * @param file the file holding the binary content
         * @throws IOException if the file cannot be read, is too large for the JDBC call used, or a SQL error occurs
         */
        @Override
        public void storeFile(String digest, File file) throws IOException {
            Connection connection = null;
            try {
                connection = getConnection();
                // disableCheckExisting is for unit tests
                boolean existing = !disableCheckExisting && isStored(connection, digest);
                if (!existing) {
                    insert(connection, digest, file);
                }
            } catch (SQLException e) {
                throw new IOException(e);
            } finally {
                closeConnection(connection);
            }
        }

        /** Checks whether a row already exists for the digest. */
        private boolean isStored(Connection connection, String digest) throws SQLException {
            logSQL(checkSql, digest);
            PreparedStatement ps = connection.prepareStatement(checkSql);
            try {
                ps.setString(1, digest);
                ResultSet rs = ps.executeQuery();
                return rs.next();
            } finally {
                // closing the statement also closes its result set
                closeStatement(ps);
            }
        }

        /** Inserts a new marked row holding the file content; a duplicate key error is silently accepted. */
        private void insert(Connection connection, String digest, File file) throws SQLException, IOException {
            long length = file.length();
            if (length > Integer.MAX_VALUE) {
                // the int-length overload below cannot express this size; fail instead of truncating the cast
                throw new IOException("Binary too large to be stored as a SQL BLOB: " + digest + " (" + length
                        + " bytes)");
            }
            logSQL(putSql, digest, "somebinary", Boolean.TRUE);
            PreparedStatement ps = connection.prepareStatement(putSql);
            FileInputStream in = null;
            try {
                ps.setString(1, digest);
                in = new FileInputStream(file);
                // setBlob(int, InputStream, long) would need dbcp 1.4, so the int-length overload is used
                ps.setBinaryStream(2, in, (int) length);
                ps.setBoolean(3, true); // mark new additions for GC
                try {
                    ps.execute();
                } catch (SQLException e) {
                    if (!isDuplicateKeyException(e)) {
                        throw e;
                    }
                    // a row for this digest already exists, nothing more to do
                }
            } finally {
                IOUtils.closeQuietly(in);
                closeStatement(ps);
            }
        }

        /**
         * Copies the BLOB stored under the given digest into a file.
         *
         * @param digest the digest identifying the binary
         * @param tmp the file to write the binary content to
         * @return {@code true} if the binary was written, {@code false} if there is no row for the digest or its
         *         BLOB is {@code NULL}
         * @throws IOException if the file cannot be written or a SQL error occurs
         */
        @Override
        public boolean fetchFile(String digest, File tmp) throws IOException {
            Connection connection = null;
            PreparedStatement ps = null;
            try {
                connection = getConnection();
                logSQL(getSql, digest);
                ps = connection.prepareStatement(getSql);
                ps.setString(1, digest);
                ResultSet rs = ps.executeQuery();
                if (!rs.next()) {
                    log.error("Unknown binary: " + digest);
                    return false;
                }
                InputStream in = rs.getBinaryStream(1);
                if (in == null) {
                    log.error("Missing binary: " + digest);
                    return false;
                }
                OutputStream out = null;
                try {
                    // store in file
                    out = new FileOutputStream(tmp);
                    IOUtils.copy(in, out);
                } finally {
                    IOUtils.closeQuietly(in);
                    IOUtils.closeQuietly(out);
                }
                return true;
            } catch (SQLException e) {
                throw new IOException(e);
            } finally {
                // closing the statement also closes its result set
                closeStatement(ps);
                closeConnection(connection);
            }
        }
    }

    /**
     * Mark & sweep garbage collector working on the {@link #COL_MARK} column of the binaries table.
     * <p>
     * {@link #start} clears all marks, {@link #mark} sets the mark of a binary, and {@link #stop} computes
     * statistics and optionally deletes the unmarked binaries.
     */
    public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector {

        protected final SQLBinaryManager binaryManager;

        /** Start time of the GC in progress, or 0 if no GC is in progress. */
        protected volatile long startTime;

        protected BinaryManagerStatus status;

        public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) {
            this.binaryManager = binaryManager;
        }

        @Override
        public String getId() {
            return "datasource:" + binaryManager.dataSourceName;
        }

        @Override
        public BinaryManagerStatus getStatus() {
            return status;
        }

        @Override
        public boolean isInProgress() {
            // volatile as this is designed to be called from another thread
            return startTime != 0;
        }

        /**
         * Starts a GC by clearing the mark of all binaries.
         *
         * @throws RuntimeException if a GC is already in progress, or wrapping the {@link SQLException} if the marks
         *             cannot be cleared (in which case the GC is not considered started)
         */
        @Override
        public void start() {
            if (startTime != 0) {
                throw new RuntimeException("Already started");
            }
            startTime = System.currentTimeMillis();
            status = new BinaryManagerStatus();

            Connection connection = null;
            PreparedStatement ps = null;
            try {
                connection = binaryManager.getConnection();
                logSQL(binaryManager.gcStartSql, Boolean.FALSE);
                ps = connection.prepareStatement(binaryManager.gcStartSql);
                ps.setBoolean(1, false); // clear marks
                int n = ps.executeUpdate();
                logSQL(" -> ? rows", Long.valueOf(n));
            } catch (SQLException e) {
                // the GC did not actually start: do not stay "in progress", so that start() can be retried
                startTime = 0;
                throw new RuntimeException(e);
            } finally {
                closeStatement(ps);
                closeConnection(connection);
            }
        }

        /**
         * Marks a binary as in use.
         *
         * @param digest the digest identifying the binary
         * @throws RuntimeException wrapping the {@link SQLException} if the update fails
         */
        @Override
        public void mark(String digest) {
            Connection connection = null;
            PreparedStatement ps = null;
            try {
                connection = binaryManager.getConnection();
                logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest);
                ps = connection.prepareStatement(binaryManager.gcMarkSql);
                ps.setBoolean(1, true); // mark
                ps.setString(2, digest);
                ps.execute();
            } catch (SQLException e) {
                throw new RuntimeException(e);
            } finally {
                closeStatement(ps);
                closeConnection(connection);
            }
        }

        /**
         * Stops the GC in progress: computes the number and total size of marked and unmarked binaries into the
         * status, then optionally deletes the unmarked ones.
         * <p>
         * The GC is no longer in progress when this method returns, whether it succeeded or not.
         *
         * @param delete {@code true} to delete the unmarked binaries
         * @throws RuntimeException if no GC is in progress, or wrapping the {@link SQLException} if a statement fails
         */
        @Override
        public void stop(boolean delete) {
            if (startTime == 0) {
                throw new RuntimeException("Not started");
            }

            Connection connection = null;
            PreparedStatement ps = null;
            try {
                connection = binaryManager.getConnection();
                // stats
                logSQL(binaryManager.gcStatsSql, Boolean.TRUE);
                ps = connection.prepareStatement(binaryManager.gcStatsSql);
                ps.setBoolean(1, true); // marked
                ResultSet rs = ps.executeQuery();
                rs.next();
                status.numBinaries = rs.getLong(1);
                status.sizeBinaries = rs.getLong(2);
                logSQL(" -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries));
                logSQL(binaryManager.gcStatsSql, Boolean.FALSE);
                ps.setBoolean(1, false); // unmarked
                rs = ps.executeQuery();
                rs.next();
                status.numBinariesGC = rs.getLong(1);
                status.sizeBinariesGC = rs.getLong(2);
                logSQL(" -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC));
                if (delete) {
                    // sweep
                    ps.close();
                    logSQL(binaryManager.gcSweepSql, Boolean.FALSE);
                    ps = connection.prepareStatement(binaryManager.gcSweepSql);
                    ps.setBoolean(1, false); // sweep unmarked
                    int n = ps.executeUpdate();
                    logSQL(" -> ? rows", Long.valueOf(n));
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            } finally {
                closeStatement(ps);
                closeConnection(connection);
                // always leave the "in progress" state, otherwise a failure here would block any later GC
                status.gcDuration = System.currentTimeMillis() - startTime;
                startTime = 0;
            }
        }
    }

}