/*
 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Thierry Delprat
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.mongodb;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager;
import org.nuxeo.ecm.core.blob.binary.Binary;
import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.BinaryManager;
import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.ecm.core.model.Document;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;
/**
 * Implements the {@link BinaryManager} and {@link BlobProvider} interfaces using MongoDB GridFS.
 * <p>
 * This implementation does not use local caching.
 * <p>
 * This implementation may not always be ideal for streaming, because the {@link Binary} interface it relies on
 * exposes a {@link File}.
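 * <p>
 * Usage sketch (the server, database and provider id values below are placeholders; in a real deployment the
 * provider is registered and configured through its blob provider descriptor rather than instantiated by hand).
 * The {@code server} property accepts either a full {@code mongodb://} connection string or a single
 * {@code host[:port]} address:
 *
 * <pre>{@code
 * // exception handling omitted for brevity
 * Map<String, String> props = new HashMap<>();
 * props.put(GridFSBinaryManager.SERVER_PROPERTY, "mongodb://localhost:27017");
 * props.put(GridFSBinaryManager.DBNAME_PROPERTY, "nuxeo");
 * props.put(GridFSBinaryManager.BUCKET_PROPERTY, "myprovider.fs");
 *
 * GridFSBinaryManager manager = new GridFSBinaryManager();
 * manager.initialize("myprovider", props);
 * try {
 *     Binary binary = manager.getBinary(Blobs.createBlob("some content"));
 *     try (InputStream in = binary.getStream()) {
 *         // read the stored content back from GridFS
 *     }
 * } finally {
 *     manager.close();
 * }
 * }</pre>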
 *
 * @since 7.10
 */
public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {

    public static final String SERVER_PROPERTY = "server";

    public static final String DBNAME_PROPERTY = "dbname";

    public static final String BUCKET_PROPERTY = "bucket";

    protected Map<String, String> properties;

    protected MongoClient client;

    protected GridFS gridFS;

    @Override
    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
        super.initialize(blobProviderId, properties);
        this.properties = properties;
        String server = properties.get(SERVER_PROPERTY);
        if (StringUtils.isBlank(server)) {
            throw new NuxeoException("Missing server property in GridFS Binary Manager descriptor: " + blobProviderId);
        }
        String dbname = properties.get(DBNAME_PROPERTY);
        if (StringUtils.isBlank(dbname)) {
            throw new NuxeoException("Missing dbname property in GridFS Binary Manager descriptor: " + blobProviderId);
        }
        String bucket = properties.get(BUCKET_PROPERTY);
        if (StringUtils.isBlank(bucket)) {
            bucket = blobProviderId + ".fs";
        }
        if (server.startsWith("mongodb://")) {
            client = new MongoClient(new MongoClientURI(server));
        } else {
            client = new MongoClient(new ServerAddress(server));
        }
        gridFS = new GridFS(client.getDB(dbname), bucket);
        garbageCollector = new GridFSBinaryGarbageCollector();
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }

    @Override
    public BinaryManager getBinaryManager() {
        return this;
    }

    public GridFS getGridFS() {
        return gridFS;
    }

    /**
     * A binary backed by GridFS.
     * <p>
     * Note that {@link #getStream} returns {@code null} if the underlying GridFS file no longer exists.
     */
    protected class GridFSBinary extends Binary {

        private static final long serialVersionUID = 1L;

        protected GridFSBinary(String digest, long length, String blobProviderId) {
            super(digest, blobProviderId);
            this.length = length;
        }

        @Override
        public InputStream getStream() {
            GridFSDBFile dbFile = gridFS.findOne(digest);
            return dbFile == null ? null : dbFile.getInputStream();
        }
    }

    @Override
    public Binary getBinary(Blob blob) throws IOException {
        if (!(blob instanceof FileBlob)) {
            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
        }
        // we already have a file, so we can compute the length and digest efficiently
        File file = ((FileBlob) blob).getFile();
        long length = file.length();
        String digest;
        try (InputStream in = new FileInputStream(file)) {
            digest = DigestUtils.md5Hex(in);
        }
        // if the digest is not already known, save the file to GridFS under that digest
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile == null) {
            try (InputStream in = new FileInputStream(file)) {
                GridFSInputFile inputFile = gridFS.createFile(in, digest);
                inputFile.save();
            }
        }
        return new GridFSBinary(digest, length, blobProviderId);
    }

    @Override
    protected Binary getBinary(InputStream in) throws IOException {
        // the digest of a stream is only known once it has been stored, so first save it to GridFS as an unnamed file
        GridFSInputFile inputFile = gridFS.createFile(in, true);
        inputFile.save();
        // now we know the length and digest
        long length = inputFile.getLength();
        String digest = inputFile.getMD5();
        // if a file with this digest already exists, reuse it instead
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile == null) {
            // no existing file, so name the uploaded file after its digest so it can be found later
            inputFile.setFilename(digest);
            inputFile.save();
        } else {
            // file already existed, no need for the temporary one
            gridFS.remove(inputFile);
        }
        return new GridFSBinary(digest, length, blobProviderId);
    }
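
    // Illustration of the content addressing performed by the two getBinary variants above: binaries are keyed by
    // their MD5 digest, so storing the same bytes twice yields a single GridFS file and equal digests. This is only
    // a sketch; someFile and copyOfSameContent are hypothetical java.io.File instances with identical contents.
    //
    //   Binary b1 = getBinary(new FileBlob(someFile));          // uploads the content under its MD5 digest
    //   Binary b2 = getBinary(new FileBlob(copyOfSameContent)); // finds the existing GridFS file, uploads nothing
    //   // b1.getDigest().equals(b2.getDigest()) is true, and the bucket holds one file for that digest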

    @Override
    public Binary getBinary(String digest) {
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile != null) {
            return new GridFSBinary(digest, dbFile.getLength(), blobProviderId);
        }
        return null;
    }

    @Override
    public Blob readBlob(BlobManager.BlobInfo blobInfo) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).readBlob(blobInfo);
    }

    @Override
    public String writeBlob(Blob blob, Document doc) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).writeBlob(blob, doc);
    }

    @Override
    public boolean supportsUserUpdate() {
        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
    }

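    /**
     * Garbage collector for the GridFS binaries: marking tags every referenced file with a per-run metadata key, and
     * the sweep phase then counts, and optionally deletes, the files that do not carry that key.
     * <p>
     * Illustrative protocol sketch (the digest is a placeholder; callers normally obtain the collector from the
     * binary manager rather than driving it directly):
     *
     * <pre>{@code
     * BinaryGarbageCollector gc = getGarbageCollector();
     * gc.start();
     * gc.mark("d41d8cd98f00b204e9800998ecf8427e"); // called once per digest still referenced by a document
     * gc.stop(true); // true means delete every GridFS file that was not marked
     * }</pre>
     */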
    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {

        protected BinaryManagerStatus status;

        protected volatile long startTime;

        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";

        protected String msKey;

        @Override
        public String getId() {
            return "gridfs:" + getGridFS().getBucketName();
        }

        @Override
        public BinaryManagerStatus getStatus() {
            return status;
        }

        @Override
        public boolean isInProgress() {
            return startTime != 0;
        }

        @Override
        public void mark(String digest) {
            GridFSDBFile dbFile = gridFS.findOne(digest);
            if (dbFile != null) {
                dbFile.setMetaData(new BasicDBObject(msKey, TRUE));
                dbFile.save();
                status.numBinaries += 1;
                status.sizeBinaries += dbFile.getLength();
            }
        }

        @Override
        public void start() {
            if (startTime != 0) {
                throw new NuxeoException("Already started");
            }
            startTime = System.currentTimeMillis();
            status = new BinaryManagerStatus();
            msKey = MARK_KEY_PREFIX + System.currentTimeMillis();
        }

        @Override
        public void stop(boolean delete) {
            DBObject query = new BasicDBObject("metadata." + msKey, new BasicDBObject("$exists", FALSE));
            List<GridFSDBFile> files = gridFS.find(query);
            for (GridFSDBFile file : files) {
                status.numBinariesGC += 1;
                status.sizeBinariesGC += file.getLength();
                if (delete) {
                    gridFS.remove(file);
                }
            }
            startTime = 0;
        }
    }

}