/*
 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Thierry Delprat
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.mongodb;

import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.NAMESPACE;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.TRANSIENT;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonValue;
import org.bson.Document;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.blob.BlobInfo;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager;
import org.nuxeo.ecm.core.blob.binary.Binary;
import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.BinaryManager;
import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.mongodb.MongoDBConnectionService;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSUploadStream;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.Updates;

/**
 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS.
 * <p>
 * This implementation does not use local caching.
 * <p>
 * This implementation may not always be ideal regarding streaming because of the usage of {@link Binary} interface that
 * exposes a {@link File}.
 * <p>
 * Binaries are stored as GridFS files whose {@code filename} is the content digest; lookups are therefore done by
 * filename.
 *
 * @since 7.10
 */
public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {

    /**
     * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}.
     * <p>
     * The connection id will be {@code blobProvider/[BLOB_PROVIDER_ID]}.
     */
    public static final String BLOB_PROVIDER_CONNECTION_PREFIX = "blobProvider/";

    /**
     * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead
     */
    @Deprecated
    public static final String SERVER_PROPERTY = "server";

    /**
     * @deprecated since 9.3 use {@link MongoDBConnectionService} to provide access instead
     */
    @Deprecated
    public static final String DBNAME_PROPERTY = "dbname";

    /** Name of the blob provider property holding the GridFS bucket name. */
    public static final String BUCKET_PROPERTY = "bucket";

    /** Field of a GridFS file document holding the file name (here: the digest). */
    private static final String METADATA_PROPERTY_FILENAME = "filename";

    /** Field of a GridFS file document under which the garbage collector writes its marks. */
    private static final String METADATA_PROPERTY_METADATA = "metadata";

    /** Field of a GridFS file document holding the file length. */
    private static final String METADATA_PROPERTY_LENGTH = "length";

    /** Bucket used for uploads, downloads, renames and deletions. */
    protected GridFSBucket gridFSBucket;

    /** The {@code [bucket].files} collection, accessed directly to mark files during garbage collection. */
    protected MongoCollection<Document> filesColl;

    /**
     * Initializes this binary manager: resolves the bucket name, obtains the MongoDB database registered under
     * {@code blobProvider/[blobProviderId]} and creates the GridFS bucket, the files collection handle and the garbage
     * collector.
     * <p>
     * Bucket name resolution:
     * <ul>
     * <li>no {@code bucket} property: {@code [blobProviderId].fs}, or {@code [blobProviderId].[namespace].fs} when a
     * namespace is configured,</li>
     * <li>{@code bucket} property set: that value, suffixed with {@code .[namespace]} when a namespace is
     * configured.</li>
     * </ul>
     *
     * @param blobProviderId the blob provider id
     * @param properties the blob provider properties
     * @throws IOException if thrown by the parent initialization
     * @throws NuxeoException if one of the removed {@code server} / {@code dbname} properties is still configured
     */
    @Override
    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
        super.initialize(blobProviderId, properties);
        // legacy connection properties are rejected outright rather than silently ignored
        if (StringUtils.isNotBlank(properties.get(SERVER_PROPERTY))
                || StringUtils.isNotBlank(properties.get(DBNAME_PROPERTY))) {
            throw new NuxeoException("Unable to initialize GridFS Binary Manager, properties " + SERVER_PROPERTY
                    + " and " + DBNAME_PROPERTY + " has been removed. Please configure a connection!");
        }

        String namespace = properties.get(NAMESPACE);
        String bucket = properties.get(BUCKET_PROPERTY);
        if (StringUtils.isBlank(bucket)) {
            if (StringUtils.isNotBlank(namespace)) {
                bucket = blobProviderId + "." + namespace.trim();
            } else {
                bucket = blobProviderId;
            }
            bucket = bucket + ".fs";
        } else if (StringUtils.isNotBlank(namespace)) {
            bucket = bucket + "." + namespace.trim();
        }

        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
        MongoDatabase database = mongoService.getDatabase(BLOB_PROVIDER_CONNECTION_PREFIX + blobProviderId);
        gridFSBucket = GridFSBuckets.create(database, bucket);
        filesColl = database.getCollection(bucket + ".files");
        garbageCollector = new GridFSBinaryGarbageCollector(bucket);
    }

    /**
     * Does nothing: this class holds no resource of its own to release.
     */
    @Override
    public void close() {
    }

    /**
     * @return this instance, which is both the blob provider and the binary manager
     */
    @Override
    public BinaryManager getBinaryManager() {
        return this;
    }

    /**
     * @return the GridFS bucket created by {@link #initialize(String, Map)}
     */
    protected GridFSBucket getGridFSBucket() {
        return gridFSBucket;
    }

    /**
     * A binary backed by GridFS.
     */
    protected static class GridFSBinary extends Binary {

        private static final long serialVersionUID = 1L;

        // transient to be Serializable
        protected transient GridFSBinaryManager bm;

        /**
         * @param digest the digest of the binary, which is also its GridFS file name
         * @param blobProviderId the id of the blob provider owning the binary
         * @param bm the binary manager giving access to the GridFS bucket
         */
        protected GridFSBinary(String digest, String blobProviderId, GridFSBinaryManager bm) {
            super(digest, blobProviderId);
            this.bm = bm;
        }

        /**
         * Returns the binary manager, looking it up again through the {@link BlobManager} when the transient reference
         * is not set.
         *
         * @return the binary manager
         * @throws UnsupportedOperationException if the reference is missing and there is no blob provider id to look
         *             it up with
         */
        // because the class is Serializable, re-acquire the BinaryManager if needed
        protected GridFSBinaryManager getBinaryManager() {
            if (bm == null) {
                if (blobProviderId == null) {
                    throw new UnsupportedOperationException("Cannot find binary manager, no blob provider id");
                }
                BlobManager blobManager = Framework.getService(BlobManager.class);
                BlobProvider bp = blobManager.getBlobProvider(blobProviderId);
                bm = (GridFSBinaryManager) bp.getBinaryManager();
            }
            return bm;
        }

        /**
         * Opens a GridFS download stream on the file named after this binary's digest.
         */
        @Override
        public InputStream getStream() {
            return getBinaryManager().getGridFSBucket().openDownloadStream(digest);
        }

        @Override
        protected File recomputeFile() {
            return null; // no file to recompute
        }
    }

    /**
     * Stores the given blob and returns the corresponding binary.
     * <p>
     * For a {@link FileBlob} the digest is computed from the file first and the content is uploaded only when no GridFS
     * file with that name exists. Any other blob is delegated to the parent implementation.
     * <p>
     * NOTE(review): the digest computed here is always MD5, while {@link #getBinary(InputStream)} relies on
     * {@code storeAndDigest}; confirm both paths yield the same algorithm for every configuration.
     *
     * @param blob the blob to store
     * @return the binary for the blob content
     * @throws IOException if the file cannot be read
     */
    @Override
    public Binary getBinary(Blob blob) throws IOException {
        if (!(blob instanceof FileBlob)) {
            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
        }
        // we already have a file so can compute the length and digest efficiently
        File file = blob.getFile();
        String digest;
        try (InputStream in = new FileInputStream(file)) {
            digest = DigestUtils.md5Hex(in);
        }
        // if the digest is not already known then save to GridFS
        GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
        if (dbFile == null) {
            // the first stream was consumed by the digest computation, so the file is re-opened for the upload
            try (InputStream in = new FileInputStream(file)) {
                gridFSBucket.uploadFromStream(digest, in);
            }
        }
        return new GridFSBinary(digest, blobProviderId, this);
    }

    /**
     * Stores the content of the given stream and returns the corresponding binary.
     * <p>
     * As the digest is only known once the stream has been read, the content is first uploaded under a temporary
     * {@code tmp-[nanoTime]} name. The temporary file is then renamed to the digest, or deleted when a file with that
     * digest already exists.
     * <p>
     * NOTE(review): the rename/delete happens inside the try-with-resources block, which presumes that the inherited
     * {@code storeAndDigest} has already completed the upload stream; confirm against the parent class.
     *
     * @param in the stream to store, closed by this method
     * @return the binary for the stream content
     * @throws IOException if the stream cannot be read
     */
    @Override
    protected Binary getBinary(InputStream in) throws IOException {
        // save the file to GridFS
        String inputName = "tmp-" + System.nanoTime();
        try (in; GridFSUploadStream uploadStream = gridFSBucket.openUploadStream(inputName)) {
            BsonValue id = uploadStream.getId();
            String digest = storeAndDigest(in, uploadStream);
            // if the digest is already known then reuse it instead
            GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
            if (dbFile == null) {
                // no existing file, set its filename as the digest
                gridFSBucket.rename(id, digest);
            } else {
                // file already existed, no need for the temporary one
                gridFSBucket.delete(id);
            }
            return new GridFSBinary(digest, blobProviderId, this);
        }
    }

    /**
     * Looks up a binary by digest.
     *
     * @param digest the digest to look for
     * @return the binary, or {@code null} if no GridFS file is named after this digest
     */
    @Override
    public Binary getBinary(String digest) {
        GridFSFile dbFile = gridFSBucket.find(Filters.eq(METADATA_PROPERTY_FILENAME, digest)).first();
        if (dbFile != null) {
            return new GridFSBinary(digest, blobProviderId, this);
        }
        return null;
    }

    @Override
    public Blob readBlob(BlobInfo blobInfo) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).readBlob(blobInfo);
    }

    @Override
    public String writeBlob(Blob blob) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).writeBlob(blob);
    }

    /**
     * @return {@code false} only when the {@code PREVENT_USER_UPDATE} property parses as {@code true}
     */
    @Override
    public boolean supportsUserUpdate() {
        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
    }

    /**
     * @return {@code true} when the {@code TRANSIENT} property parses as {@code true}
     */
    @Override
    public boolean isTransient() {
        return Boolean.parseBoolean(properties.get(TRANSIENT));
    }

    /**
     * Mark and sweep garbage collector for the binaries of the enclosing manager.
     * <p>
     * {@link #start()} picks a fresh mark key, {@link #mark(String)} sets that key in the metadata of every binary
     * still in use, and {@link #stop(boolean)} treats every file without the key as garbage.
     */
    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {

        protected final String bucket;

        /** Counters of the current (or last) garbage collection, created by {@link #start()}. */
        protected BinaryManagerStatus status;

        /** Start time of the running garbage collection in milliseconds, {@code 0} when none is running. */
        protected volatile long startTime;

        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";

        /** Metadata key used as mark for the current garbage collection, assigned by {@link #start()}. */
        protected String msKey;

        /**
         * @param bucket the GridFS bucket name, used to build the collector id
         */
        public GridFSBinaryGarbageCollector(String bucket) {
            this.bucket = bucket;
        }

        @Override
        public String getId() {
            return "gridfs:" + bucket;
        }

        @Override
        public BinaryManagerStatus getStatus() {
            return status;
        }

        @Override
        public boolean isInProgress() {
            return startTime != 0;
        }

        /**
         * Marks the binary with the given digest as in use and accounts for it in the status.
         * <p>
         * Digests for which no file exists are ignored. Must be called after {@link #start()}, which creates the status
         * and the mark key used here.
         *
         * @param digest the digest of the binary to keep
         */
        @Override
        public void mark(String digest) {
            Document dbFile = filesColl.findOneAndUpdate(Filters.eq(METADATA_PROPERTY_FILENAME, digest),
                    Updates.set(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), TRUE),
                    new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
            if (dbFile != null) {
                status.numBinaries += 1;
                status.sizeBinaries += dbFile.getLong(METADATA_PROPERTY_LENGTH);
            }
        }

        /**
         * Starts a garbage collection: resets the status and picks a new, time based mark key.
         *
         * @throws NuxeoException if a garbage collection is already in progress
         */
        @Override
        public void start() {
            if (startTime != 0) {
                throw new NuxeoException("Already started");
            }
            startTime = System.currentTimeMillis();
            status = new BinaryManagerStatus();
            msKey = MARK_KEY_PREFIX + System.currentTimeMillis();
        }

        /**
         * Ends the garbage collection: every file that does not carry the current mark key is counted as garbage and,
         * if requested, deleted.
         * <p>
         * The in-progress flag is always cleared, even when a GridFS call fails midway, so that a failed collection
         * does not prevent the next {@link #start()}.
         * <p>
         * NOTE(review): the filter selects every unmarked file, which includes files uploaded after {@link #start()}
         * and {@code tmp-} files of uploads still in flight; confirm that callers prevent concurrent writes.
         *
         * @param delete {@code true} to delete the unmarked files, {@code false} to only count them
         */
        @Override
        public void stop(boolean delete) {
            try {
                gridFSBucket.find(Filters.exists(String.format("%s.%s", METADATA_PROPERTY_METADATA, msKey), false)) //
                            .forEach(file -> {
                                status.numBinariesGC += 1;
                                status.sizeBinariesGC += file.getLength();
                                if (delete) {
                                    gridFSBucket.delete(file.getId());
                                }
                            });
            } finally {
                // reset unconditionally, otherwise a failure above would leave the collector "in progress" forever
                startTime = 0;
            }
        }
    }

    /**
     * @return the properties this blob provider was initialized with
     */
    @Override
    public Map<String, String> getProperties() {
        return properties;
    }

}