001/*
002 * (C) Copyright 2010-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.io.Serializable;
028import java.sql.Connection;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.util.Map;
033
034import org.apache.commons.io.IOUtils;
035import org.apache.commons.lang3.StringUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.blob.BlobProviderDescriptor;
039import org.nuxeo.ecm.core.blob.binary.Binary;
040import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
041import org.nuxeo.ecm.core.blob.binary.BinaryManager;
042import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
043import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
044import org.nuxeo.ecm.core.blob.binary.FileStorage;
045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
047import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
048import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
049import org.nuxeo.runtime.datasource.ConnectionHelper;
050
051/**
052 * A Binary Manager that stores binaries as SQL BLOBs.
053 * <p>
054 * The BLOBs are cached locally on first access for efficiency.
055 * <p>
056 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
057 * if accessed before the stream.
058 */
059public class SQLBinaryManager extends CachingBinaryManager {
060
    /** Logger used by this manager and by its nested storage and garbage collector classes. */
    private static final Log log = LogFactory.getLog(SQLBinaryManager.class);

    /** Name of the configuration property holding the datasource name. */
    public static final String DS_PROP = "datasource";

    /** Prefix of the datasource entry inside the comma-separated {@link BinaryManager#PROP_KEY} value. */
    public static final String DS_PREFIX = "datasource=";

    /** Name of the configuration property holding the table name. */
    public static final String TABLE_PROP = "table";

    /** Prefix of the table entry inside the comma-separated {@link BinaryManager#PROP_KEY} value. */
    public static final String TABLE_PREFIX = "table=";

    /** Name of the configuration property holding the local file cache size. */
    public static final String CACHE_SIZE_PROP = "cacheSize";

    /**
     * Prefix of the cache size entry inside the comma-separated {@link BinaryManager#PROP_KEY} value (all lowercase,
     * unlike {@link #CACHE_SIZE_PROP}).
     */
    public static final String CACHE_SIZE_PREFIX = "cachesize=";

    /** Cache size used when none is configured. */
    public static final String DEFAULT_CACHE_SIZE = "10M";

    /** Name of the column matched against the binary digest. */
    public static final String COL_ID = "id";

    /** Name of the column holding the binary content. */
    public static final String COL_BIN = "bin";

    /** Name of the boolean column flagging the rows to keep. */
    public static final String COL_MARK = "mark"; // for mark & sweep GC

    /** Datasource name resolved by {@link #initialize}, used to open every connection. */
    protected String dataSourceName;

    // The statements below are all built once by createSql(String).

    /** Query checking whether a row exists for a given id. */
    protected String checkSql;

    /** Statement inserting a new row (id, binary content, mark). */
    protected String putSql;

    /** Query fetching the binary content for a given id. */
    protected String getSql;

    /** Statement setting the mark of every row (used to clear all marks when a GC starts). */
    protected String gcStartSql;

    /** Statement setting the mark of the row having a given id. */
    protected String gcMarkSql;

    /** Query returning the number of rows and the total binary length for a given mark value. */
    protected String gcStatsSql;

    /** Statement deleting the rows having a given mark value. */
    protected String gcSweepSql;

    /** When {@code true}, storing a file skips the existence check and always attempts the insert. */
    protected static boolean disableCheckExisting; // for unit tests

    /** When {@code true}, the next {@code getBinary} call clears the file cache and resets this flag. */
    protected static boolean resetCache; // for unit tests
102
103    @Override
104    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
105        super.initialize(blobProviderId, properties);
106
107        dataSourceName = null;
108        String tableName = null;
109        String cacheSizeStr = null;
110        String key = properties.get(BinaryManager.PROP_KEY);
111        key = StringUtils.defaultIfBlank(key, "");
112        for (String part : key.split(",")) {
113            if (part.startsWith(DS_PREFIX)) {
114                dataSourceName = part.substring(DS_PREFIX.length()).trim();
115            }
116            if (part.startsWith(TABLE_PREFIX)) {
117                tableName = part.substring(TABLE_PREFIX.length()).trim();
118            }
119            if (part.startsWith(CACHE_SIZE_PREFIX)) {
120                cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim();
121            }
122        }
123        if (StringUtils.isBlank(dataSourceName)) {
124            dataSourceName = properties.get(DS_PROP);
125            if (StringUtils.isBlank(dataSourceName)) {
126                throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration");
127            }
128        }
129        if (StringUtils.isBlank(tableName)) {
130            tableName = properties.get(TABLE_PROP);
131            if (StringUtils.isBlank(tableName)) {
132                throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration");
133            }
134        }
135        if (StringUtils.isBlank(cacheSizeStr)) {
136            cacheSizeStr = properties.get(CACHE_SIZE_PROP);
137            if (StringUtils.isBlank(cacheSizeStr)) {
138                cacheSizeStr = DEFAULT_CACHE_SIZE;
139            }
140        }
141        if (StringUtils.isNotBlank(properties.get(BlobProviderDescriptor.NAMESPACE))) {
142            throw new RuntimeException("Namespaces not implemented");
143        }
144
145        // create the SQL statements used
146        createSql(tableName);
147
148        // create file cache
149        initializeCache(cacheSizeStr, new SQLFileStorage());
150        createGarbageCollector();
151    }
152
153    protected void createGarbageCollector() {
154        garbageCollector = new SQLBinaryGarbageCollector(this);
155    }
156
157    protected void createSql(String tableName) throws IOException {
158        Dialect dialect = getDialect();
159        Database database = new Database(dialect);
160        Table table = database.addTable(tableName);
161        ColumnType dummytype = ColumnType.STRING;
162        Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null);
163        Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null);
164        Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null);
165
166        checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName());
167        putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(),
168                idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName());
169        getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(),
170                idCol.getQuotedName());
171
172        gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName());
173        gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(),
174                idCol.getQuotedName());
175        gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
176                binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName());
177        gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName());
178    }
179
180    protected Dialect getDialect() throws IOException {
181        Connection connection = null;
182        try {
183            connection = getConnection();
184            return Dialect.createDialect(connection, null);
185        } catch (SQLException e) {
186            throw new IOException(e);
187        } finally {
188            if (connection != null) {
189                try {
190                    connection.close();
191                } catch (SQLException e) {
192                    log.error(e, e);
193                }
194            }
195        }
196    }
197
198    protected Connection getConnection() throws SQLException {
199        return ConnectionHelper.getConnection(dataSourceName);
200    }
201
202    protected static void logSQL(String sql, Serializable... values) {
203        if (!log.isTraceEnabled()) {
204            return;
205        }
206        StringBuilder buf = new StringBuilder();
207        int start = 0;
208        for (Serializable v : values) {
209            int index = sql.indexOf('?', start);
210            if (index == -1) {
211                // mismatch between number of ? and number of values
212                break;
213            }
214            buf.append(sql, start, index);
215            buf.append(loggedValue(v));
216            start = index + 1;
217        }
218        buf.append(sql, start, sql.length());
219        log.trace("(bin) SQL: " + buf.toString());
220    }
221
222    protected static String loggedValue(Serializable value) {
223        if (value == null) {
224            return "NULL";
225        }
226        if (value instanceof String) {
227            String v = (String) value;
228            return "'" + v.replace("'", "''") + "'";
229        }
230        return value.toString();
231    }
232
233    protected static boolean isDuplicateKeyException(SQLException e) {
234        String sqlState = e.getSQLState();
235        if ("23000".equals(sqlState)) {
236            // MySQL: Duplicate entry ... for key ...
237            // Oracle: unique constraint ... violated
238            // SQL Server: Violation of PRIMARY KEY constraint
239            return true;
240        }
241        if ("23001".equals(sqlState)) {
242            // H2: Unique index or primary key violation
243            return true;
244        }
245        if ("23505".equals(sqlState)) {
246            // H2: Unique index or primary key violation
247            // PostgreSQL: duplicate key value violates unique constraint
248            return true;
249        }
250        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
251            // SQL Server: Snapshot isolation transaction aborted due to update
252            // conflict
253            return true;
254        }
255        return false;
256    }
257
258    @Override
259    public Binary getBinary(String digest) {
260        if (resetCache) {
261            // for unit tests
262            resetCache = false;
263            fileCache.clear();
264        }
265        return super.getBinary(digest);
266    }
267
268    @Override
269    public Binary getBinary(InputStream in) throws IOException {
270        if (resetCache) {
271            // for unit tests
272            resetCache = false;
273            fileCache.clear();
274        }
275        return super.getBinary(in);
276    }
277
278    public class SQLFileStorage implements FileStorage {
279
280        @Override
281        public void storeFile(String digest, File file) throws IOException {
282            Connection connection = null;
283            try {
284                connection = getConnection();
285                boolean existing;
286                if (disableCheckExisting) {
287                    // for unit tests
288                    existing = false;
289                } else {
290                    logSQL(checkSql, digest);
291                    try (PreparedStatement ps = connection.prepareStatement(checkSql)) {
292                        ps.setString(1, digest);
293                        try (ResultSet rs = ps.executeQuery()) {
294                            existing = rs.next();
295                        }
296                    }
297                }
298                if (!existing) {
299                    // insert new blob
300                    logSQL(putSql, digest, "somebinary", Boolean.TRUE);
301                    try (PreparedStatement ps = connection.prepareStatement(putSql)) {
302                        ps.setString(1, digest);
303                        // needs dbcp 1.4:
304                        // ps.setBlob(2, new FileInputStream(file), file.length());
305                        try (FileInputStream tmpis = new FileInputStream(file)) {
306                            ps.setBinaryStream(2, tmpis, (int) file.length());
307                            ps.setBoolean(3, true); // mark new additions for GC
308                            try {
309                                ps.execute();
310                            } catch (SQLException e) {
311                                if (!isDuplicateKeyException(e)) {
312                                    throw e;
313                                }
314                            }
315                        }
316                    }
317                }
318            } catch (SQLException e) {
319                throw new IOException(e);
320            } finally {
321                if (connection != null) {
322                    try {
323                        connection.close();
324                    } catch (SQLException e) {
325                        log.error(e, e);
326                    }
327                }
328            }
329        }
330
331        @Override
332        public boolean fetchFile(String digest, File tmp) throws IOException {
333            Connection connection = null;
334            try {
335                connection = getConnection();
336                logSQL(getSql, digest);
337                try (PreparedStatement ps = connection.prepareStatement(getSql)) {
338                    ps.setString(1, digest);
339                    try (ResultSet rs = ps.executeQuery()) {
340                        if (!rs.next()) {
341                            log.error("Unknown binary: " + digest);
342                            return false;
343                        }
344                        try (InputStream in = rs.getBinaryStream(1)) {
345                            if (in == null) {
346                                log.error("Missing binary: " + digest);
347                                return false;
348                            }
349                            // store in file
350                            try (OutputStream out = new FileOutputStream(tmp)) {
351                                IOUtils.copy(in, out);
352                            }
353                        }
354                    }
355                }
356                return true;
357            } catch (SQLException e) {
358                throw new IOException(e);
359            } finally {
360                if (connection != null) {
361                    try {
362                        connection.close();
363                    } catch (SQLException e) {
364                        log.error(e, e);
365                    }
366                }
367            }
368        }
369    }
370
371    public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector {
372
373        protected final SQLBinaryManager binaryManager;
374
375        protected volatile long startTime;
376
377        protected BinaryManagerStatus status;
378
379        public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) {
380            this.binaryManager = binaryManager;
381        }
382
383        @Override
384        public String getId() {
385            return "datasource:" + binaryManager.dataSourceName;
386        }
387
388        @Override
389        public BinaryManagerStatus getStatus() {
390            return status;
391        }
392
393        @Override
394        public boolean isInProgress() {
395            // volatile as this is designed to be called from another thread
396            return startTime != 0;
397        }
398
399        @Override
400        public void start() {
401            if (startTime != 0) {
402                throw new RuntimeException("Already started");
403            }
404            startTime = System.currentTimeMillis();
405            status = new BinaryManagerStatus();
406
407            Connection connection = null;
408            PreparedStatement ps = null;
409            try {
410                connection = binaryManager.getConnection();
411                logSQL(binaryManager.gcStartSql, Boolean.FALSE);
412                ps = connection.prepareStatement(binaryManager.gcStartSql);
413                ps.setBoolean(1, false); // clear marks
414                int n = ps.executeUpdate();
415                logSQL("  -> ? rows", Long.valueOf(n));
416            } catch (SQLException e) {
417                throw new RuntimeException(e);
418            } finally {
419                if (ps != null) {
420                    try {
421                        ps.close();
422                    } catch (SQLException e) {
423                        log.error(e, e);
424                    }
425                }
426                if (connection != null) {
427                    try {
428                        connection.close();
429                    } catch (SQLException e) {
430                        log.error(e, e);
431                    }
432                }
433            }
434        }
435
436        @Override
437        public void mark(String digest) {
438            Connection connection = null;
439            PreparedStatement ps = null;
440            try {
441                connection = binaryManager.getConnection();
442                logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest);
443                ps = connection.prepareStatement(binaryManager.gcMarkSql);
444                ps.setBoolean(1, true); // mark
445                ps.setString(2, digest);
446                ps.execute();
447            } catch (SQLException e) {
448                throw new RuntimeException(e);
449            } finally {
450                if (ps != null) {
451                    try {
452                        ps.close();
453                    } catch (SQLException e) {
454                        log.error(e, e);
455                    }
456                }
457                if (connection != null) {
458                    try {
459                        connection.close();
460                    } catch (SQLException e) {
461                        log.error(e, e);
462                    }
463                }
464            }
465        }
466
467        @Override
468        public void stop(boolean delete) {
469            if (startTime == 0) {
470                throw new RuntimeException("Not started");
471            }
472
473            Connection connection = null;
474            PreparedStatement ps = null;
475            try {
476                connection = binaryManager.getConnection();
477                // stats
478                logSQL(binaryManager.gcStatsSql, Boolean.TRUE);
479                ps = connection.prepareStatement(binaryManager.gcStatsSql);
480                ps.setBoolean(1, true); // marked
481                try (ResultSet rs = ps.executeQuery()) {
482                    rs.next();
483                    status.numBinaries = rs.getLong(1);
484                    status.sizeBinaries = rs.getLong(2);
485                }
486                logSQL("  -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries));
487                logSQL(binaryManager.gcStatsSql, Boolean.FALSE);
488                ps.setBoolean(1, false); // unmarked
489                try (ResultSet rs = ps.executeQuery()) {
490                    rs.next();
491                    status.numBinariesGC = rs.getLong(1);
492                    status.sizeBinariesGC = rs.getLong(2);
493                }
494                logSQL("  -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC));
495                if (delete) {
496                    // sweep
497                    ps.close();
498                    logSQL(binaryManager.gcSweepSql, Boolean.FALSE);
499                    ps = connection.prepareStatement(binaryManager.gcSweepSql);
500                    ps.setBoolean(1, false); // sweep unmarked
501                    int n = ps.executeUpdate();
502                    logSQL("  -> ? rows", Long.valueOf(n));
503                }
504            } catch (SQLException e) {
505                throw new RuntimeException(e);
506            } finally {
507                if (ps != null) {
508                    try {
509                        ps.close();
510                    } catch (SQLException e) {
511                        log.error(e, e);
512                    }
513                }
514                if (connection != null) {
515                    try {
516                        connection.close();
517                    } catch (SQLException e) {
518                        log.error(e, e);
519                    }
520                }
521            }
522
523            status.gcDuration = System.currentTimeMillis() - startTime;
524            startTime = 0;
525        }
526    }
527
528}