001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.mongodb;
021
022import static java.lang.Boolean.FALSE;
023import static java.lang.Boolean.TRUE;
024import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE;
025
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.IOException;
029import java.io.InputStream;
030import java.util.List;
031import java.util.Map;
032
033import org.apache.commons.codec.digest.DigestUtils;
034import org.apache.commons.lang.StringUtils;
035import org.nuxeo.ecm.core.api.Blob;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
038import org.nuxeo.ecm.core.blob.BlobInfo;
039import org.nuxeo.ecm.core.blob.BlobProvider;
040import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager;
041import org.nuxeo.ecm.core.blob.binary.Binary;
042import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
043import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
044import org.nuxeo.ecm.core.blob.binary.BinaryManager;
045import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
046
047import com.mongodb.BasicDBObject;
048import com.mongodb.DBObject;
049import com.mongodb.MongoClient;
050import com.mongodb.MongoClientURI;
051import com.mongodb.ServerAddress;
052import com.mongodb.gridfs.GridFS;
053import com.mongodb.gridfs.GridFSDBFile;
054import com.mongodb.gridfs.GridFSInputFile;
055
056/**
057 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS.
058 * <p>
059 * This implementation does not use local caching.
060 * <p>
061 * This implementation may not always be ideal regarding streaming because of the usage of {@link Binary} interface that
062 * exposes a {@link File}.
063 *
064 * @since 7.10
065 */
066public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {
067
068    public static final String SERVER_PROPERTY = "server";
069
070    public static final String DBNAME_PROPERTY = "dbname";
071
072    public static final String BUCKET_PROPERTY = "bucket";
073
074    protected Map<String, String> properties;
075
076    protected MongoClient client;
077
078    protected GridFS gridFS;
079
080    @Override
081    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
082        super.initialize(blobProviderId, properties);
083        this.properties = properties;
084        String server = properties.get(SERVER_PROPERTY);
085        if (StringUtils.isBlank(server)) {
086            throw new NuxeoException("Missing server property in GridFS Binary Manager descriptor: " + blobProviderId);
087        }
088        String dbname = properties.get(DBNAME_PROPERTY);
089        if (StringUtils.isBlank(dbname)) {
090            throw new NuxeoException("Missing dbname property in GridFS Binary Manager descriptor: " + blobProviderId);
091        }
092        String bucket = properties.get(BUCKET_PROPERTY);
093        if (StringUtils.isBlank(bucket)) {
094            bucket = blobProviderId + ".fs";
095        }
096        if (server.startsWith("mongodb://")) {
097            client = new MongoClient(new MongoClientURI(server));
098        } else {
099            client = new MongoClient(new ServerAddress(server));
100        }
101        gridFS = new GridFS(client.getDB(dbname), bucket);
102        garbageCollector = new GridFSBinaryGarbageCollector();
103    }
104
105    @Override
106    public void close() {
107        if (client != null) {
108            client.close();
109            client = null;
110        }
111    }
112
113    @Override
114    public BinaryManager getBinaryManager() {
115        return this;
116    }
117
118    public GridFS getGridFS() {
119        return gridFS;
120    }
121
122    /**
123     * A binary backed by GridFS.
124     */
125    protected class GridFSBinary extends Binary {
126
127        private static final long serialVersionUID = 1L;
128
129        protected GridFSBinary(String digest, String blobProviderId) {
130            super(digest, blobProviderId);
131        }
132
133        @Override
134        public InputStream getStream() {
135            GridFSDBFile dbFile = gridFS.findOne(digest);
136            return dbFile == null ? null : dbFile.getInputStream();
137        }
138    }
139
140    @Override
141    public Binary getBinary(Blob blob) throws IOException {
142        if (!(blob instanceof FileBlob)) {
143            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
144        }
145        // we already have a file so can compute the length and digest efficiently
146        File file = ((FileBlob) blob).getFile();
147        String digest;
148        try (InputStream in = new FileInputStream(file)) {
149            digest = DigestUtils.md5Hex(in);
150        }
151        // if the digest is not already known then save to GridFS
152        GridFSDBFile dbFile = gridFS.findOne(digest);
153        if (dbFile == null) {
154            try (InputStream in = new FileInputStream(file)) {
155                GridFSInputFile inputFile = gridFS.createFile(in, digest);
156                inputFile.save();
157            }
158        }
159        return new GridFSBinary(digest, blobProviderId);
160    }
161
162    @Override
163    protected Binary getBinary(InputStream in) throws IOException {
164        // save the file to GridFS
165        GridFSInputFile inputFile = gridFS.createFile(in, true);
166        inputFile.save();
167        // now we know length and digest
168        String digest = inputFile.getMD5();
169        // if the digest is already known then reuse it instead
170        GridFSDBFile dbFile = gridFS.findOne(digest);
171        if (dbFile == null) {
172            // no existing file, set its filename as the digest
173            inputFile.setFilename(digest);
174            inputFile.save();
175        } else {
176            // file already existed, no need for the temporary one
177            gridFS.remove(inputFile);
178        }
179        return new GridFSBinary(digest, blobProviderId);
180    }
181
182    @Override
183    public Binary getBinary(String digest) {
184        GridFSDBFile dbFile = gridFS.findOne(digest);
185        if (dbFile != null) {
186            return new GridFSBinary(digest, blobProviderId);
187        }
188        return null;
189    }
190
191    @Override
192    public Blob readBlob(BlobInfo blobInfo) throws IOException {
193        // just delegate to avoid copy/pasting code
194        return new BinaryBlobProvider(this).readBlob(blobInfo);
195    }
196
197    @Override
198    public String writeBlob(Blob blob) throws IOException {
199        // just delegate to avoid copy/pasting code
200        return new BinaryBlobProvider(this).writeBlob(blob);
201    }
202
203    @Override
204    public boolean supportsUserUpdate() {
205        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
206    }
207
208    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {
209
210        protected BinaryManagerStatus status;
211
212        protected volatile long startTime;
213
214        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";
215
216        protected String msKey;
217
218        @Override
219        public String getId() {
220            return "gridfs:" + getGridFS().getBucketName();
221        }
222
223        @Override
224        public BinaryManagerStatus getStatus() {
225            return status;
226        }
227
228        @Override
229        public boolean isInProgress() {
230            return startTime != 0;
231        }
232
233        @Override
234        public void mark(String digest) {
235            GridFSDBFile dbFile = gridFS.findOne(digest);
236            if (dbFile != null) {
237                dbFile.setMetaData(new BasicDBObject(msKey, TRUE));
238                dbFile.save();
239                status.numBinaries += 1;
240                status.sizeBinaries += dbFile.getLength();
241            }
242        }
243
244        @Override
245        public void start() {
246            if (startTime != 0) {
247                throw new NuxeoException("Already started");
248            }
249            startTime = System.currentTimeMillis();
250            status = new BinaryManagerStatus();
251            msKey = MARK_KEY_PREFIX + System.currentTimeMillis();
252        }
253
254        @Override
255        public void stop(boolean delete) {
256            DBObject query = new BasicDBObject("metadata." + msKey, new BasicDBObject("$exists", FALSE));
257            List<GridFSDBFile> files = gridFS.find(query);
258            for (GridFSDBFile file : files) {
259                status.numBinariesGC += 1;
260                status.sizeBinariesGC += file.getLength();
261                if (delete) {
262                    gridFS.remove(file);
263                }
264            }
265            startTime = 0;
266        }
267    }
268
269}