001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.mongodb;
021
022import static java.lang.Boolean.TRUE;
023import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.NAMESPACE;
024import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE;
025import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.TRANSIENT;
026
027import java.io.File;
028import java.io.FileInputStream;
029import java.io.IOException;
030import java.io.InputStream;
031import java.util.Map;
032
033import org.apache.commons.codec.digest.DigestUtils;
034import org.apache.commons.lang3.StringUtils;
035import org.bson.BsonValue;
036import org.bson.Document;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.NuxeoException;
039import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
040import org.nuxeo.ecm.core.blob.BlobInfo;
041import org.nuxeo.ecm.core.blob.BlobManager;
042import org.nuxeo.ecm.core.blob.BlobProvider;
043import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager;
044import org.nuxeo.ecm.core.blob.binary.Binary;
045import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
046import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
047import org.nuxeo.ecm.core.blob.binary.BinaryManager;
048import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
049import org.nuxeo.runtime.api.Framework;
050import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
051
052import com.mongodb.client.MongoCollection;
053import com.mongodb.client.MongoDatabase;
054import com.mongodb.client.gridfs.GridFSBucket;
055import com.mongodb.client.gridfs.GridFSBuckets;
056import com.mongodb.client.gridfs.GridFSUploadStream;
057import com.mongodb.client.gridfs.model.GridFSFile;
058import com.mongodb.client.model.Filters;
059import com.mongodb.client.model.FindOneAndUpdateOptions;
060import com.mongodb.client.model.ReturnDocument;
061import com.mongodb.client.model.Updates;
062
063/**
 * Implements the {@link BinaryManager} and {@link BlobProvider} interfaces using MongoDB GridFS.
065 * <p>
066 * This implementation does not use local caching.
067 * <p>
 * This implementation may not always be ideal for streaming, because it relies on the {@link Binary} API, which
 * exposes a {@link File}.
070 *
071 * @since 7.10
072 */
073public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {
074
075    /**
076     * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}.
077     * <p>
078     * The connection id will be {@code blobProvider/[BLOB_PROVIDER_ID]}.
079     */
080    public static final String BLOB_PROVIDER_CONNECTION_PREFIX = "blobProvider/";
081
082    /**
083     * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead
084     */
085    @Deprecated
086    public static final String SERVER_PROPERTY = "server";
087
088    /**
089     * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead
090     */
091    @Deprecated
092    public static final String DBNAME_PROPERTY = "dbname";
093
094    public static final String BUCKET_PROPERTY = "bucket";
095
096    private static final String METADATA_PROPERTY_FILENAME = "filename";
097
098    private static final String METADATA_PROPERTY_METADATA = "metadata";
099
100    private static final String METADATA_PROPERTY_LENGTH = "length";
101
102    protected GridFSBucket gridFSBucket;
103
104    protected MongoCollection<Document> filesColl;
105
106    @Override
107    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
108        super.initialize(blobProviderId, properties);
109        if (StringUtils.isNotBlank(properties.get(SERVER_PROPERTY))
110                || StringUtils.isNotBlank(properties.get(DBNAME_PROPERTY))) {
111            throw new NuxeoException("Unable to initialize GridFS Binary Manager, properties " + SERVER_PROPERTY
112                    + " and " + DBNAME_PROPERTY + " has been removed. Please configure a connection!");
113        }
114
115        String namespace = properties.get(NAMESPACE);
116        String bucket = properties.get(BUCKET_PROPERTY);
117        if (StringUtils.isBlank(bucket)) {
118            if (StringUtils.isNotBlank(namespace)) {
119                bucket = blobProviderId + "." + namespace.trim();
120            } else {
121                bucket = blobProviderId;
122            }
123            bucket = bucket + ".fs";
124        } else if (StringUtils.isNotBlank(namespace)) {
125            bucket = bucket + "." + namespace.trim();
126        }
127
128        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
129        MongoDatabase database = mongoService.getDatabase(BLOB_PROVIDER_CONNECTION_PREFIX + blobProviderId);
130        gridFSBucket = GridFSBuckets.create(database, bucket);
131        filesColl = database.getCollection(bucket + ".files");
132        garbageCollector = new GridFSBinaryGarbageCollector(bucket);
133    }
134
135    @Override
136    public void close() {
137    }
138
139    @Override
140    public BinaryManager getBinaryManager() {
141        return this;
142    }
143
144    protected GridFSBucket getGridFSBucket() {
145        return gridFSBucket;
146    }
147
148    /**
149     * A binary backed by GridFS.
150     */
151    protected static class GridFSBinary extends Binary {
152
153        private static final long serialVersionUID = 1L;
154
155        // transient to be Serializable
156        protected transient GridFSBinaryManager bm;
157
158        protected GridFSBinary(String digest, String blobProviderId, GridFSBinaryManager bm) {
159            super(digest, blobProviderId);
160            this.bm = bm;
161        }
162
163        // because the class is Serializable, re-acquire the BinaryManager if needed
164        protected GridFSBinaryManager getBinaryManager() {
165            if (bm == null) {
166                if (blobProviderId == null) {
167                    throw new UnsupportedOperationException("Cannot find binary manager, no blob provider id");
168                }
169                BlobManager blobManager = Framework.getService(BlobManager.class);
170                BlobProvider bp = blobManager.getBlobProvider(blobProviderId);
171                bm = (GridFSBinaryManager) bp.getBinaryManager();
172            }
173            return bm;
174        }
175
176        @Override
177        public InputStream getStream() {
178            return getBinaryManager().getGridFSBucket().openDownloadStream(digest);
179        }
180
181        @Override
182        protected File recomputeFile() {
183            return null; // no file to recompute
184        }
185    }
186
187    @Override
188    public Binary getBinary(Blob blob) throws IOException {
189        if (!(blob instanceof FileBlob)) {
190            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
191        }
192        // we already have a file so can compute the length and digest efficiently
193        File file = blob.getFile();
194        String digest;
195        try (InputStream in = new FileInputStream(file)) {
196            digest = DigestUtils.md5Hex(in);
197        }
198        // if the digest is not already known then save to GridFS
199        GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
200        if (dbFile == null) {
201            try (InputStream in = new FileInputStream(file)) {
202                gridFSBucket.uploadFromStream(digest, in);
203            }
204        }
205        return new GridFSBinary(digest, blobProviderId, this);
206    }
207
208    @Override
209    protected Binary getBinary(InputStream in) throws IOException {
210        // save the file to GridFS
211        String inputName = "tmp-" + System.nanoTime();
212        try (in; GridFSUploadStream uploadStream = gridFSBucket.openUploadStream(inputName)) {
213            BsonValue id = uploadStream.getId();
214            String digest = storeAndDigest(in, uploadStream);
215            // if the digest is already known then reuse it instead
216            GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
217            if (dbFile == null) {
218                // no existing file, set its filename as the digest
219                gridFSBucket.rename(id, digest);
220            } else {
221                // file already existed, no need for the temporary one
222                gridFSBucket.delete(id);
223            }
224            return new GridFSBinary(digest, blobProviderId, this);
225        }
226    }
227
228    @Override
229    public Binary getBinary(String digest) {
230        GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
231        if (dbFile != null) {
232            return new GridFSBinary(digest, blobProviderId, this);
233        }
234        return null;
235    }
236
237    @Override
238    public Blob readBlob(BlobInfo blobInfo) throws IOException {
239        // just delegate to avoid copy/pasting code
240        return new BinaryBlobProvider(this).readBlob(blobInfo);
241    }
242
243    @Override
244    public String writeBlob(Blob blob) throws IOException {
245        // just delegate to avoid copy/pasting code
246        return new BinaryBlobProvider(this).writeBlob(blob);
247    }
248
249    @Override
250    public boolean supportsUserUpdate() {
251        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
252    }
253
254    @Override
255    public boolean isTransient() {
256        return Boolean.parseBoolean(properties.get(TRANSIENT));
257    }
258
259    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {
260
261        protected final String bucket;
262
263        protected BinaryManagerStatus status;
264
265        protected volatile long startTime;
266
267        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";
268
269        protected String msKey;
270
271        public GridFSBinaryGarbageCollector(String bucket) {
272            this.bucket = bucket;
273        }
274
275        @Override
276        public String getId() {
277            return "gridfs:" + bucket;
278        }
279
280        @Override
281        public BinaryManagerStatus getStatus() {
282            return status;
283        }
284
285        @Override
286        public boolean isInProgress() {
287            return startTime != 0;
288        }
289
290        @Override
291        public void mark(String digest) {
292            Document dbFile = filesColl.findOneAndUpdate(Filters.eq(METADATA_PROPERTY_FILENAME, digest),
293                    Updates.set(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), TRUE),
294                    new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
295            if (dbFile != null) {
296                status.numBinaries += 1;
297                status.sizeBinaries += dbFile.getLong(METADATA_PROPERTY_LENGTH);
298            }
299        }
300
301        @Override
302        public void start() {
303            if (startTime != 0) {
304                throw new NuxeoException("Already started");
305            }
306            startTime = System.currentTimeMillis();
307            status = new BinaryManagerStatus();
308            msKey = MARK_KEY_PREFIX + System.currentTimeMillis();
309        }
310
311        @Override
312        public void stop(boolean delete) {
313            gridFSBucket.find(Filters.exists(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), false)) //
314                        .forEach(file -> {
315                            status.numBinariesGC += 1;
316                            status.sizeBinariesGC += file.getLength();
317                            if (delete) {
318                                gridFSBucket.delete(file.getId());
319                            }
320                        });
321            startTime = 0;
322        }
323    }
324
325    @Override
326    public Map<String, String> getProperties() {
327        return properties;
328    }
329
330}