001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.mongodb;
021
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager;
import org.nuxeo.ecm.core.blob.binary.Binary;
import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.BinaryManager;
import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.ecm.core.model.Document;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;
056
057/**
058 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS.
059 * <p>
060 * This implementation does not use local caching.
061 * <p>
062 * This implementation may not always be ideal regarding streaming because of the usage of {@link Binary} interface that
063 * exposes a {@link File}.
064 *
065 * @since 7.10
066 */
067public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {
068
069    public static final String SERVER_PROPERTY = "server";
070
071    public static final String DBNAME_PROPERTY = "dbname";
072
073    public static final String BUCKET_PROPERTY = "bucket";
074
075    protected Map<String, String> properties;
076
077    protected MongoClient client;
078
079    protected GridFS gridFS;
080
081    @Override
082    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
083        super.initialize(blobProviderId, properties);
084        this.properties = properties;
085        String server = properties.get(SERVER_PROPERTY);
086        if (StringUtils.isBlank(server)) {
087            throw new NuxeoException("Missing server property in GridFS Binary Manager descriptor: " + blobProviderId);
088        }
089        String dbname = properties.get(DBNAME_PROPERTY);
090        if (StringUtils.isBlank(dbname)) {
091            throw new NuxeoException("Missing dbname property in GridFS Binary Manager descriptor: " + blobProviderId);
092        }
093        String bucket = properties.get(BUCKET_PROPERTY);
094        if (StringUtils.isBlank(bucket)) {
095            bucket = blobProviderId + ".fs";
096        }
097        if (server.startsWith("mongodb://")) {
098            client = new MongoClient(new MongoClientURI(server));
099        } else {
100            client = new MongoClient(new ServerAddress(server));
101        }
102        gridFS = new GridFS(client.getDB(dbname), bucket);
103        garbageCollector = new GridFSBinaryGarbageCollector();
104    }
105
106    @Override
107    public void close() {
108        if (client != null) {
109            client.close();
110            client = null;
111        }
112    }
113
114    @Override
115    public BinaryManager getBinaryManager() {
116        return this;
117    }
118
119    public GridFS getGridFS() {
120        return gridFS;
121    }
122
123    /**
124     * A binary backed by GridFS.
125     */
126    protected class GridFSBinary extends Binary {
127
128        private static final long serialVersionUID = 1L;
129
130        protected GridFSBinary(String digest, String blobProviderId) {
131            super(digest, blobProviderId);
132        }
133
134        @Override
135        public InputStream getStream() {
136            GridFSDBFile dbFile = gridFS.findOne(digest);
137            return dbFile == null ? null : dbFile.getInputStream();
138        }
139    }
140
141    @Override
142    public Binary getBinary(Blob blob) throws IOException {
143        if (!(blob instanceof FileBlob)) {
144            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
145        }
146        // we already have a file so can compute the length and digest efficiently
147        File file = ((FileBlob) blob).getFile();
148        String digest;
149        try (InputStream in = new FileInputStream(file)) {
150            digest = DigestUtils.md5Hex(in);
151        }
152        // if the digest is not already known then save to GridFS
153        GridFSDBFile dbFile = gridFS.findOne(digest);
154        if (dbFile == null) {
155            try (InputStream in = new FileInputStream(file)) {
156                GridFSInputFile inputFile = gridFS.createFile(in, digest);
157                inputFile.save();
158            }
159        }
160        return new GridFSBinary(digest, blobProviderId);
161    }
162
163    @Override
164    protected Binary getBinary(InputStream in) throws IOException {
165        // save the file to GridFS
166        GridFSInputFile inputFile = gridFS.createFile(in, true);
167        inputFile.save();
168        // now we know length and digest
169        String digest = inputFile.getMD5();
170        // if the digest is already known then reuse it instead
171        GridFSDBFile dbFile = gridFS.findOne(digest);
172        if (dbFile == null) {
173            // no existing file, set its filename as the digest
174            inputFile.setFilename(digest);
175            inputFile.save();
176        } else {
177            // file already existed, no need for the temporary one
178            gridFS.remove(inputFile);
179        }
180        return new GridFSBinary(digest, blobProviderId);
181    }
182
183    @Override
184    public Binary getBinary(String digest) {
185        GridFSDBFile dbFile = gridFS.findOne(digest);
186        if (dbFile != null) {
187            return new GridFSBinary(digest, blobProviderId);
188        }
189        return null;
190    }
191
192    @Override
193    public Blob readBlob(BlobManager.BlobInfo blobInfo) throws IOException {
194        // just delegate to avoid copy/pasting code
195        return new BinaryBlobProvider(this).readBlob(blobInfo);
196    }
197
198    @Override
199    public String writeBlob(Blob blob, Document doc) throws IOException {
200        // just delegate to avoid copy/pasting code
201        return new BinaryBlobProvider(this).writeBlob(blob, doc);
202    }
203
204    @Override
205    public boolean supportsUserUpdate() {
206        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
207    }
208
209    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {
210
211        protected BinaryManagerStatus status;
212
213        protected volatile long startTime;
214
215        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";
216
217        protected String msKey;
218
219        @Override
220        public String getId() {
221            return "gridfs:" + getGridFS().getBucketName();
222        }
223
224        @Override
225        public BinaryManagerStatus getStatus() {
226            return status;
227        }
228
229        @Override
230        public boolean isInProgress() {
231            return startTime != 0;
232        }
233
234        @Override
235        public void mark(String digest) {
236            GridFSDBFile dbFile = gridFS.findOne(digest);
237            if (dbFile != null) {
238                dbFile.setMetaData(new BasicDBObject(msKey, TRUE));
239                dbFile.save();
240                status.numBinaries += 1;
241                status.sizeBinaries += dbFile.getLength();
242            }
243        }
244
245        @Override
246        public void start() {
247            if (startTime != 0) {
248                throw new NuxeoException("Already started");
249            }
250            startTime = System.currentTimeMillis();
251            status = new BinaryManagerStatus();
252            msKey = MARK_KEY_PREFIX + System.currentTimeMillis();
253        }
254
255        @Override
256        public void stop(boolean delete) {
257            DBObject query = new BasicDBObject("metadata." + msKey, new BasicDBObject("$exists", FALSE));
258            List<GridFSDBFile> files = gridFS.find(query);
259            for (GridFSDBFile file : files) {
260                status.numBinariesGC += 1;
261                status.sizeBinariesGC += file.getLength();
262                if (delete) {
263                    gridFS.remove(file);
264                }
265            }
266            startTime = 0;
267        }
268    }
269
270}