001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.storage.mongodb; 021 022import static java.lang.Boolean.TRUE; 023import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE; 024 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.IOException; 028import java.io.InputStream; 029import java.util.Map; 030 031import org.apache.commons.codec.digest.DigestUtils; 032import org.apache.commons.lang3.StringUtils; 033import org.bson.Document; 034import org.bson.types.ObjectId; 035import org.nuxeo.ecm.core.api.Blob; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 038import org.nuxeo.ecm.core.blob.BlobInfo; 039import org.nuxeo.ecm.core.blob.BlobProvider; 040import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager; 041import org.nuxeo.ecm.core.blob.binary.Binary; 042import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider; 043import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 044import org.nuxeo.ecm.core.blob.binary.BinaryManager; 045import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 046import org.nuxeo.runtime.api.Framework; 047import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 048 049import com.mongodb.Block; 050import com.mongodb.client.MongoCollection; 051import com.mongodb.client.MongoDatabase; 052import com.mongodb.client.gridfs.GridFSBucket; 053import com.mongodb.client.gridfs.GridFSBuckets; 054import com.mongodb.client.gridfs.model.GridFSFile; 055import com.mongodb.client.model.Filters; 056import com.mongodb.client.model.FindOneAndUpdateOptions; 057import com.mongodb.client.model.ReturnDocument; 058import com.mongodb.client.model.Updates; 059import com.mongodb.gridfs.GridFS; 060 061/** 062 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS. 063 * <p> 064 * This implementation does not use local caching. 065 * <p> 066 * This implementation may not always be ideal regarding streaming because of the usage of {@link Binary} interface that 067 * exposes a {@link File}. 068 * 069 * @since 7.10 070 */ 071public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider { 072 073 /** 074 * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}. 075 * <p /> 076 * The connection id will be {@code blobProvider/[BLOB_PROVIDER_ID]}. 077 */ 078 public static final String BLOB_PROVIDER_CONNECTION_PREFIX = "blobProvider/"; 079 080 /** 081 * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead 082 */ 083 @Deprecated 084 public static final String SERVER_PROPERTY = "server"; 085 086 /** 087 * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead 088 */ 089 @Deprecated 090 public static final String DBNAME_PROPERTY = "dbname"; 091 092 public static final String BUCKET_PROPERTY = "bucket"; 093 094 private static final String METADATA_PROPERTY_FILENAME = "filename"; 095 096 private static final String METADATA_PROPERTY_METADATA = "metadata"; 097 098 private static final String METADATA_PROPERTY_LENGTH = "length"; 099 100 protected Map<String, String> properties; 101 102 @Deprecated 103 protected GridFS gridFS; 104 105 protected GridFSBucket gridFSBucket; 106 107 protected MongoCollection<Document> filesColl; 108 109 @Override 110 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 111 super.initialize(blobProviderId, properties); 112 this.properties = properties; 113 if (StringUtils.isNotBlank(properties.get(SERVER_PROPERTY)) 114 || StringUtils.isNotBlank(properties.get(DBNAME_PROPERTY))) { 115 throw new NuxeoException("Unable to initialize GridFS Binary Manager, properties " + SERVER_PROPERTY 116 + " and " + DBNAME_PROPERTY + " has been removed. Please configure a connection!"); 117 } 118 String bucket = properties.get(BUCKET_PROPERTY); 119 if (StringUtils.isBlank(bucket)) { 120 bucket = blobProviderId + ".fs"; 121 } 122 123 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 124 MongoDatabase database = mongoService.getDatabase(BLOB_PROVIDER_CONNECTION_PREFIX + blobProviderId); 125 gridFSBucket = GridFSBuckets.create(database, bucket); 126 filesColl = database.getCollection(bucket + ".files"); 127 garbageCollector = new GridFSBinaryGarbageCollector(bucket); 128 } 129 130 @Override 131 public void close() { 132 } 133 134 @Override 135 public BinaryManager getBinaryManager() { 136 return this; 137 } 138 139 protected GridFSBucket getGridFSBucket() { 140 return gridFSBucket; 141 } 142 143 /** 144 * A binary backed by GridFS. 145 */ 146 protected class GridFSBinary extends Binary { 147 148 private static final long serialVersionUID = 1L; 149 150 protected GridFSBinary(String digest, String blobProviderId) { 151 super(digest, blobProviderId); 152 } 153 154 @Override 155 public InputStream getStream() { 156 return gridFSBucket.openDownloadStream(digest); 157 } 158 } 159 160 @Override 161 public Binary getBinary(Blob blob) throws IOException { 162 if (!(blob instanceof FileBlob)) { 163 return super.getBinary(blob); // just open the stream and call getBinary(InputStream) 164 } 165 // we already have a file so can compute the length and digest efficiently 166 File file = blob.getFile(); 167 String digest; 168 try (InputStream in = new FileInputStream(file)) { 169 digest = DigestUtils.md5Hex(in); 170 } 171 // if the digest is not already known then save to GridFS 172 GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first(); 173 if (dbFile == null) { 174 try (InputStream in = new FileInputStream(file)) { 175 gridFSBucket.uploadFromStream(digest, in); 176 } 177 } 178 return new GridFSBinary(digest, blobProviderId); 179 } 180 181 @Override 182 protected Binary getBinary(InputStream in) throws IOException { 183 try { 184 // save the file to GridFS 185 String inputName = "tmp-" + System.nanoTime(); 186 ObjectId id = gridFSBucket.uploadFromStream(inputName, in); 187 // now we know length and digest 188 GridFSFile inputFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, inputName)).first(); 189 String digest = inputFile.getMD5(); 190 // if the digest is already known then reuse it instead 191 GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first(); 192 if (dbFile == null) { 193 // no existing file, set its filename as the digest 194 gridFSBucket.rename(id, digest); 195 } else { 196 // file already existed, no need for the temporary one 197 gridFSBucket.delete(id); 198 } 199 return new GridFSBinary(digest, blobProviderId); 200 } finally { 201 in.close(); 202 } 203 } 204 205 @Override 206 public Binary getBinary(String digest) { 207 GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first(); 208 if (dbFile != null) { 209 return new GridFSBinary(digest, blobProviderId); 210 } 211 return null; 212 } 213 214 @Override 215 public Blob readBlob(BlobInfo blobInfo) throws IOException { 216 // just delegate to avoid copy/pasting code 217 return new BinaryBlobProvider(this).readBlob(blobInfo); 218 } 219 220 @Override 221 public String writeBlob(Blob blob) throws IOException { 222 // just delegate to avoid copy/pasting code 223 return new BinaryBlobProvider(this).writeBlob(blob); 224 } 225 226 @Override 227 public boolean supportsUserUpdate() { 228 return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE)); 229 } 230 231 public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector { 232 233 protected final String bucket; 234 235 protected BinaryManagerStatus status; 236 237 protected volatile long startTime; 238 239 protected static final String MARK_KEY_PREFIX = "gc-mark-key-"; 240 241 protected String msKey; 242 243 public GridFSBinaryGarbageCollector(String bucket) { 244 this.bucket = bucket; 245 } 246 247 @Override 248 public String getId() { 249 return "gridfs:" + bucket; 250 } 251 252 @Override 253 public BinaryManagerStatus getStatus() { 254 return status; 255 } 256 257 @Override 258 public boolean isInProgress() { 259 return startTime != 0; 260 } 261 262 @Override 263 public void mark(String digest) { 264 Document dbFile = filesColl.findOneAndUpdate(Filters.eq(METADATA_PROPERTY_FILENAME, digest), 265 Updates.set(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), TRUE), 266 new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)); 267 if (dbFile != null) { 268 status.numBinaries += 1; 269 status.sizeBinaries += dbFile.getLong(METADATA_PROPERTY_LENGTH); 270 } 271 } 272 273 @Override 274 public void start() { 275 if (startTime != 0) { 276 throw new NuxeoException("Already started"); 277 } 278 startTime = System.currentTimeMillis(); 279 status = new BinaryManagerStatus(); 280 msKey = MARK_KEY_PREFIX + System.currentTimeMillis(); 281 } 282 283 @Override 284 public void stop(boolean delete) { 285 gridFSBucket.find(Filters.exists(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), false)) // 286 .forEach((Block<GridFSFile>) file -> { 287 status.numBinariesGC += 1; 288 status.sizeBinariesGC += file.getLength(); 289 if (delete) { 290 gridFSBucket.delete(file.getId()); 291 } 292 }); 293 startTime = 0; 294 } 295 } 296 297 @Override 298 public Map<String, String> getProperties() { 299 return properties; 300 } 301 302}