001/*
002 * (C) Copyright 2010-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.io.Serializable;
028import java.sql.Connection;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.util.Map;
033
034import javax.naming.NamingException;
035import javax.sql.DataSource;
036
037import org.apache.commons.io.IOUtils;
038import org.apache.commons.lang.StringUtils;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.core.blob.binary.Binary;
042import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
043import org.nuxeo.ecm.core.blob.binary.BinaryManager;
044import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
045import org.nuxeo.ecm.core.blob.binary.CachingBinaryManager;
046import org.nuxeo.ecm.core.blob.binary.FileStorage;
047import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
048import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
049import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
050import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
051import org.nuxeo.runtime.datasource.ConnectionHelper;
052import org.nuxeo.runtime.datasource.DataSourceHelper;
053
054/**
055 * A Binary Manager that stores binaries as SQL BLOBs.
056 * <p>
057 * The BLOBs are cached locally on first access for efficiency.
058 * <p>
059 * Because the BLOB length can be accessed independently of the binary stream, it is also cached in a simple text file
060 * if accessed before the stream.
061 */
062public class SQLBinaryManager extends CachingBinaryManager {
063
    private static final Log log = LogFactory.getLog(SQLBinaryManager.class);

    /** Name of the property holding the datasource name, used when the key does not provide it. */
    public static final String DS_PROP = "datasource";

    /** Prefix of the datasource part inside the comma-separated {@link BinaryManager#PROP_KEY} value. */
    public static final String DS_PREFIX = "datasource=";

    /** Name of the property holding the table name, used when the key does not provide it. */
    public static final String TABLE_PROP = "table";

    /** Prefix of the table part inside the comma-separated {@link BinaryManager#PROP_KEY} value. */
    public static final String TABLE_PREFIX = "table=";

    /** Name of the property holding the cache size, used when the key does not provide it. */
    public static final String CACHE_SIZE_PROP = "cacheSize";

    /**
     * Prefix of the cache size part inside the comma-separated {@link BinaryManager#PROP_KEY} value (note: all
     * lowercase, unlike {@link #CACHE_SIZE_PROP}).
     */
    public static final String CACHE_SIZE_PREFIX = "cachesize=";

    /** Cache size used when none is configured. */
    public static final String DEFAULT_CACHE_SIZE = "10M";

    /** Column holding the binary digest. */
    public static final String COL_ID = "id";

    /** Column holding the binary content. */
    public static final String COL_BIN = "bin";

    public static final String COL_MARK = "mark"; // for mark & sweep GC

    /** Datasource name passed to {@link ConnectionHelper} to obtain connections. */
    protected String dataSourceName;

    // The SQL statements below are built once by createSql(String).

    /** Tests whether a row exists for a given digest. */
    protected String checkSql;

    /** Inserts a new row (digest, content, mark). */
    protected String putSql;

    /** Selects the binary content for a given digest. */
    protected String getSql;

    /** Selects the length of the binary content for a given digest. */
    protected String getLengthSql;

    /** Sets the mark of every row to a given value (GC start). */
    protected String gcStartSql;

    /** Sets the mark of the row with a given digest (GC mark). */
    protected String gcMarkSql;

    /** Counts the rows having a given mark and sums the length of their content. */
    protected String gcStatsSql;

    /** Deletes the rows having a given mark (GC sweep). */
    protected String gcSweepSql;

    // when true, storeFile skips the existence check and always attempts the insert
    protected static boolean disableCheckExisting; // for unit tests

    // when true, the next getBinary call clears the file cache and resets this flag
    protected static boolean resetCache; // for unit tests
107
108    @Override
109    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
110        super.initialize(blobProviderId, properties);
111
112        dataSourceName = null;
113        String tableName = null;
114        String cacheSizeStr = null;
115        String key = properties.get(BinaryManager.PROP_KEY);
116        key = StringUtils.defaultIfBlank(key, "");
117        for (String part : key.split(",")) {
118            if (part.startsWith(DS_PREFIX)) {
119                dataSourceName = part.substring(DS_PREFIX.length()).trim();
120            }
121            if (part.startsWith(TABLE_PREFIX)) {
122                tableName = part.substring(TABLE_PREFIX.length()).trim();
123            }
124            if (part.startsWith(CACHE_SIZE_PREFIX)) {
125                cacheSizeStr = part.substring(CACHE_SIZE_PREFIX.length()).trim();
126            }
127        }
128        if (StringUtils.isBlank(dataSourceName)) {
129            dataSourceName = properties.get(DS_PROP);
130            if (StringUtils.isBlank(dataSourceName)) {
131                throw new RuntimeException("Missing " + DS_PROP + " in binaryManager configuration");
132            }
133        }
134        if (StringUtils.isBlank(tableName)) {
135            tableName = properties.get(TABLE_PROP);
136            if (StringUtils.isBlank(tableName)) {
137                throw new RuntimeException("Missing " + TABLE_PROP + " in binaryManager configuration");
138            }
139        }
140        if (StringUtils.isBlank(cacheSizeStr)) {
141            cacheSizeStr = properties.get(CACHE_SIZE_PROP);
142            if (StringUtils.isBlank(cacheSizeStr)) {
143                cacheSizeStr = DEFAULT_CACHE_SIZE;
144            }
145        }
146
147        // create the SQL statements used
148        createSql(tableName);
149
150        // create file cache
151        initializeCache(cacheSizeStr, new SQLFileStorage());
152        createGarbageCollector();
153    }
154
155    protected void createGarbageCollector() {
156        garbageCollector = new SQLBinaryGarbageCollector(this);
157    }
158
159    protected void createSql(String tableName) throws IOException {
160        Dialect dialect = getDialect();
161        Database database = new Database(dialect);
162        Table table = database.addTable(tableName);
163        ColumnType dummytype = ColumnType.STRING;
164        Column idCol = table.addColumn(COL_ID, dummytype, COL_ID, null);
165        Column binCol = table.addColumn(COL_BIN, dummytype, COL_BIN, null);
166        Column markCol = table.addColumn(COL_MARK, dummytype, COL_MARK, null);
167
168        checkSql = String.format("SELECT 1 FROM %s WHERE %s = ?", table.getQuotedName(), idCol.getQuotedName());
169        putSql = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)", table.getQuotedName(),
170                idCol.getQuotedName(), binCol.getQuotedName(), markCol.getQuotedName());
171        getSql = String.format("SELECT %s FROM %s WHERE %s = ?", binCol.getQuotedName(), table.getQuotedName(),
172                idCol.getQuotedName());
173        getLengthSql = String.format("SELECT %s(%s) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
174                binCol.getQuotedName(), table.getQuotedName(), idCol.getQuotedName());
175
176        gcStartSql = String.format("UPDATE %s SET %s = ?", table.getQuotedName(), markCol.getQuotedName());
177        gcMarkSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName(),
178                idCol.getQuotedName());
179        gcStatsSql = String.format("SELECT COUNT(*), SUM(%s(%s)) FROM %s WHERE %s = ?", dialect.getBlobLengthFunction(),
180                binCol.getQuotedName(), table.getQuotedName(), markCol.getQuotedName());
181        gcSweepSql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), markCol.getQuotedName());
182    }
183
184    protected Dialect getDialect() throws IOException {
185        Connection connection = null;
186        try {
187            connection = getConnection();
188            return Dialect.createDialect(connection, null);
189        } catch (SQLException e) {
190            throw new IOException(e);
191        } finally {
192            if (connection != null) {
193                try {
194                    connection.close();
195                } catch (SQLException e) {
196                    log.error(e, e);
197                }
198            }
199        }
200    }
201
202    protected Connection getConnection() throws SQLException {
203        return ConnectionHelper.getConnection(dataSourceName);
204    }
205
206    protected static void logSQL(String sql, Serializable... values) {
207        if (!log.isTraceEnabled()) {
208            return;
209        }
210        StringBuilder buf = new StringBuilder();
211        int start = 0;
212        for (Serializable v : values) {
213            int index = sql.indexOf('?', start);
214            if (index == -1) {
215                // mismatch between number of ? and number of values
216                break;
217            }
218            buf.append(sql, start, index);
219            buf.append(loggedValue(v));
220            start = index + 1;
221        }
222        buf.append(sql, start, sql.length());
223        log.trace("(bin) SQL: " + buf.toString());
224    }
225
226    protected static String loggedValue(Serializable value) {
227        if (value == null) {
228            return "NULL";
229        }
230        if (value instanceof String) {
231            String v = (String) value;
232            return "'" + v.replace("'", "''") + "'";
233        }
234        return value.toString();
235    }
236
237    protected static boolean isDuplicateKeyException(SQLException e) {
238        String sqlState = e.getSQLState();
239        if ("23000".equals(sqlState)) {
240            // MySQL: Duplicate entry ... for key ...
241            // Oracle: unique constraint ... violated
242            // SQL Server: Violation of PRIMARY KEY constraint
243            return true;
244        }
245        if ("23001".equals(sqlState)) {
246            // H2: Unique index or primary key violation
247            return true;
248        }
249        if ("23505".equals(sqlState)) {
250            // H2: Unique index or primary key violation
251            // PostgreSQL: duplicate key value violates unique constraint
252            return true;
253        }
254        if ("S0003".equals(sqlState) || "S0005".equals(sqlState)) {
255            // SQL Server: Snapshot isolation transaction aborted due to update
256            // conflict
257            return true;
258        }
259        return false;
260    }
261
262    @Override
263    public Binary getBinary(String digest) {
264        if (resetCache) {
265            // for unit tests
266            resetCache = false;
267            fileCache.clear();
268        }
269        return super.getBinary(digest);
270    }
271
272    @Override
273    public Binary getBinary(InputStream in) throws IOException {
274        if (resetCache) {
275            // for unit tests
276            resetCache = false;
277            fileCache.clear();
278        }
279        return super.getBinary(in);
280    }
281
282    public class SQLFileStorage implements FileStorage {
283
284        @Override
285        public void storeFile(String digest, File file) throws IOException {
286            Connection connection = null;
287            try {
288                connection = getConnection();
289                boolean existing;
290                if (disableCheckExisting) {
291                    // for unit tests
292                    existing = false;
293                } else {
294                    logSQL(checkSql, digest);
295                    PreparedStatement ps = connection.prepareStatement(checkSql);
296                    ps.setString(1, digest);
297                    ResultSet rs = ps.executeQuery();
298                    existing = rs.next();
299                    ps.close();
300                }
301                if (!existing) {
302                    // insert new blob
303                    logSQL(putSql, digest, "somebinary", Boolean.TRUE);
304                    PreparedStatement ps = connection.prepareStatement(putSql);
305                    ps.setString(1, digest);
306                    // needs dbcp 1.4:
307                    // ps.setBlob(2, new FileInputStream(file), file.length());
308                    FileInputStream tmpis = new FileInputStream(file);
309                    try {
310                        ps.setBinaryStream(2, tmpis, (int) file.length());
311                        ps.setBoolean(3, true); // mark new additions for GC
312                        try {
313                            ps.execute();
314                        } catch (SQLException e) {
315                            if (!isDuplicateKeyException(e)) {
316                                throw e;
317                            }
318                        }
319                    } finally {
320                        IOUtils.closeQuietly(tmpis);
321                    }
322                    ps.close();
323                }
324            } catch (SQLException e) {
325                throw new IOException(e);
326            } finally {
327                if (connection != null) {
328                    try {
329                        connection.close();
330                    } catch (SQLException e) {
331                        log.error(e, e);
332                    }
333                }
334            }
335        }
336
337        @Override
338        public boolean fetchFile(String digest, File tmp) throws IOException {
339            Connection connection = null;
340            try {
341                connection = getConnection();
342                logSQL(getSql, digest);
343                PreparedStatement ps = connection.prepareStatement(getSql);
344                ps.setString(1, digest);
345                ResultSet rs = ps.executeQuery();
346                if (!rs.next()) {
347                    log.error("Unknown binary: " + digest);
348                    return false;
349                }
350                InputStream in = rs.getBinaryStream(1);
351                OutputStream out = null;
352                try {
353                    if (in == null) {
354                        log.error("Missing binary: " + digest);
355                        return false;
356                    }
357                    // store in file
358                    out = new FileOutputStream(tmp);
359                    IOUtils.copy(in, out);
360                } finally {
361                    IOUtils.closeQuietly(in);
362                    IOUtils.closeQuietly(out);
363                }
364                return true;
365            } catch (SQLException e) {
366                throw new IOException(e);
367            } finally {
368                if (connection != null) {
369                    try {
370                        connection.close();
371                    } catch (SQLException e) {
372                        log.error(e, e);
373                    }
374                }
375            }
376        }
377
378        @Override
379        public Long fetchLength(String digest) throws IOException {
380            Connection connection = null;
381            try {
382                connection = getConnection();
383                logSQL(getLengthSql, digest);
384                PreparedStatement ps = connection.prepareStatement(getLengthSql);
385                ps.setString(1, digest);
386                ResultSet rs = ps.executeQuery();
387                if (!rs.next()) {
388                    log.error("Unknown binary: " + digest);
389                    return null;
390                }
391                return Long.valueOf(rs.getLong(1));
392            } catch (SQLException e) {
393                throw new IOException(e);
394            } finally {
395                if (connection != null) {
396                    try {
397                        connection.close();
398                    } catch (SQLException e) {
399                        log.error(e, e);
400                    }
401                }
402            }
403        }
404    }
405
406    public static class SQLBinaryGarbageCollector implements BinaryGarbageCollector {
407
408        protected final SQLBinaryManager binaryManager;
409
410        protected volatile long startTime;
411
412        protected BinaryManagerStatus status;
413
414        public SQLBinaryGarbageCollector(SQLBinaryManager binaryManager) {
415            this.binaryManager = binaryManager;
416        }
417
418        @Override
419        public String getId() {
420            return "datasource:" + binaryManager.dataSourceName;
421        }
422
423        @Override
424        public BinaryManagerStatus getStatus() {
425            return status;
426        }
427
428        @Override
429        public boolean isInProgress() {
430            // volatile as this is designed to be called from another thread
431            return startTime != 0;
432        }
433
434        @Override
435        public void start() {
436            if (startTime != 0) {
437                throw new RuntimeException("Already started");
438            }
439            startTime = System.currentTimeMillis();
440            status = new BinaryManagerStatus();
441
442            Connection connection = null;
443            PreparedStatement ps = null;
444            try {
445                connection = binaryManager.getConnection();
446                logSQL(binaryManager.gcStartSql, Boolean.FALSE);
447                ps = connection.prepareStatement(binaryManager.gcStartSql);
448                ps.setBoolean(1, false); // clear marks
449                int n = ps.executeUpdate();
450                logSQL("  -> ? rows", Long.valueOf(n));
451            } catch (SQLException e) {
452                throw new RuntimeException(e);
453            } finally {
454                if (ps != null) {
455                    try {
456                        ps.close();
457                    } catch (SQLException e) {
458                        log.error(e, e);
459                    }
460                }
461                if (connection != null) {
462                    try {
463                        connection.close();
464                    } catch (SQLException e) {
465                        log.error(e, e);
466                    }
467                }
468            }
469        }
470
471        @Override
472        public void mark(String digest) {
473            Connection connection = null;
474            PreparedStatement ps = null;
475            try {
476                connection = binaryManager.getConnection();
477                logSQL(binaryManager.gcMarkSql, Boolean.TRUE, digest);
478                ps = connection.prepareStatement(binaryManager.gcMarkSql);
479                ps.setBoolean(1, true); // mark
480                ps.setString(2, digest);
481                ps.execute();
482            } catch (SQLException e) {
483                throw new RuntimeException(e);
484            } finally {
485                if (ps != null) {
486                    try {
487                        ps.close();
488                    } catch (SQLException e) {
489                        log.error(e, e);
490                    }
491                }
492                if (connection != null) {
493                    try {
494                        connection.close();
495                    } catch (SQLException e) {
496                        log.error(e, e);
497                    }
498                }
499            }
500        }
501
502        @Override
503        public void stop(boolean delete) {
504            if (startTime == 0) {
505                throw new RuntimeException("Not started");
506            }
507
508            Connection connection = null;
509            PreparedStatement ps = null;
510            try {
511                connection = binaryManager.getConnection();
512                // stats
513                logSQL(binaryManager.gcStatsSql, Boolean.TRUE);
514                ps = connection.prepareStatement(binaryManager.gcStatsSql);
515                ps.setBoolean(1, true); // marked
516                ResultSet rs = ps.executeQuery();
517                rs.next();
518                status.numBinaries = rs.getLong(1);
519                status.sizeBinaries = rs.getLong(2);
520                logSQL("  -> ?, ?", Long.valueOf(status.numBinaries), Long.valueOf(status.sizeBinaries));
521                logSQL(binaryManager.gcStatsSql, Boolean.FALSE);
522                ps.setBoolean(1, false); // unmarked
523                rs = ps.executeQuery();
524                rs.next();
525                status.numBinariesGC = rs.getLong(1);
526                status.sizeBinariesGC = rs.getLong(2);
527                logSQL("  -> ?, ?", Long.valueOf(status.numBinariesGC), Long.valueOf(status.sizeBinariesGC));
528                if (delete) {
529                    // sweep
530                    ps.close();
531                    logSQL(binaryManager.gcSweepSql, Boolean.FALSE);
532                    ps = connection.prepareStatement(binaryManager.gcSweepSql);
533                    ps.setBoolean(1, false); // sweep unmarked
534                    int n = ps.executeUpdate();
535                    logSQL("  -> ? rows", Long.valueOf(n));
536                }
537            } catch (SQLException e) {
538                throw new RuntimeException(e);
539            } finally {
540                if (ps != null) {
541                    try {
542                        ps.close();
543                    } catch (SQLException e) {
544                        log.error(e, e);
545                    }
546                }
547                if (connection != null) {
548                    try {
549                        connection.close();
550                    } catch (SQLException e) {
551                        log.error(e, e);
552                    }
553                }
554            }
555
556            status.gcDuration = System.currentTimeMillis() - startTime;
557            startTime = 0;
558        }
559    }
560
561}