001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.storage.mongodb; 021 022import static java.lang.Boolean.FALSE; 023import static java.lang.Boolean.TRUE; 024import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE; 025 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.IOException; 029import java.io.InputStream; 030import java.util.List; 031import java.util.Map; 032 033import org.apache.commons.codec.digest.DigestUtils; 034import org.apache.commons.lang.StringUtils; 035import org.nuxeo.ecm.core.api.Blob; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 038import org.nuxeo.ecm.core.blob.BlobManager; 039import org.nuxeo.ecm.core.blob.BlobProvider; 040import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager; 041import org.nuxeo.ecm.core.blob.binary.Binary; 042import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider; 043import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 044import org.nuxeo.ecm.core.blob.binary.BinaryManager; 045import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 046import org.nuxeo.ecm.core.model.Document; 047 048import com.mongodb.BasicDBObject; 049import com.mongodb.DBObject; 050import com.mongodb.MongoClient; 051import com.mongodb.MongoClientURI; 052import com.mongodb.ServerAddress; 053import com.mongodb.gridfs.GridFS; 054import com.mongodb.gridfs.GridFSDBFile; 055import com.mongodb.gridfs.GridFSInputFile; 056 057/** 058 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS. 059 * <p> 060 * This implementation does not use local caching. 061 * <p> 062 * This implementation may not always be ideal regarding streaming because of the usage of {@link Binary} interface that 063 * exposes a {@link File}. 064 * 065 * @since 7.10 066 */ 067public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider { 068 069 public static final String SERVER_PROPERTY = "server"; 070 071 public static final String DBNAME_PROPERTY = "dbname"; 072 073 public static final String BUCKET_PROPERTY = "bucket"; 074 075 protected Map<String, String> properties; 076 077 protected MongoClient client; 078 079 protected GridFS gridFS; 080 081 @Override 082 public void initialize(String blobProviderId, Map<String, String> properties) throws IOException { 083 super.initialize(blobProviderId, properties); 084 this.properties = properties; 085 String server = properties.get(SERVER_PROPERTY); 086 if (StringUtils.isBlank(server)) { 087 throw new NuxeoException("Missing server property in GridFS Binary Manager descriptor: " + blobProviderId); 088 } 089 String dbname = properties.get(DBNAME_PROPERTY); 090 if (StringUtils.isBlank(dbname)) { 091 throw new NuxeoException("Missing dbname property in GridFS Binary Manager descriptor: " + blobProviderId); 092 } 093 String bucket = properties.get(BUCKET_PROPERTY); 094 if (StringUtils.isBlank(bucket)) { 095 bucket = blobProviderId + ".fs"; 096 } 097 if (server.startsWith("mongodb://")) { 098 client = new MongoClient(new MongoClientURI(server)); 099 } else { 100 client = new MongoClient(new ServerAddress(server)); 101 } 102 gridFS = new GridFS(client.getDB(dbname), bucket); 103 garbageCollector = new GridFSBinaryGarbageCollector(); 104 } 105 106 @Override 107 public void close() { 108 if (client != null) { 109 client.close(); 110 client = null; 111 } 112 } 113 114 @Override 115 public BinaryManager getBinaryManager() { 116 return this; 117 } 118 119 public GridFS getGridFS() { 120 return gridFS; 121 } 122 123 /** 124 * A binary backed by GridFS. 125 */ 126 protected class GridFSBinary extends Binary { 127 128 private static final long serialVersionUID = 1L; 129 130 protected GridFSBinary(String digest, long length, String blobProviderId) { 131 super(digest, blobProviderId); 132 this.length = length; 133 } 134 135 @Override 136 public InputStream getStream() { 137 GridFSDBFile dbFile = gridFS.findOne(digest); 138 return dbFile == null ? null : dbFile.getInputStream(); 139 } 140 } 141 142 @Override 143 public Binary getBinary(Blob blob) throws IOException { 144 if (!(blob instanceof FileBlob)) { 145 return super.getBinary(blob); // just open the stream and call getBinary(InputStream) 146 } 147 // we already have a file so can compute the length and digest efficiently 148 File file = ((FileBlob) blob).getFile(); 149 long length = file.length(); 150 String digest; 151 try (InputStream in = new FileInputStream(file)) { 152 digest = DigestUtils.md5Hex(in); 153 } 154 // if the digest is not already known then save to GridFS 155 GridFSDBFile dbFile = gridFS.findOne(digest); 156 if (dbFile == null) { 157 try (InputStream in = new FileInputStream(file)) { 158 GridFSInputFile inputFile = gridFS.createFile(in, digest); 159 inputFile.save(); 160 } 161 } 162 return new GridFSBinary(digest, length, blobProviderId); 163 } 164 165 @Override 166 protected Binary getBinary(InputStream in) throws IOException { 167 // save the file to GridFS 168 GridFSInputFile inputFile = gridFS.createFile(in, true); 169 inputFile.save(); 170 // now we know length and digest 171 long length = inputFile.getLength(); 172 String digest = inputFile.getMD5(); 173 // if the digest is already known then reuse it instead 174 GridFSDBFile dbFile = gridFS.findOne(digest); 175 if (dbFile == null) { 176 // no existing file, set its filename as the digest 177 inputFile.setFilename(digest); 178 inputFile.save(); 179 } else { 180 // file already existed, no need for the temporary one 181 gridFS.remove(inputFile); 182 } 183 return new GridFSBinary(digest, length, blobProviderId); 184 } 185 186 @Override 187 public Binary getBinary(String digest) { 188 GridFSDBFile dbFile = gridFS.findOne(digest); 189 if (dbFile != null) { 190 return new GridFSBinary(digest, dbFile.getLength(), blobProviderId); 191 } 192 return null; 193 } 194 195 @Override 196 public Blob readBlob(BlobManager.BlobInfo blobInfo) throws IOException { 197 // just delegate to avoid copy/pasting code 198 return new BinaryBlobProvider(this).readBlob(blobInfo); 199 } 200 201 @Override 202 public String writeBlob(Blob blob, Document doc) throws IOException { 203 // just delegate to avoid copy/pasting code 204 return new BinaryBlobProvider(this).writeBlob(blob, doc); 205 } 206 207 @Override 208 public boolean supportsUserUpdate() { 209 return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE)); 210 } 211 212 public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector { 213 214 protected BinaryManagerStatus status; 215 216 protected volatile long startTime; 217 218 protected static final String MARK_KEY_PREFIX = "gc-mark-key-"; 219 220 protected String msKey; 221 222 @Override 223 public String getId() { 224 return "gridfs:" + getGridFS().getBucketName(); 225 } 226 227 @Override 228 public BinaryManagerStatus getStatus() { 229 return status; 230 } 231 232 @Override 233 public boolean isInProgress() { 234 return startTime != 0; 235 } 236 237 @Override 238 public void mark(String digest) { 239 GridFSDBFile dbFile = gridFS.findOne(digest); 240 if (dbFile != null) { 241 dbFile.setMetaData(new BasicDBObject(msKey, TRUE)); 242 dbFile.save(); 243 status.numBinaries += 1; 244 status.sizeBinaries += dbFile.getLength(); 245 } 246 } 247 248 @Override 249 public void start() { 250 if (startTime != 0) { 251 throw new NuxeoException("Already started"); 252 } 253 startTime = System.currentTimeMillis(); 254 status = new BinaryManagerStatus(); 255 msKey = MARK_KEY_PREFIX + System.currentTimeMillis(); 256 } 257 258 @Override 259 public void stop(boolean delete) { 260 DBObject query = new BasicDBObject("metadata." + msKey, new BasicDBObject("$exists", FALSE)); 261 List<GridFSDBFile> files = gridFS.find(query); 262 for (GridFSDBFile file : files) { 263 status.numBinariesGC += 1; 264 status.sizeBinariesGC += file.getLength(); 265 if (delete) { 266 gridFS.remove(file); 267 } 268 } 269 startTime = 0; 270 } 271 } 272 273}