001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.storage.mongodb; 021 022import static java.lang.Boolean.FALSE; 023import static java.lang.Boolean.TRUE; 024import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.PREVENT_USER_UPDATE; 025 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.IOException; 029import java.io.InputStream; 030import java.util.List; 031import java.util.Map; 032 033import org.apache.commons.codec.digest.DigestUtils; 034import org.apache.commons.lang.StringUtils; 035import org.nuxeo.ecm.core.api.Blob; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 038import org.nuxeo.ecm.core.blob.BlobInfo; 039import org.nuxeo.ecm.core.blob.BlobProvider; 040import org.nuxeo.ecm.core.blob.binary.AbstractBinaryManager; 041import org.nuxeo.ecm.core.blob.binary.Binary; 042import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider; 043import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 044import org.nuxeo.ecm.core.blob.binary.BinaryManager; 045import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 046 047import com.mongodb.BasicDBObject; 048import com.mongodb.DBObject; 049import com.mongodb.MongoClient; 050import com.mongodb.MongoClientURI; 051import 
com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;

/**
 * Implements the {@link BinaryManager} and {@link BlobProvider} interface using MongoDB GridFS.
 * <p>
 * Each binary is stored as a GridFS file whose filename is the MD5 digest (hex) of its content, so identical content
 * is looked up by digest and reused instead of being stored again.
 * <p>
 * This implementation does not use local caching.
 * <p>
 * This implementation may not always be ideal regarding streaming because of the usage of the {@link Binary} API that
 * exposes a {@link File}.
 *
 * @since 7.10
 */
public class GridFSBinaryManager extends AbstractBinaryManager implements BlobProvider {

    /** Descriptor property holding the MongoDB server: either a {@code mongodb://} URI or a server address. */
    public static final String SERVER_PROPERTY = "server";

    /** Descriptor property holding the MongoDB database name. */
    public static final String DBNAME_PROPERTY = "dbname";

    /** Descriptor property holding the GridFS bucket name; defaults to {@code <blobProviderId>.fs} when blank. */
    public static final String BUCKET_PROPERTY = "bucket";

    /** The descriptor properties this manager was initialized with. */
    protected Map<String, String> properties;

    /** The MongoDB client; {@code null} after {@link #close()}. */
    protected MongoClient client;

    /** The GridFS bucket in which binaries are stored. */
    protected GridFS gridFS;

    /**
     * Connects to MongoDB and opens the GridFS bucket described by the given properties.
     *
     * @param blobProviderId the id of this blob provider, also used to derive the default bucket name
     * @param properties the descriptor properties; {@link #SERVER_PROPERTY} and {@link #DBNAME_PROPERTY} are required
     * @throws NuxeoException if the server or dbname property is missing or blank
     */
    @Override
    public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
        super.initialize(blobProviderId, properties);
        this.properties = properties;
        String server = properties.get(SERVER_PROPERTY);
        if (StringUtils.isBlank(server)) {
            throw new NuxeoException("Missing server property in GridFS Binary Manager descriptor: " + blobProviderId);
        }
        String dbname = properties.get(DBNAME_PROPERTY);
        if (StringUtils.isBlank(dbname)) {
            throw new NuxeoException("Missing dbname property in GridFS Binary Manager descriptor: " + blobProviderId);
        }
        String bucket = properties.get(BUCKET_PROPERTY);
        if (StringUtils.isBlank(bucket)) {
            bucket = blobProviderId + ".fs";
        }
        // a full connection URI carries options (credentials, replica set...); otherwise treat it as a plain address
        if (server.startsWith("mongodb://")) {
            client = new MongoClient(new MongoClientURI(server));
        } else {
            client = new MongoClient(new ServerAddress(server));
        }
        gridFS = new GridFS(client.getDB(dbname), bucket);
        garbageCollector = new GridFSBinaryGarbageCollector();
    }

    /**
     * Closes the MongoDB client, if still open. Calling this more than once is harmless.
     */
    @Override
    public void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }

    /**
     * Returns this instance, which is its own binary manager.
     */
    @Override
    public BinaryManager getBinaryManager() {
        return this;
    }

    /**
     * Returns the GridFS bucket in which binaries are stored.
     */
    public GridFS getGridFS() {
        return gridFS;
    }

    /**
     * A binary backed by GridFS, whose content is read from the GridFS file named after its digest.
     * <p>
     * NOTE(review): being a non-static inner class, it keeps a reference to the enclosing manager; confirm this is
     * acceptable wherever {@link Binary} instances are serialized.
     */
    protected class GridFSBinary extends Binary {

        private static final long serialVersionUID = 1L;

        protected GridFSBinary(String digest, String blobProviderId) {
            super(digest, blobProviderId);
        }

        /**
         * Opens a stream on the GridFS file named after this binary's digest.
         *
         * @return the content stream, or {@code null} if no such file exists in GridFS
         */
        @Override
        public InputStream getStream() {
            GridFSDBFile dbFile = gridFS.findOne(digest);
            return dbFile == null ? null : dbFile.getInputStream();
        }
    }

    /**
     * Stores the blob's content in GridFS (unless already present) and returns the matching binary.
     * <p>
     * For a {@link FileBlob} the MD5 digest is computed from the file first, so the upload is skipped entirely when a
     * GridFS file with that digest already exists. Other blobs are handled by the superclass, which streams them to
     * {@link #getBinary(InputStream)}.
     *
     * @param blob the blob to store
     * @return the binary for the blob's content
     * @throws IOException if the blob's content cannot be read
     */
    @Override
    public Binary getBinary(Blob blob) throws IOException {
        if (!(blob instanceof FileBlob)) {
            return super.getBinary(blob); // just open the stream and call getBinary(InputStream)
        }
        // we already have a file so can compute the length and digest efficiently
        File file = ((FileBlob) blob).getFile();
        String digest;
        try (InputStream in = new FileInputStream(file)) {
            digest = DigestUtils.md5Hex(in);
        }
        // if the digest is not already known then save to GridFS
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile == null) {
            try (InputStream in = new FileInputStream(file)) {
                GridFSInputFile inputFile = gridFS.createFile(in, digest);
                inputFile.save();
            }
        }
        return new GridFSBinary(digest, blobProviderId);
    }

    /**
     * Stores the stream's content in GridFS and returns the matching binary.
     * <p>
     * The digest is not known before reading the stream, so the content is first saved as a temporary GridFS file;
     * GridFS computes its MD5. If a file with that digest already exists, the temporary file is removed. Otherwise
     * the temporary file is renamed to the digest.
     *
     * @param in the content stream (closed by GridFS once consumed)
     * @return the binary for the stream's content
     * @throws IOException if the content cannot be stored
     */
    @Override
    protected Binary getBinary(InputStream in) throws IOException {
        // save the file to GridFS
        GridFSInputFile inputFile = gridFS.createFile(in, true);
        inputFile.save();
        // now we know length and digest
        String digest = inputFile.getMD5();
        // if the digest is already known then reuse it instead
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile == null) {
            // no existing file, set its filename as the digest
            inputFile.setFilename(digest);
            inputFile.save();
        } else {
            // file already existed, no need for the temporary one;
            // remove it by _id rather than using the whole file document as a query
            gridFS.remove(new BasicDBObject("_id", inputFile.getId()));
        }
        return new GridFSBinary(digest, blobProviderId);
    }

    /**
     * Looks up an existing binary by digest.
     *
     * @param digest the content digest (used as the GridFS filename)
     * @return the binary, or {@code null} if no GridFS file has this digest
     */
    @Override
    public Binary getBinary(String digest) {
        GridFSDBFile dbFile = gridFS.findOne(digest);
        if (dbFile != null) {
            return new GridFSBinary(digest, blobProviderId);
        }
        return null;
    }

    /**
     * Reads a blob, delegating to a {@link BinaryBlobProvider} wrapping this manager.
     */
    @Override
    public Blob readBlob(BlobInfo blobInfo) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).readBlob(blobInfo);
    }

    /**
     * Writes a blob, delegating to a {@link BinaryBlobProvider} wrapping this manager.
     */
    @Override
    public String writeBlob(Blob blob) throws IOException {
        // just delegate to avoid copy/pasting code
        return new BinaryBlobProvider(this).writeBlob(blob);
    }

    /**
     * Returns {@code true} unless the descriptor's prevent-user-update property is set to {@code "true"}.
     */
    @Override
    public boolean supportsUserUpdate() {
        return !Boolean.parseBoolean(properties.get(PREVENT_USER_UPDATE));
    }

    /**
     * Mark-and-sweep garbage collector for the GridFS bucket.
     * <p>
     * Each cycle uses a unique metadata key. {@link #mark(String)} sets it on referenced files, and
     * {@link #stop(boolean)} sweeps the files that lack it.
     */
    public class GridFSBinaryGarbageCollector implements BinaryGarbageCollector {

        /** Statistics of the current or last cycle. */
        protected BinaryManagerStatus status;

        /** Start time of the current cycle in milliseconds, or 0 when no cycle is in progress. */
        protected volatile long startTime;

        protected static final String MARK_KEY_PREFIX = "gc-mark-key-";

        /** Metadata key used to mark files during the current cycle. */
        protected String msKey;

        @Override
        public String getId() {
            return "gridfs:" + getGridFS().getBucketName();
        }

        @Override
        public BinaryManagerStatus getStatus() {
            return status;
        }

        @Override
        public boolean isInProgress() {
            return startTime != 0;
        }

        /**
         * Marks the GridFS file with the given digest as in use, and accounts for it in the status.
         * <p>
         * The file's metadata is replaced by this cycle's mark, which also drops marks left by earlier cycles.
         * Unknown digests are ignored.
         *
         * @param digest the digest of a referenced binary
         */
        @Override
        public void mark(String digest) {
            GridFSDBFile dbFile = gridFS.findOne(digest);
            if (dbFile != null) {
                dbFile.setMetaData(new BasicDBObject(msKey, TRUE));
                dbFile.save();
                status.numBinaries += 1;
                status.sizeBinaries += dbFile.getLength();
            }
        }

        /**
         * Starts a new GC cycle with fresh statistics and a fresh mark key.
         *
         * @throws NuxeoException if a cycle is already in progress
         */
        @Override
        public void start() {
            if (startTime != 0) {
                throw new NuxeoException("Already started");
            }
            startTime = System.currentTimeMillis();
            status = new BinaryManagerStatus();
            msKey = MARK_KEY_PREFIX + System.currentTimeMillis();
        }

        /**
         * Ends the GC cycle. Every GridFS file not marked during this cycle is counted as garbage and, if requested,
         * deleted.
         * <p>
         * The cycle is always ended, even if the sweep fails, so that a later {@link #start()} is possible.
         *
         * @param delete whether to actually delete the unmarked files, or only count them
         * @throws NuxeoException if no cycle is in progress
         */
        @Override
        public void stop(boolean delete) {
            if (startTime == 0) {
                throw new NuxeoException("Not started");
            }
            try {
                // files whose metadata lacks this cycle's mark key were not referenced
                DBObject query = new BasicDBObject("metadata." + msKey, new BasicDBObject("$exists", FALSE));
                List<GridFSDBFile> files = gridFS.find(query);
                for (GridFSDBFile file : files) {
                    status.numBinariesGC += 1;
                    status.sizeBinariesGC += file.getLength();
                    if (delete) {
                        // remove by _id rather than using the whole file document as a query
                        gridFS.remove(new BasicDBObject("_id", file.getId()));
                    }
                }
            } finally {
                // without this, a failure above would leave the GC "in progress" forever and block start()
                status.gcDuration = System.currentTimeMillis() - startTime;
                startTime = 0;
            }
        }
    }

}