001/*
002 * (C) Copyright 2010-2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.storage.sql;
018
019import java.io.File;
020import java.io.FileInputStream;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.io.Serializable;
026import java.sql.Connection;
027import java.sql.PreparedStatement;
028import java.sql.ResultSet;
029import java.sql.SQLException;
030import java.util.Map;
031
032import javax.naming.NamingException;
033import javax.sql.DataSource;
034
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.lang.StringUtils;
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.nuxeo.ecm.core.blob.binary.Binary;
040import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
041import org.nuxeo.ecm.core.blob.binary.BinaryManager;
042import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
043import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
044import org.nuxeo.ecm.core.blob.binary.FileStorage;
045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
047import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
048import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
049import org.nuxeo.runtime.datasource.ConnectionHelper;
050import org.nuxeo.runtime.datasource.DataSourceHelper;
051
052/**
053 * A Binary Manager that stores binaries as SQL BLOBs.
054 * <p>
055 * The BLOBs are cached locally on first access for efficiency.
056 * <p>
057 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
058 * if accessed before the stream.
059 */
060public class SQLBinaryManager extends CachingBinaryManager {
061
062    private static final Log log = LogFactory.getLog(SQLBinaryManager.class);
063
064    public static final String DS_PROP = "datasource";
065
066    public static final String DS_PREFIX = "datasource=";
067
068    public static final String TABLE_PROP = "table";
069
070    public static final String TABLE_PREFIX = "table=";
071
072    public static final String CACHE_SIZE_PROP = "cacheSize";
073
074    public static final String CACHE_SIZE_PREFIX = "cachesize=";
075
076    public static final String DEFAULT_CACHE_SIZE = "10M";
077
078    public static final String COL_ID = "id";
079
080    public static final String COL_BIN = "bin";
081
082    public static final String COL_MARK = "mark"; // for mark & sweep GC
083
084    protected String dataSourceName;
085
086    protected String checkSql;
087
088    protected String putSql;
089
090    protected String getSql;
091
092    protected String getLengthSql;
093
094    protected String gcStartSql;
095
096    protected String gcMarkSql;
097
098    protected String gcStatsSql;
099
100    protected String gcSweepSql;
101
102    protected static boolean disableCheckExisting; // for unit tests
103
104    protected static boolean resetCache; // for unit tests
105
106    @Override
107    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
108        super.initialize(blobProviderId, properties);
109
110        dataSourceName = null;
111        String tableName = null;
112        String cacheSizeStr = null;
113        String key = properties.get(BinaryManager.PROP_KEY);
114        key = StringUtils.defaultIfBlank(key, "");
115        for (String part : key.split(",")) {
116            if (part.startsWith(DS_PREFIX)) {
117                dataSourceName = part.substring(DS_PREFIX.length()).trim();
118            }
119            if (part.startsWith(TABLE_PREFIX)) {
120                tableName = part.substring(TABLE_PREFIX.length()).trim();
121            }
122            if (part.startsWith(CACHE_SIZE_PREFIX)) {
123                cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim();
124            }
125        }
126        if (StringUtils.isBlank(dataSourceName)) {
127            dataSourceName = properties.get(DS_PROP);
128            if (StringUtils.isBlank(dataSourceName)) {
129                throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration");
130            }
131        }
132        if (StringUtils.isBlank(tableName)) {
133            tableName = properties.get(TABLE_PROP);
134            if (StringUtils.isBlank(tableName)) {
135                throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration");
136            }
137        }
138        if (StringUtils.isBlank(cacheSizeStr)) {
139            cacheSizeStr = properties.get(CACHE_SIZE_PROP);
140            if (StringUtils.isBlank(cacheSizeStr)) {
141                cacheSizeStr = DEFAULT_CACHE_SIZE;
142            }
143        }
144
145        // create the SQL statements used
146        createSql(tableName);
147
148        // create file cache
149        initializeCache(cacheSizeStr, new SQLFileStorage());
150        createGarbageCollector();
151    }
152
153    protected void createGarbageCollector() {
154        garbageCollector = new SQLBinaryGarbageCollector(this);
155    }
156
157    protected void createSql(String tableName) throws IOException {
158        Dialect dialect = getDialect();
159        Database database = new Database(dialect);
160        Table table = database.addTable(tableName);
161        ColumnType dummytype = ColumnType.STRING;
162        Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null);
163        Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null);
164        Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null);
165
166        checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName());
167        putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(),
168                idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName());
169        getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(),
170                idCol.getQuotedName());
171        getLengthSql = String.format("SELECT %s(%s) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
172                binCol.getQuotedName(), table.getQuotedName(), idCol.getQuotedName());
173
174        gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName());
175        gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(),
176                idCol.getQuotedName());
177        gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
178                binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName());
179        gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName());
180    }
181
182    protected Dialect getDialect() throws IOException {
183        Connection connection = null;
184        try {
185            connection = getConnection();
186            return Dialect.createDialect(connection, null);
187        } catch (SQLException e) {
188            throw new IOException(e);
189        } finally {
190            if (connection != null) {
191                try {
192                    connection.close();
193                } catch (SQLException e) {
194                    log.error(e, e);
195                }
196            }
197        }
198    }
199
200    protected Connection getConnection() throws SQLException {
201        return ConnectionHelper.getConnection(dataSourceName);
202    }
203
204    protected static void logSQL(String sql, Serializable... values) {
205        if (!log.isTraceEnabled()) {
206            return;
207        }
208        StringBuilder buf = new StringBuilder();
209        int start = 0;
210        for (Serializable v : values) {
211            int index = sql.indexOf('?', start);
212            if (index == -1) {
213                // mismatch between number of ? and number of values
214                break;
215            }
216            buf.append(sql, start, index);
217            buf.append(loggedValue(v));
218            start = index + 1;
219        }
220        buf.append(sql, start, sql.length());
221        log.trace("(bin) SQL: " + buf.toString());
222    }
223
224    protected static String loggedValue(Serializable value) {
225        if (value == null) {
226            return "NULL";
227        }
228        if (value instanceof String) {
229            String v = (String) value;
230            return "'" + v.replace("'", "''") + "'";
231        }
232        return value.toString();
233    }
234
235    protected static boolean isDuplicateKeyException(SQLException e) {
236        String sqlState = e.getSQLState();
237        if ("23000".equals(sqlState)) {
238            // MySQL: Duplicate entry ... for key ...
239            // Oracle: unique constraint ... violated
240            // SQL Server: Violation of PRIMARY KEY constraint
241            return true;
242        }
243        if ("23001".equals(sqlState)) {
244            // H2: Unique index or primary key violation
245            return true;
246        }
247        if ("23505".equals(sqlState)) {
248            // H2: Unique index or primary key violation
249            // PostgreSQL: duplicate key value violates unique constraint
250            return true;
251        }
252        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
253            // SQL Server: Snapshot isolation transaction aborted due to update
254            // conflict
255            return true;
256        }
257        return false;
258    }
259
260    @Override
261    public Binary getBinary(String digest) {
262        if (resetCache) {
263            // for unit tests
264            resetCache = false;
265            fileCache.clear();
266        }
267        return super.getBinary(digest);
268    }
269
270    @Override
271    public Binary getBinary(InputStream in) throws IOException {
272        if (resetCache) {
273            // for unit tests
274            resetCache = false;
275            fileCache.clear();
276        }
277        return super.getBinary(in);
278    }
279
280    public class SQLFileStorage implements FileStorage {
281
282        @Override
283        public void storeFile(String digest, File file) throws IOException {
284            Connection connection = null;
285            try {
286                connection = getConnection();
287                boolean existing;
288                if (disableCheckExisting) {
289                    // for unit tests
290                    existing = false;
291                } else {
292                    logSQL(checkSql, digest);
293                    PreparedStatement ps = connection.prepareStatement(checkSql);
294                    ps.setString(1, digest);
295                    ResultSet rs = ps.executeQuery();
296                    existing = rs.next();
297                    ps.close();
298                }
299                if (!existing) {
300                    // insert new blob
301                    logSQL(putSql, digest, "somebinary", Boolean.TRUE);
302                    PreparedStatement ps = connection.prepareStatement(putSql);
303                    ps.setString(1, digest);
304                    // needs dbcp 1.4:
305                    // ps.setBlob(2, new FileInputStream(file), file.length());
306                    FileInputStream tmpis = new FileInputStream(file);
307                    try {
308                        ps.setBinaryStream(2, tmpis, (int) file.length());
309                        ps.setBoolean(3, true); // mark new additions for GC
310                        try {
311                            ps.execute();
312                        } catch (SQLException e) {
313                            if (!isDuplicateKeyException(e)) {
314                                throw e;
315                            }
316                        }
317                    } finally {
318                        IOUtils.closeQuietly(tmpis);
319                    }
320                    ps.close();
321                }
322            } catch (SQLException e) {
323                throw new IOException(e);
324            } finally {
325                if (connection != null) {
326                    try {
327                        connection.close();
328                    } catch (SQLException e) {
329                        log.error(e, e);
330                    }
331                }
332            }
333        }
334
335        @Override
336        public boolean fetchFile(String digest, File tmp) throws IOException {
337            Connection connection = null;
338            try {
339                connection = getConnection();
340                logSQL(getSql, digest);
341                PreparedStatement ps = connection.prepareStatement(getSql);
342                ps.setString(1, digest);
343                ResultSet rs = ps.executeQuery();
344                if (!rs.next()) {
345                    log.error("Unknown binary: " + digest);
346                    return false;
347                }
348                InputStream in = rs.getBinaryStream(1);
349                OutputStream out = null;
350                try {
351                    if (in == null) {
352                        log.error("Missing binary: " + digest);
353                        return false;
354                    }
355                    // store in file
356                    out = new FileOutputStream(tmp);
357                    IOUtils.copy(in, out);
358                } finally {
359                    IOUtils.closeQuietly(in);
360                    IOUtils.closeQuietly(out);
361                }
362                return true;
363            } catch (SQLException e) {
364                throw new IOException(e);
365            } finally {
366                if (connection != null) {
367                    try {
368                        connection.close();
369                    } catch (SQLException e) {
370                        log.error(e, e);
371                    }
372                }
373            }
374        }
375
376        @Override
377        public Long fetchLength(String digest) throws IOException {
378            Connection connection = null;
379            try {
380                connection = getConnection();
381                logSQL(getLengthSql, digest);
382                PreparedStatement ps = connection.prepareStatement(getLengthSql);
383                ps.setString(1, digest);
384                ResultSet rs = ps.executeQuery();
385                if (!rs.next()) {
386                    log.error("Unknown binary: " + digest);
387                    return null;
388                }
389                return Long.valueOf(rs.getLong(1));
390            } catch (SQLException e) {
391                throw new IOException(e);
392            } finally {
393                if (connection != null) {
394                    try {
395                        connection.close();
396                    } catch (SQLException e) {
397                        log.error(e, e);
398                    }
399                }
400            }
401        }
402    }
403
404    public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector {
405
406        protected final SQLBinaryManager binaryManager;
407
408        protected volatile long startTime;
409
410        protected BinaryManagerStatus status;
411
412        public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) {
413            this.binaryManager = binaryManager;
414        }
415
416        @Override
417        public String getId() {
418            return "datasource:" + binaryManager.dataSourceName;
419        }
420
421        @Override
422        public BinaryManagerStatus getStatus() {
423            return status;
424        }
425
426        @Override
427        public boolean isInProgress() {
428            // volatile as this is designed to be called from another thread
429            return startTime != 0;
430        }
431
432        @Override
433        public void start() {
434            if (startTime != 0) {
435                throw new RuntimeException("Already started");
436            }
437            startTime = System.currentTimeMillis();
438            status = new BinaryManagerStatus();
439
440            Connection connection = null;
441            PreparedStatement ps = null;
442            try {
443                connection = binaryManager.getConnection();
444                logSQL(binaryManager.gcStartSql, Boolean.FALSE);
445                ps = connection.prepareStatement(binaryManager.gcStartSql);
446                ps.setBoolean(1, false); // clear marks
447                int n = ps.executeUpdate();
448                logSQL("  -> ? rows", Long.valueOf(n));
449            } catch (SQLException e) {
450                throw new RuntimeException(e);
451            } finally {
452                if (ps != null) {
453                    try {
454                        ps.close();
455                    } catch (SQLException e) {
456                        log.error(e, e);
457                    }
458                }
459                if (connection != null) {
460                    try {
461                        connection.close();
462                    } catch (SQLException e) {
463                        log.error(e, e);
464                    }
465                }
466            }
467        }
468
469        @Override
470        public void mark(String digest) {
471            Connection connection = null;
472            PreparedStatement ps = null;
473            try {
474                connection = binaryManager.getConnection();
475                logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest);
476                ps = connection.prepareStatement(binaryManager.gcMarkSql);
477                ps.setBoolean(1, true); // mark
478                ps.setString(2, digest);
479                ps.execute();
480            } catch (SQLException e) {
481                throw new RuntimeException(e);
482            } finally {
483                if (ps != null) {
484                    try {
485                        ps.close();
486                    } catch (SQLException e) {
487                        log.error(e, e);
488                    }
489                }
490                if (connection != null) {
491                    try {
492                        connection.close();
493                    } catch (SQLException e) {
494                        log.error(e, e);
495                    }
496                }
497            }
498        }
499
500        @Override
501        public void stop(boolean delete) {
502            if (startTime == 0) {
503                throw new RuntimeException("Not started");
504            }
505
506            Connection connection = null;
507            PreparedStatement ps = null;
508            try {
509                connection = binaryManager.getConnection();
510                // stats
511                logSQL(binaryManager.gcStatsSql, Boolean.TRUE);
512                ps = connection.prepareStatement(binaryManager.gcStatsSql);
513                ps.setBoolean(1, true); // marked
514                ResultSet rs = ps.executeQuery();
515                rs.next();
516                status.numBinaries = rs.getLong(1);
517                status.sizeBinaries = rs.getLong(2);
518                logSQL("  -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries));
519                logSQL(binaryManager.gcStatsSql, Boolean.FALSE);
520                ps.setBoolean(1, false); // unmarked
521                rs = ps.executeQuery();
522                rs.next();
523                status.numBinariesGC = rs.getLong(1);
524                status.sizeBinariesGC = rs.getLong(2);
525                logSQL("  -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC));
526                if (delete) {
527                    // sweep
528                    ps.close();
529                    logSQL(binaryManager.gcSweepSql, Boolean.FALSE);
530                    ps = connection.prepareStatement(binaryManager.gcSweepSql);
531                    ps.setBoolean(1, false); // sweep unmarked
532                    int n = ps.executeUpdate();
533                    logSQL("  -> ? rows", Long.valueOf(n));
534                }
535            } catch (SQLException e) {
536                throw new RuntimeException(e);
537            } finally {
538                if (ps != null) {
539                    try {
540                        ps.close();
541                    } catch (SQLException e) {
542                        log.error(e, e);
543                    }
544                }
545                if (connection != null) {
546                    try {
547                        connection.close();
548                    } catch (SQLException e) {
549                        log.error(e, e);
550                    }
551                }
552            }
553
554            status.gcDuration = System.currentTimeMillis() - startTime;
555            startTime = 0;
556        }
557    }
558
559}