001/*
002 * (C) Copyright 2010-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.io.Serializable;
028import java.sql.Connection;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.util.Map;
033
034import org.apache.commons.io.IOUtils;
035import org.apache.commons.lang3.StringUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.blob.binary.Binary;
039import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
040import org.nuxeo.ecm.core.blob.binary.BinaryManager;
041import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
042import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
043import org.nuxeo.ecm.core.blob.binary.FileStorage;
044import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
047import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
048import org.nuxeo.runtime.datasource.ConnectionHelper;
049
050/**
051 * A Binary Manager that stores binaries as SQL BLOBs.
052 * <p>
053 * The BLOBs are cached locally on first access for efficiency.
054 * <p>
055 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
056 * if accessed before the stream.
057 */
058public class SQLBinaryManager extends CachingBinaryManager {
059
060    private static final Log log = LogFactory.getLog(SQLBinaryManager.class);
061
062    public static final String DS_PROP = "datasource";
063
064    public static final String DS_PREFIX = "datasource=";
065
066    public static final String TABLE_PROP = "table";
067
068    public static final String TABLE_PREFIX = "table=";
069
070    public static final String CACHE_SIZE_PROP = "cacheSize";
071
072    public static final String CACHE_SIZE_PREFIX = "cachesize=";
073
074    public static final String DEFAULT_CACHE_SIZE = "10M";
075
076    public static final String COL_ID = "id";
077
078    public static final String COL_BIN = "bin";
079
080    public static final String COL_MARK = "mark"; // for mark & sweep GC
081
082    protected String dataSourceName;
083
084    protected String checkSql;
085
086    protected String putSql;
087
088    protected String getSql;
089
090    protected String gcStartSql;
091
092    protected String gcMarkSql;
093
094    protected String gcStatsSql;
095
096    protected String gcSweepSql;
097
098    protected static boolean disableCheckExisting; // for unit tests
099
100    protected static boolean resetCache; // for unit tests
101
102    @Override
103    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
104        super.initialize(blobProviderId, properties);
105
106        dataSourceName = null;
107        String tableName = null;
108        String cacheSizeStr = null;
109        String key = properties.get(BinaryManager.PROP_KEY);
110        key = StringUtils.defaultIfBlank(key, "");
111        for (String part : key.split(",")) {
112            if (part.startsWith(DS_PREFIX)) {
113                dataSourceName = part.substring(DS_PREFIX.length()).trim();
114            }
115            if (part.startsWith(TABLE_PREFIX)) {
116                tableName = part.substring(TABLE_PREFIX.length()).trim();
117            }
118            if (part.startsWith(CACHE_SIZE_PREFIX)) {
119                cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim();
120            }
121        }
122        if (StringUtils.isBlank(dataSourceName)) {
123            dataSourceName = properties.get(DS_PROP);
124            if (StringUtils.isBlank(dataSourceName)) {
125                throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration");
126            }
127        }
128        if (StringUtils.isBlank(tableName)) {
129            tableName = properties.get(TABLE_PROP);
130            if (StringUtils.isBlank(tableName)) {
131                throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration");
132            }
133        }
134        if (StringUtils.isBlank(cacheSizeStr)) {
135            cacheSizeStr = properties.get(CACHE_SIZE_PROP);
136            if (StringUtils.isBlank(cacheSizeStr)) {
137                cacheSizeStr = DEFAULT_CACHE_SIZE;
138            }
139        }
140
141        // create the SQL statements used
142        createSql(tableName);
143
144        // create file cache
145        initializeCache(cacheSizeStr, new SQLFileStorage());
146        createGarbageCollector();
147    }
148
149    protected void createGarbageCollector() {
150        garbageCollector = new SQLBinaryGarbageCollector(this);
151    }
152
153    protected void createSql(String tableName) throws IOException {
154        Dialect dialect = getDialect();
155        Database database = new Database(dialect);
156        Table table = database.addTable(tableName);
157        ColumnType dummytype = ColumnType.STRING;
158        Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null);
159        Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null);
160        Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null);
161
162        checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName());
163        putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(),
164                idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName());
165        getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(),
166                idCol.getQuotedName());
167
168        gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName());
169        gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(),
170                idCol.getQuotedName());
171        gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
172                binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName());
173        gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName());
174    }
175
176    protected Dialect getDialect() throws IOException {
177        Connection connection = null;
178        try {
179            connection = getConnection();
180            return Dialect.createDialect(connection, null);
181        } catch (SQLException e) {
182            throw new IOException(e);
183        } finally {
184            if (connection != null) {
185                try {
186                    connection.close();
187                } catch (SQLException e) {
188                    log.error(e, e);
189                }
190            }
191        }
192    }
193
194    protected Connection getConnection() throws SQLException {
195        return ConnectionHelper.getConnection(dataSourceName);
196    }
197
198    protected static void logSQL(String sql, Serializable... values) {
199        if (!log.isTraceEnabled()) {
200            return;
201        }
202        StringBuilder buf = new StringBuilder();
203        int start = 0;
204        for (Serializable v : values) {
205            int index = sql.indexOf('?', start);
206            if (index == -1) {
207                // mismatch between number of ? and number of values
208                break;
209            }
210            buf.append(sql, start, index);
211            buf.append(loggedValue(v));
212            start = index + 1;
213        }
214        buf.append(sql, start, sql.length());
215        log.trace("(bin) SQL: " + buf.toString());
216    }
217
218    protected static String loggedValue(Serializable value) {
219        if (value == null) {
220            return "NULL";
221        }
222        if (value instanceof String) {
223            String v = (String) value;
224            return "'" + v.replace("'", "''") + "'";
225        }
226        return value.toString();
227    }
228
229    protected static boolean isDuplicateKeyException(SQLException e) {
230        String sqlState = e.getSQLState();
231        if ("23000".equals(sqlState)) {
232            // MySQL: Duplicate entry ... for key ...
233            // Oracle: unique constraint ... violated
234            // SQL Server: Violation of PRIMARY KEY constraint
235            return true;
236        }
237        if ("23001".equals(sqlState)) {
238            // H2: Unique index or primary key violation
239            return true;
240        }
241        if ("23505".equals(sqlState)) {
242            // H2: Unique index or primary key violation
243            // PostgreSQL: duplicate key value violates unique constraint
244            return true;
245        }
246        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
247            // SQL Server: Snapshot isolation transaction aborted due to update
248            // conflict
249            return true;
250        }
251        return false;
252    }
253
254    @Override
255    public Binary getBinary(String digest) {
256        if (resetCache) {
257            // for unit tests
258            resetCache = false;
259            fileCache.clear();
260        }
261        return super.getBinary(digest);
262    }
263
264    @Override
265    public Binary getBinary(InputStream in) throws IOException {
266        if (resetCache) {
267            // for unit tests
268            resetCache = false;
269            fileCache.clear();
270        }
271        return super.getBinary(in);
272    }
273
274    public class SQLFileStorage implements FileStorage {
275
276        @Override
277        public void storeFile(String digest, File file) throws IOException {
278            Connection connection = null;
279            try {
280                connection = getConnection();
281                boolean existing;
282                if (disableCheckExisting) {
283                    // for unit tests
284                    existing = false;
285                } else {
286                    logSQL(checkSql, digest);
287                    try (PreparedStatement ps = connection.prepareStatement(checkSql)) {
288                        ps.setString(1, digest);
289                        try (ResultSet rs = ps.executeQuery()) {
290                            existing = rs.next();
291                        }
292                    }
293                }
294                if (!existing) {
295                    // insert new blob
296                    logSQL(putSql, digest, "somebinary", Boolean.TRUE);
297                    try (PreparedStatement ps = connection.prepareStatement(putSql)) {
298                        ps.setString(1, digest);
299                        // needs dbcp 1.4:
300                        // ps.setBlob(2, new FileInputStream(file), file.length());
301                        try (FileInputStream tmpis = new FileInputStream(file)) {
302                            ps.setBinaryStream(2, tmpis, (int) file.length());
303                            ps.setBoolean(3, true); // mark new additions for GC
304                            try {
305                                ps.execute();
306                            } catch (SQLException e) {
307                                if (!isDuplicateKeyException(e)) {
308                                    throw e;
309                                }
310                            }
311                        }
312                    }
313                }
314            } catch (SQLException e) {
315                throw new IOException(e);
316            } finally {
317                if (connection != null) {
318                    try {
319                        connection.close();
320                    } catch (SQLException e) {
321                        log.error(e, e);
322                    }
323                }
324            }
325        }
326
327        @Override
328        public boolean fetchFile(String digest, File tmp) throws IOException {
329            Connection connection = null;
330            try {
331                connection = getConnection();
332                logSQL(getSql, digest);
333                try (PreparedStatement ps = connection.prepareStatement(getSql)) {
334                    ps.setString(1, digest);
335                    try (ResultSet rs = ps.executeQuery()) {
336                        if (!rs.next()) {
337                            log.error("Unknown binary: " + digest);
338                            return false;
339                        }
340                        try (InputStream in = rs.getBinaryStream(1)) {
341                            if (in == null) {
342                                log.error("Missing binary: " + digest);
343                                return false;
344                            }
345                            // store in file
346                            try (OutputStream out = new FileOutputStream(tmp)) {
347                                IOUtils.copy(in, out);
348                            }
349                        }
350                    }
351                }
352                return true;
353            } catch (SQLException e) {
354                throw new IOException(e);
355            } finally {
356                if (connection != null) {
357                    try {
358                        connection.close();
359                    } catch (SQLException e) {
360                        log.error(e, e);
361                    }
362                }
363            }
364        }
365    }
366
367    public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector {
368
369        protected final SQLBinaryManager binaryManager;
370
371        protected volatile long startTime;
372
373        protected BinaryManagerStatus status;
374
375        public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) {
376            this.binaryManager = binaryManager;
377        }
378
379        @Override
380        public String getId() {
381            return "datasource:" + binaryManager.dataSourceName;
382        }
383
384        @Override
385        public BinaryManagerStatus getStatus() {
386            return status;
387        }
388
389        @Override
390        public boolean isInProgress() {
391            // volatile as this is designed to be called from another thread
392            return startTime != 0;
393        }
394
395        @Override
396        public void start() {
397            if (startTime != 0) {
398                throw new RuntimeException("Already started");
399            }
400            startTime = System.currentTimeMillis();
401            status = new BinaryManagerStatus();
402
403            Connection connection = null;
404            PreparedStatement ps = null;
405            try {
406                connection = binaryManager.getConnection();
407                logSQL(binaryManager.gcStartSql, Boolean.FALSE);
408                ps = connection.prepareStatement(binaryManager.gcStartSql);
409                ps.setBoolean(1, false); // clear marks
410                int n = ps.executeUpdate();
411                logSQL("  -> ? rows", Long.valueOf(n));
412            } catch (SQLException e) {
413                throw new RuntimeException(e);
414            } finally {
415                if (ps != null) {
416                    try {
417                        ps.close();
418                    } catch (SQLException e) {
419                        log.error(e, e);
420                    }
421                }
422                if (connection != null) {
423                    try {
424                        connection.close();
425                    } catch (SQLException e) {
426                        log.error(e, e);
427                    }
428                }
429            }
430        }
431
432        @Override
433        public void mark(String digest) {
434            Connection connection = null;
435            PreparedStatement ps = null;
436            try {
437                connection = binaryManager.getConnection();
438                logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest);
439                ps = connection.prepareStatement(binaryManager.gcMarkSql);
440                ps.setBoolean(1, true); // mark
441                ps.setString(2, digest);
442                ps.execute();
443            } catch (SQLException e) {
444                throw new RuntimeException(e);
445            } finally {
446                if (ps != null) {
447                    try {
448                        ps.close();
449                    } catch (SQLException e) {
450                        log.error(e, e);
451                    }
452                }
453                if (connection != null) {
454                    try {
455                        connection.close();
456                    } catch (SQLException e) {
457                        log.error(e, e);
458                    }
459                }
460            }
461        }
462
463        @Override
464        public void stop(boolean delete) {
465            if (startTime == 0) {
466                throw new RuntimeException("Not started");
467            }
468
469            Connection connection = null;
470            PreparedStatement ps = null;
471            try {
472                connection = binaryManager.getConnection();
473                // stats
474                logSQL(binaryManager.gcStatsSql, Boolean.TRUE);
475                ps = connection.prepareStatement(binaryManager.gcStatsSql);
476                ps.setBoolean(1, true); // marked
477                try (ResultSet rs = ps.executeQuery()) {
478                    rs.next();
479                    status.numBinaries = rs.getLong(1);
480                    status.sizeBinaries = rs.getLong(2);
481                }
482                logSQL("  -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries));
483                logSQL(binaryManager.gcStatsSql, Boolean.FALSE);
484                ps.setBoolean(1, false); // unmarked
485                try (ResultSet rs = ps.executeQuery()) {
486                    rs.next();
487                    status.numBinariesGC = rs.getLong(1);
488                    status.sizeBinariesGC = rs.getLong(2);
489                }
490                logSQL("  -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC));
491                if (delete) {
492                    // sweep
493                    ps.close();
494                    logSQL(binaryManager.gcSweepSql, Boolean.FALSE);
495                    ps = connection.prepareStatement(binaryManager.gcSweepSql);
496                    ps.setBoolean(1, false); // sweep unmarked
497                    int n = ps.executeUpdate();
498                    logSQL("  -> ? rows", Long.valueOf(n));
499                }
500            } catch (SQLException e) {
501                throw new RuntimeException(e);
502            } finally {
503                if (ps != null) {
504                    try {
505                        ps.close();
506                    } catch (SQLException e) {
507                        log.error(e, e);
508                    }
509                }
510                if (connection != null) {
511                    try {
512                        connection.close();
513                    } catch (SQLException e) {
514                        log.error(e, e);
515                    }
516                }
517            }
518
519            status.gcDuration = System.currentTimeMillis() - startTime;
520            startTime = 0;
521        }
522    }
523
524}