001/*
002 * (C) Copyright 2010-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.io.Serializable;
028import java.sql.Connection;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.util.Map;
033
034import org.apache.commons.io.IOUtils;
035import org.apache.commons.lang.StringUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.blob.binary.Binary;
039import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
040import org.nuxeo.ecm.core.blob.binary.BinaryManager;
041import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
042import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
043import org.nuxeo.ecm.core.blob.binary.FileStorage;
044import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
047import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
048import org.nuxeo.runtime.datasource.ConnectionHelper;
049
050/**
051 * A Binary Manager that stores binaries as SQL BLOBs.
052 * <p>
053 * The BLOBs are cached locally on first access for efficiency.
054 * <p>
055 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
056 * if accessed before the stream.
057 */
058public class SQLBinaryManager extends CachingBinaryManager {
059
    /** Logger of this class; SQL traces are written through it at trace level. */
    private static final Log log = LogFactory.getLog(SQLBinaryManager.class);

    /** Name of the configuration property holding the datasource name. */
    public static final String DS_PROP = "datasource";

    /** Prefix of the part of the comma-separated configuration key that holds the datasource name. */
    public static final String DS_PREFIX = "datasource=";

    /** Name of the configuration property holding the table name. */
    public static final String TABLE_PROP = "table";

    /** Prefix of the part of the comma-separated configuration key that holds the table name. */
    public static final String TABLE_PREFIX = "table=";

    /** Name of the configuration property holding the cache size. */
    public static final String CACHE_SIZE_PROP = "cacheSize";

    /** Prefix of the part of the comma-separated configuration key that holds the cache size. */
    public static final String CACHE_SIZE_PREFIX = "cachesize=";

    /** Cache size used when none is configured. */
    public static final String DEFAULT_CACHE_SIZE = "10M";

    /** Name of the column matched against the binary digest. */
    public static final String COL_ID = "id";

    /** Name of the column holding the binary content. */
    public static final String COL_BIN = "bin";

    /** Name of the boolean column flagging the rows to keep during garbage collection. */
    public static final String COL_MARK = "mark"; // for mark & sweep GC

    /** Name of the datasource from which connections are obtained. */
    protected String dataSourceName;

    /** SELECT testing whether a row exists for a given id. */
    protected String checkSql;

    /** INSERT of a new row (id, binary content, mark). */
    protected String putSql;

    /** SELECT of the binary content for a given id. */
    protected String getSql;

    /** UPDATE setting the mark of every row (used to clear the marks when a GC starts). */
    protected String gcStartSql;

    /** UPDATE setting the mark of the row with a given id. */
    protected String gcMarkSql;

    /** SELECT of the number of rows and of their total binary length for a given mark value. */
    protected String gcStatsSql;

    /** DELETE of the rows having a given mark value. */
    protected String gcSweepSql;

    /** When set, storing a file skips the check for an already existing row. */
    protected static boolean disableCheckExisting; // for unit tests

    /** When set, the next binary lookup clears the local file cache first, then unsets the flag. */
    protected static boolean resetCache; // for unit tests
101
102    @Override
103    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
104        super.initialize(blobProviderId, properties);
105
106        dataSourceName = null;
107        String tableName = null;
108        String cacheSizeStr = null;
109        String key = properties.get(BinaryManager.PROP_KEY);
110        key = StringUtils.defaultIfBlank(key, "");
111        for (String part : key.split(",")) {
112            if (part.startsWith(DS_PREFIX)) {
113                dataSourceName = part.substring(DS_PREFIX.length()).trim();
114            }
115            if (part.startsWith(TABLE_PREFIX)) {
116                tableName = part.substring(TABLE_PREFIX.length()).trim();
117            }
118            if (part.startsWith(CACHE_SIZE_PREFIX)) {
119                cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim();
120            }
121        }
122        if (StringUtils.isBlank(dataSourceName)) {
123            dataSourceName = properties.get(DS_PROP);
124            if (StringUtils.isBlank(dataSourceName)) {
125                throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration");
126            }
127        }
128        if (StringUtils.isBlank(tableName)) {
129            tableName = properties.get(TABLE_PROP);
130            if (StringUtils.isBlank(tableName)) {
131                throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration");
132            }
133        }
134        if (StringUtils.isBlank(cacheSizeStr)) {
135            cacheSizeStr = properties.get(CACHE_SIZE_PROP);
136            if (StringUtils.isBlank(cacheSizeStr)) {
137                cacheSizeStr = DEFAULT_CACHE_SIZE;
138            }
139        }
140
141        // create the SQL statements used
142        createSql(tableName);
143
144        // create file cache
145        initializeCache(cacheSizeStr, new SQLFileStorage());
146        createGarbageCollector();
147    }
148
149    protected void createGarbageCollector() {
150        garbageCollector = new SQLBinaryGarbageCollector(this);
151    }
152
153    protected void createSql(String tableName) throws IOException {
154        Dialect dialect = getDialect();
155        Database database = new Database(dialect);
156        Table table = database.addTable(tableName);
157        ColumnType dummytype = ColumnType.STRING;
158        Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null);
159        Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null);
160        Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null);
161
162        checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName());
163        putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(),
164                idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName());
165        getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(),
166                idCol.getQuotedName());
167
168        gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName());
169        gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(),
170                idCol.getQuotedName());
171        gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
172                binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName());
173        gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName());
174    }
175
176    protected Dialect getDialect() throws IOException {
177        Connection connection = null;
178        try {
179            connection = getConnection();
180            return Dialect.createDialect(connection, null);
181        } catch (SQLException e) {
182            throw new IOException(e);
183        } finally {
184            if (connection != null) {
185                try {
186                    connection.close();
187                } catch (SQLException e) {
188                    log.error(e, e);
189                }
190            }
191        }
192    }
193
194    protected Connection getConnection() throws SQLException {
195        return ConnectionHelper.getConnection(dataSourceName);
196    }
197
198    protected static void logSQL(String sql, Serializable... values) {
199        if (!log.isTraceEnabled()) {
200            return;
201        }
202        StringBuilder buf = new StringBuilder();
203        int start = 0;
204        for (Serializable v : values) {
205            int index = sql.indexOf('?', start);
206            if (index == -1) {
207                // mismatch between number of ? and number of values
208                break;
209            }
210            buf.append(sql, start, index);
211            buf.append(loggedValue(v));
212            start = index + 1;
213        }
214        buf.append(sql, start, sql.length());
215        log.trace("(bin) SQL: " + buf.toString());
216    }
217
218    protected static String loggedValue(Serializable value) {
219        if (value == null) {
220            return "NULL";
221        }
222        if (value instanceof String) {
223            String v = (String) value;
224            return "'" + v.replace("'", "''") + "'";
225        }
226        return value.toString();
227    }
228
229    protected static boolean isDuplicateKeyException(SQLException e) {
230        String sqlState = e.getSQLState();
231        if ("23000".equals(sqlState)) {
232            // MySQL: Duplicate entry ... for key ...
233            // Oracle: unique constraint ... violated
234            // SQL Server: Violation of PRIMARY KEY constraint
235            return true;
236        }
237        if ("23001".equals(sqlState)) {
238            // H2: Unique index or primary key violation
239            return true;
240        }
241        if ("23505".equals(sqlState)) {
242            // H2: Unique index or primary key violation
243            // PostgreSQL: duplicate key value violates unique constraint
244            return true;
245        }
246        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
247            // SQL Server: Snapshot isolation transaction aborted due to update
248            // conflict
249            return true;
250        }
251        return false;
252    }
253
254    @Override
255    public Binary getBinary(String digest) {
256        if (resetCache) {
257            // for unit tests
258            resetCache = false;
259            fileCache.clear();
260        }
261        return super.getBinary(digest);
262    }
263
264    @Override
265    public Binary getBinary(InputStream in) throws IOException {
266        if (resetCache) {
267            // for unit tests
268            resetCache = false;
269            fileCache.clear();
270        }
271        return super.getBinary(in);
272    }
273
274    public class SQLFileStorage implements FileStorage {
275
276        @Override
277        public void storeFile(String digest, File file) throws IOException {
278            Connection connection = null;
279            try {
280                connection = getConnection();
281                boolean existing;
282                if (disableCheckExisting) {
283                    // for unit tests
284                    existing = false;
285                } else {
286                    logSQL(checkSql, digest);
287                    PreparedStatement ps = connection.prepareStatement(checkSql);
288                    ps.setString(1, digest);
289                    ResultSet rs = ps.executeQuery();
290                    existing = rs.next();
291                    ps.close();
292                }
293                if (!existing) {
294                    // insert new blob
295                    logSQL(putSql, digest, "somebinary", Boolean.TRUE);
296                    PreparedStatement ps = connection.prepareStatement(putSql);
297                    ps.setString(1, digest);
298                    // needs dbcp 1.4:
299                    // ps.setBlob(2, new FileInputStream(file), file.length());
300                    FileInputStream tmpis = new FileInputStream(file);
301                    try {
302                        ps.setBinaryStream(2, tmpis, (int) file.length());
303                        ps.setBoolean(3, true); // mark new additions for GC
304                        try {
305                            ps.execute();
306                        } catch (SQLException e) {
307                            if (!isDuplicateKeyException(e)) {
308                                throw e;
309                            }
310                        }
311                    } finally {
312                        IOUtils.closeQuietly(tmpis);
313                    }
314                    ps.close();
315                }
316            } catch (SQLException e) {
317                throw new IOException(e);
318            } finally {
319                if (connection != null) {
320                    try {
321                        connection.close();
322                    } catch (SQLException e) {
323                        log.error(e, e);
324                    }
325                }
326            }
327        }
328
329        @Override
330        public boolean fetchFile(String digest, File tmp) throws IOException {
331            Connection connection = null;
332            try {
333                connection = getConnection();
334                logSQL(getSql, digest);
335                PreparedStatement ps = connection.prepareStatement(getSql);
336                ps.setString(1, digest);
337                ResultSet rs = ps.executeQuery();
338                if (!rs.next()) {
339                    log.error("Unknown binary: " + digest);
340                    return false;
341                }
342                InputStream in = rs.getBinaryStream(1);
343                OutputStream out = null;
344                try {
345                    if (in == null) {
346                        log.error("Missing binary: " + digest);
347                        return false;
348                    }
349                    // store in file
350                    out = new FileOutputStream(tmp);
351                    IOUtils.copy(in, out);
352                } finally {
353                    IOUtils.closeQuietly(in);
354                    IOUtils.closeQuietly(out);
355                }
356                return true;
357            } catch (SQLException e) {
358                throw new IOException(e);
359            } finally {
360                if (connection != null) {
361                    try {
362                        connection.close();
363                    } catch (SQLException e) {
364                        log.error(e, e);
365                    }
366                }
367            }
368        }
369    }
370
371    public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector {
372
373        protected final SQLBinaryManager binaryManager;
374
375        protected volatile long startTime;
376
377        protected BinaryManagerStatus status;
378
379        public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) {
380            this.binaryManager = binaryManager;
381        }
382
383        @Override
384        public String getId() {
385            return "datasource:" + binaryManager.dataSourceName;
386        }
387
388        @Override
389        public BinaryManagerStatus getStatus() {
390            return status;
391        }
392
393        @Override
394        public boolean isInProgress() {
395            // volatile as this is designed to be called from another thread
396            return startTime != 0;
397        }
398
399        @Override
400        public void start() {
401            if (startTime != 0) {
402                throw new RuntimeException("Already started");
403            }
404            startTime = System.currentTimeMillis();
405            status = new BinaryManagerStatus();
406
407            Connection connection = null;
408            PreparedStatement ps = null;
409            try {
410                connection = binaryManager.getConnection();
411                logSQL(binaryManager.gcStartSql, Boolean.FALSE);
412                ps = connection.prepareStatement(binaryManager.gcStartSql);
413                ps.setBoolean(1, false); // clear marks
414                int n = ps.executeUpdate();
415                logSQL("  -> ? rows", Long.valueOf(n));
416            } catch (SQLException e) {
417                throw new RuntimeException(e);
418            } finally {
419                if (ps != null) {
420                    try {
421                        ps.close();
422                    } catch (SQLException e) {
423                        log.error(e, e);
424                    }
425                }
426                if (connection != null) {
427                    try {
428                        connection.close();
429                    } catch (SQLException e) {
430                        log.error(e, e);
431                    }
432                }
433            }
434        }
435
436        @Override
437        public void mark(String digest) {
438            Connection connection = null;
439            PreparedStatement ps = null;
440            try {
441                connection = binaryManager.getConnection();
442                logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest);
443                ps = connection.prepareStatement(binaryManager.gcMarkSql);
444                ps.setBoolean(1, true); // mark
445                ps.setString(2, digest);
446                ps.execute();
447            } catch (SQLException e) {
448                throw new RuntimeException(e);
449            } finally {
450                if (ps != null) {
451                    try {
452                        ps.close();
453                    } catch (SQLException e) {
454                        log.error(e, e);
455                    }
456                }
457                if (connection != null) {
458                    try {
459                        connection.close();
460                    } catch (SQLException e) {
461                        log.error(e, e);
462                    }
463                }
464            }
465        }
466
467        @Override
468        public void stop(boolean delete) {
469            if (startTime == 0) {
470                throw new RuntimeException("Not started");
471            }
472
473            Connection connection = null;
474            PreparedStatement ps = null;
475            try {
476                connection = binaryManager.getConnection();
477                // stats
478                logSQL(binaryManager.gcStatsSql, Boolean.TRUE);
479                ps = connection.prepareStatement(binaryManager.gcStatsSql);
480                ps.setBoolean(1, true); // marked
481                ResultSet rs = ps.executeQuery();
482                rs.next();
483                status.numBinaries = rs.getLong(1);
484                status.sizeBinaries = rs.getLong(2);
485                logSQL("  -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries));
486                logSQL(binaryManager.gcStatsSql, Boolean.FALSE);
487                ps.setBoolean(1, false); // unmarked
488                rs = ps.executeQuery();
489                rs.next();
490                status.numBinariesGC = rs.getLong(1);
491                status.sizeBinariesGC = rs.getLong(2);
492                logSQL("  -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC));
493                if (delete) {
494                    // sweep
495                    ps.close();
496                    logSQL(binaryManager.gcSweepSql, Boolean.FALSE);
497                    ps = connection.prepareStatement(binaryManager.gcSweepSql);
498                    ps.setBoolean(1, false); // sweep unmarked
499                    int n = ps.executeUpdate();
500                    logSQL("  -> ? rows", Long.valueOf(n));
501                }
502            } catch (SQLException e) {
503                throw new RuntimeException(e);
504            } finally {
505                if (ps != null) {
506                    try {
507                        ps.close();
508                    } catch (SQLException e) {
509                        log.error(e, e);
510                    }
511                }
512                if (connection != null) {
513                    try {
514                        connection.close();
515                    } catch (SQLException e) {
516                        log.error(e, e);
517                    }
518                }
519            }
520
521            status.gcDuration = System.currentTimeMillis() - startTime;
522            startTime = 0;
523        }
524    }
525
526}