001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.mongodb;
021
022import static java.lang.Boolean.TRUE;
023import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE;
024
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.IOException;
028import java.io.InputStream;
029import java.util.Map;
030
031import org.apache.commons.codec.digest.DigestUtils;
032import org.apache.commons.lang3.StringUtils;
033import org.bson.Document;
034import org.bson.types.ObjectId;
035import org.nuxeo.ecm.core.api.Blob;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
038import org.nuxeo.ecm.core.blob.BlobInfo;
039import org.nuxeo.ecm.core.blob.BlobProvider;
040import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager;
041import org.nuxeo.ecm.core.blob.binary.Binary;
042import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
043import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
044import org.nuxeo.ecm.core.blob.binary.BinaryManager;
045import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
048
049import com.mongodb.Block;
050import com.mongodb.client.MongoCollection;
051import com.mongodb.client.MongoDatabase;
052import com.mongodb.client.gridfs.GridFSBucket;
053import com.mongodb.client.gridfs.GridFSBuckets;
054import com.mongodb.client.gridfs.model.GridFSFile;
055import com.mongodb.client.model.Filters;
056import com.mongodb.client.model.FindOneAndUpdateOptions;
057import com.mongodb.client.model.ReturnDocument;
058import com.mongodb.client.model.Updates;
059import com.mongodb.gridfs.GridFS;
060
061/**
062 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS.
063 * <p>
064 * This implementation does not use local caching.
065 * <p>
066 * This implementation may not always be ideal regarding streaming because of the usage of {@link Binary} interface that
067 * exposes a {@link File}.
068 *
069 * @since 7.10
070 */
071public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {
072
073    /**
074     * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}.
075     * <p />
076     * The connection id will be {@code blobProvider/[BLOB_PROVIDER_ID]}.
077     */
078    public static final String BLOB_PROVIDER_CONNECTION_PREFIX = "blobProvider/";
079
080    /**
081     * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead
082     */
083    @Deprecated
084    public static final String SERVER_PROPERTY = "server";
085
086    /**
087     * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead
088     */
089    @Deprecated
090    public static final String DBNAME_PROPERTY = "dbname";
091
092    public static final String BUCKET_PROPERTY = "bucket";
093
094    private static final String METADATA_PROPERTY_FILENAME = "filename";
095
096    private static final String METADATA_PROPERTY_METADATA = "metadata";
097
098    private static final String METADATA_PROPERTY_LENGTH = "length";
099
100    protected Map<String, String> properties;
101
102    @Deprecated
103    protected GridFS gridFS;
104
105    protected GridFSBucket gridFSBucket;
106
107    protected MongoCollection<Document> filesColl;
108
109    @Override
110    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
111        super.initialize(blobProviderId, properties);
112        this.properties = properties;
113        if (StringUtils.isNotBlank(properties.get(SERVER_PROPERTY))
114                || StringUtils.isNotBlank(properties.get(DBNAME_PROPERTY))) {
115            throw new NuxeoException("Unable to initialize GridFS Binary Manager, properties " + SERVER_PROPERTY
116                    + " and " + DBNAME_PROPERTY + " has been removed. Please configure a connection!");
117        }
118        String bucket = properties.get(BUCKET_PROPERTY);
119        if (StringUtils.isBlank(bucket)) {
120            bucket = blobProviderId + ".fs";
121        }
122
123        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
124        MongoDatabase database = mongoService.getDatabase(BLOB_PROVIDER_CONNECTION_PREFIX + blobProviderId);
125        gridFSBucket = GridFSBuckets.create(database, bucket);
126        filesColl = database.getCollection(bucket + ".files");
127        garbageCollector = new GridFSBinaryGarbageCollector(bucket);
128    }
129
130    @Override
131    public void close() {
132    }
133
134    @Override
135    public BinaryManager getBinaryManager() {
136        return this;
137    }
138
139    protected GridFSBucket getGridFSBucket() {
140        return gridFSBucket;
141    }
142
143    /**
144     * A binary backed by GridFS.
145     */
146    protected class GridFSBinary extends Binary {
147
148        private static final long serialVersionUID = 1L;
149
150        protected GridFSBinary(String digest, String blobProviderId) {
151            super(digest, blobProviderId);
152        }
153
154        @Override
155        public InputStream getStream() {
156            return gridFSBucket.openDownloadStream(digest);
157        }
158    }
159
160    @Override
161    public Binary getBinary(Blob blob) throws IOException {
162        if (!(blob instanceof FileBlob)) {
163            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
164        }
165        // we already have a file so can compute the length and digest efficiently
166        File file = blob.getFile();
167        String digest;
168        try (InputStream in = new FileInputStream(file)) {
169            digest = DigestUtils.md5Hex(in);
170        }
171        // if the digest is not already known then save to GridFS
172        GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
173        if (dbFile == null) {
174            try (InputStream in = new FileInputStream(file)) {
175                gridFSBucket.uploadFromStream(digest, in);
176            }
177        }
178        return new GridFSBinary(digest, blobProviderId);
179    }
180
181    @Override
182    protected Binary getBinary(InputStream in) throws IOException {
183        try {
184            // save the file to GridFS
185            String inputName = "tmp-" + System.nanoTime();
186            ObjectId id = gridFSBucket.uploadFromStream(inputName, in);
187            // now we know length and digest
188            GridFSFile inputFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, inputName)).first();
189            String digest = inputFile.getMD5();
190            // if the digest is already known then reuse it instead
191            GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
192            if (dbFile == null) {
193                // no existing file, set its filename as the digest
194                gridFSBucket.rename(id, digest);
195            } else {
196                // file already existed, no need for the temporary one
197                gridFSBucket.delete(id);
198            }
199            return new GridFSBinary(digest, blobProviderId);
200        } finally {
201            in.close();
202        }
203    }
204
205    @Override
206    public Binary getBinary(String digest) {
207        GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
208        if (dbFile != null) {
209            return new GridFSBinary(digest, blobProviderId);
210        }
211        return null;
212    }
213
214    @Override
215    public Blob readBlob(BlobInfo blobInfo) throws IOException {
216        // just delegate to avoid copy/pasting code
217        return new BinaryBlobProvider(this).readBlob(blobInfo);
218    }
219
220    @Override
221    public String writeBlob(Blob blob) throws IOException {
222        // just delegate to avoid copy/pasting code
223        return new BinaryBlobProvider(this).writeBlob(blob);
224    }
225
226    @Override
227    public boolean supportsUserUpdate() {
228        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
229    }
230
231    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {
232
233        protected final String bucket;
234
235        protected BinaryManagerStatus status;
236
237        protected volatile long startTime;
238
239        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";
240
241        protected String msKey;
242
243        public GridFSBinaryGarbageCollector(String bucket) {
244            this.bucket = bucket;
245        }
246
247        @Override
248        public String getId() {
249            return "gridfs:" + bucket;
250        }
251
252        @Override
253        public BinaryManagerStatus getStatus() {
254            return status;
255        }
256
257        @Override
258        public boolean isInProgress() {
259            return startTime != 0;
260        }
261
262        @Override
263        public void mark(String digest) {
264            Document dbFile = filesColl.findOneAndUpdate(Filters.eq(METADATA_PROPERTY_FILENAME, digest),
265                    Updates.set(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), TRUE),
266                    new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
267            if (dbFile != null) {
268                status.numBinaries += 1;
269                status.sizeBinaries += dbFile.getLong(METADATA_PROPERTY_LENGTH);
270            }
271        }
272
273        @Override
274        public void start() {
275            if (startTime != 0) {
276                throw new NuxeoException("Already started");
277            }
278            startTime = System.currentTimeMillis();
279            status = new BinaryManagerStatus();
280            msKey = MARK_KEY_PREFIX + System.currentTimeMillis();
281        }
282
283        @Override
284        public void stop(boolean delete) {
285            gridFSBucket.find(Filters.exists(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), false)) //
286                        .forEach((Block<GridFSFile>) file -> {
287                            status.numBinariesGC += 1;
288                            status.sizeBinariesGC += file.getLength();
289                            if (delete) {
290                                gridFSBucket.delete(file.getId());
291                            }
292                        });
293            startTime = 0;
294        }
295    }
296
297    @Override
298    public Map<String, String> getProperties() {
299        return properties;
300    }
301
302}