/*
 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Thierry Delprat
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.mongodb;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager;
import org.nuxeo.ecm.core.blob.binary.Binary;
import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.BinaryManager;
import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.ecm.core.model.Document;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;

/**
 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS.
 * <p>
 * Binaries are stored as GridFS files whose filename is the MD5 digest of their content.
 * <p>
 * This implementation does not use local caching.
 * <p>
 * This implementation may not always be ideal regarding streaming because of the usage of {@link Binary} interface that
 * exposes a {@link File}.
 *
 * @since 7.10
 */
public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {

    /** Property holding either a {@code mongodb://} URI or a plain server address. */
    public static final String SERVER_PROPERTY = "server";

    /** Property holding the name of the MongoDB database. */
    public static final String DBNAME_PROPERTY = "dbname";

    /** Optional property holding the GridFS bucket name. */
    public static final String BUCKET_PROPERTY = "bucket";

    protected Map<String, String> properties;

    protected MongoClient client;

    protected GridFS gridFS;

    /**
     * Reads the connection properties, opens the MongoDB client and binds the GridFS bucket.
     *
     * @param blobProviderId the id of this blob provider, also used to derive the default bucket name
     *            ({@code <blobProviderId>.fs}) when no bucket is configured
     * @param properties the descriptor properties; {@link #SERVER_PROPERTY} and {@link #DBNAME_PROPERTY} are mandatory
     * @throws NuxeoException if the server or dbname property is missing or blank
     */
    @Override
    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
        super.initialize(blobProviderId, properties);
        this.properties = properties;
        String server = properties.get(SERVER_PROPERTY);
        if (StringUtils.isBlank(server)) {
            throw new NuxeoException("Missing server property in GridFS Binary Manager descriptor: " + blobProviderId);
        }
        String dbname = properties.get(DBNAME_PROPERTY);
        if (StringUtils.isBlank(dbname)) {
            throw new NuxeoException("Missing dbname property in GridFS Binary Manager descriptor: " + blobProviderId);
        }
        String bucket = properties.get(BUCKET_PROPERTY);
        if (StringUtils.isBlank(bucket)) {
            bucket = blobProviderId + ".fs";
        }
        if (server.startsWith("mongodb://")) {
            // full connection string
            client = new MongoClient(new MongoClientURI(server));
        } else {
            // plain server address
            client = new MongoClient(new ServerAddress(server));
        }
        gridFS = new GridFS(client.getDB(dbname), bucket);
        garbageCollector = new GridFSBinaryGarbageCollector();
    }

    /**
     * Closes the MongoDB client, if one is open. Calling it again is a no-op.
     */
    @Override
    public void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }

    @Override
    public BinaryManager getBinaryManager() {
        return this;
    }

    /**
     * Gets the GridFS bucket bound by {@link #initialize}.
     */
    public GridFS getGridFS() {
        return gridFS;
    }

    /**
     * A binary backed by GridFS.
     */
    protected class GridFSBinary extends Binary {

        private static final long serialVersionUID = 1L;

        protected GridFSBinary(String digest, String blobProviderId) {
            super(digest, blobProviderId);
        }

        /**
         * Opens a stream on the GridFS file whose filename is this binary's digest.
         *
         * @return the stream, or {@code null} if no such file exists in GridFS
         */
        @Override
        public InputStream getStream() {
            GridFSDBFile dbFile = gridFS.findOne(digest);
            return dbFile == null ? null : dbFile.getInputStream();
        }
    }

    /**
     * Stores the blob in GridFS (unless a file with the same digest is already there) and returns its binary.
     * <p>
     * For a {@link FileBlob} the digest is computed from the file first, so that nothing is uploaded when the content
     * is already known. Other blobs are delegated to the superclass.
     */
    @Override
    public Binary getBinary(Blob blob) throws IOException {
        if (!(blob instanceof FileBlob)) {
            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
        }
        // we already have a file so can compute the length and digest efficiently
        File file = ((FileBlob) blob).getFile();
        String digest;
        try (InputStream in = new FileInputStream(file)) {
            digest = DigestUtils.md5Hex(in);
        }
        // if the digest is not already known then save to GridFS
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile == null) {
            try (InputStream in = new FileInputStream(file)) {
                GridFSInputFile inputFile = gridFS.createFile(in, digest);
                inputFile.save();
            }
        }
        return new GridFSBinary(digest, blobProviderId);
    }

    /**
     * Stores the stream content in GridFS and returns its binary.
     * <p>
     * The content is first saved as a file without a filename, because the digest is only known once saved. The file
     * is then either named after its digest, or removed if a file with that digest already exists.
     */
    @Override
    protected Binary getBinary(InputStream in) throws IOException {
        // save the file to GridFS
        GridFSInputFile inputFile = gridFS.createFile(in, true);
        inputFile.save();
        // now we know length and digest
        String digest = inputFile.getMD5();
        // if the digest is already known then reuse it instead
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile == null) {
            // no existing file, set its filename as the digest
            inputFile.setFilename(digest);
            inputFile.save();
        } else {
            // file already existed, no need for the temporary one
            gridFS.remove(inputFile);
        }
        return new GridFSBinary(digest, blobProviderId);
    }

    /**
     * Gets the binary for a digest.
     *
     * @return the binary, or {@code null} if GridFS has no file named after this digest
     */
    @Override
    public Binary getBinary(String digest) {
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile != null) {
            return new GridFSBinary(digest, blobProviderId);
        }
        return null;
    }

    @Override
    public Blob readBlob(BlobManager.BlobInfo blobInfo) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).readBlob(blobInfo);
    }

    @Override
    public String writeBlob(Blob blob, Document doc) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).writeBlob(blob, doc);
    }

    /**
     * User updates are supported unless the {@code PREVENT_USER_UPDATE} property is set to {@code true}.
     */
    @Override
    public boolean supportsUserUpdate() {
        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
    }

    /**
     * Mark-and-sweep garbage collector for the GridFS bucket.
     * <p>
     * {@link #start()} picks a mark key for the run, {@link #mark(String)} flags referenced files by storing that key
     * in their metadata, and {@link #stop(boolean)} finds (and optionally removes) the files lacking the key.
     * <p>
     * NOTE(review): a file uploaded between {@link #start()} and {@link #stop(boolean)} carries no mark unless
     * {@link #mark(String)} is called for it, so it is treated as garbage. Confirm that callers prevent concurrent
     * writes during a GC run.
     */
    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {

        protected BinaryManagerStatus status;

        /** Start time of the current run in milliseconds, or 0 when no run is in progress. */
        protected volatile long startTime;

        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";

        /** Metadata key used to flag referenced files during the current run. */
        protected String msKey;

        @Override
        public String getId() {
            return "gridfs:" + getGridFS().getBucketName();
        }

        @Override
        public BinaryManagerStatus getStatus() {
            return status;
        }

        @Override
        public boolean isInProgress() {
            return startTime != 0;
        }

        /**
         * Flags the file for the given digest as referenced in the current run.
         * <p>
         * Unknown digests are ignored. A file already marked in this run is neither saved nor counted again, so a
         * binary referenced by several documents is counted once.
         */
        @Override
        public void mark(String digest) {
            GridFSDBFile dbFile = gridFS.findOne(digest);
            if (dbFile == null) {
                return;
            }
            DBObject metadata = dbFile.getMetaData();
            if (metadata != null && metadata.containsField(msKey)) {
                // already marked and counted during this run
                return;
            }
            dbFile.setMetaData(new BasicDBObject(msKey, TRUE));
            dbFile.save();
            status.numBinaries += 1;
            status.sizeBinaries += dbFile.getLength();
        }

        /**
         * Starts a new run: resets the status and picks a fresh mark key.
         *
         * @throws NuxeoException if a run is already in progress
         */
        @Override
        public void start() {
            if (startTime != 0) {
                throw new NuxeoException("Already started");
            }
            // single timestamp so that the start time and the mark key always agree
            long now = System.currentTimeMillis();
            status = new BinaryManagerStatus();
            msKey = MARK_KEY_PREFIX + now;
            startTime = now;
        }

        /**
         * Ends the current run: counts the files lacking this run's mark key and removes them if requested.
         * <p>
         * The run is always terminated, even if the scan or a removal fails, so that a later {@link #start()} is
         * possible.
         *
         * @param delete {@code true} to remove the unmarked files, {@code false} to only count them
         * @throws NuxeoException if no run is in progress
         */
        @Override
        public void stop(boolean delete) {
            long startedAt = startTime;
            if (startedAt == 0) {
                // without this guard a stale mark key from a previous run would be reused,
                // and every file uploaded since then would be seen as garbage
                throw new NuxeoException("Not started");
            }
            try {
                DBObject query = new BasicDBObject("metadata." + msKey, new BasicDBObject("$exists", FALSE));
                List<GridFSDBFile> files = gridFS.find(query);
                for (GridFSDBFile file : files) {
                    status.numBinariesGC += 1;
                    status.sizeBinariesGC += file.getLength();
                    if (delete) {
                        gridFS.remove(file);
                    }
                }
            } finally {
                status.gcDuration = System.currentTimeMillis() - startedAt;
                startTime = 0;
            }
        }
    }

}