001/* 002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.blob; 020 021import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.nio.file.Files; 028import java.nio.file.Path; 029 030import org.nuxeo.common.file.FileCache; 031import org.nuxeo.common.file.LRUFileCache; 032import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 033import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 034import org.nuxeo.runtime.trackers.files.FileEventTracker; 035 036/** 037 * Blob store wrapper that caches blobs locally because fetching them may be expensive. 038 * 039 * @since 11.1 040 */ 041public class CachingBlobStore extends AbstractBlobStore { 042 043 protected final BlobStore store; 044 045 // public for tests 046 public final Path cacheDir; 047 048 protected final FileCache fileCache; 049 050 protected final PathStrategyFlat tmpPathStrategy; 051 052 protected final BlobStore tmpStore; 053 054 protected final BinaryGarbageCollector gc; 055 056 public CachingBlobStore(String name, BlobStore store, CachingConfiguration config) { 057 super(name, store.getKeyStrategy()); 058 this.store = store; 059 cacheDir = config.dir; 060 fileCache = new LRUFileCache(cacheDir.toFile(), config.maxSize, config.maxCount, config.minAge); 061 // be sure FileTracker won't steal our files 062 FileEventTracker.registerProtectedPath(cacheDir.toAbsolutePath().toString()); 063 tmpPathStrategy = new PathStrategyFlat(cacheDir); 064 tmpStore = new LocalBlobStore(name, store.getKeyStrategy(), tmpPathStrategy); // view of the LRUFileCache tmp dir 065 gc = new CachingBinaryGarbageCollector(store.getBinaryGarbageCollector()); 066 } 067 068 @Override 069 public boolean hasVersioning() { 070 return store.hasVersioning(); 071 } 072 073 @Override 074 public BlobStore unwrap() { 075 return store.unwrap(); 076 } 077 078 @Override 079 public String writeBlob(BlobWriteContext blobWriteContext) throws IOException { 080 // write the blob to a temporary file 081 String tmpKey = tmpStore.writeBlob(blobWriteContext.copyWithKey(randomString())); 082 // get the final key 083 String key = blobWriteContext.getKey(); // may depend on write observer, for example for digests 084 085 // when using deduplication, check if it's in the cache already 086 if (blobWriteContext.useDeDuplication()) { 087 if (fileCache.getFile(key) != null) { 088 logTrace("<--", "exists"); 089 logTrace("hnote right: " + key); 090 // delete tmp file, not needed anymore 091 tmpStore.deleteBlob(tmpKey); 092 return key; 093 } else { 094 logTrace("<--", "missing"); 095 logTrace("hnote right: " + key); 096 // fall through 097 } 098 } 099 100 // we now have a file for this blob 101 Path tmp = tmpPathStrategy.getPathForKey(tmpKey); 102 blobWriteContext.setFile(tmp); 103 // send the file to storage 104 String returnedKey = store.writeBlob(blobWriteContext.copyWithNoWriteObserverAndKey(key)); 105 // register the file in the file cache using its actual key 106 logTrace(name, "-->", name, "rename"); 107 logTrace("hnote right of " + name + ": " + returnedKey); 108 fileCache.putFile(returnedKey, tmp.toFile()); 109 return returnedKey; 110 } 111 112 @Override 113 public boolean copyBlob(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove) 114 throws IOException { 115 CachingBlobStore cachingSourceStore = sourceStore instanceof CachingBlobStore ? (CachingBlobStore) sourceStore 116 : null; 117 if ((!atomicMove || copyBlobIsOptimized(sourceStore)) && cachingSourceStore != null) { 118 // if it's a copy and the original cached file won't be touched 119 // else optimized move won't need the cache, so we can move the cache ahead of time 120 tmpStore.copyBlob(key, cachingSourceStore.tmpStore, sourceKey, atomicMove); 121 } 122 boolean found = store.copyBlob(key, sourceStore, sourceKey, atomicMove); 123 if (found && atomicMove && cachingSourceStore != null) { 124 // clear source cache 125 cachingSourceStore.tmpStore.deleteBlob(sourceKey); 126 } 127 return found; 128 } 129 130 @Override 131 public OptionalOrUnknown<Path> getFile(String key) { 132 File cachedFile = fileCache.getFile(key); 133 if (cachedFile == null) { 134 logTrace("<--", "missing"); 135 logTrace("hnote right: " + key); 136 return OptionalOrUnknown.missing(); 137 } else { 138 logTrace("<-", "read " + cachedFile.length() + " bytes"); 139 logTrace("hnote right: " + key); 140 return OptionalOrUnknown.of(cachedFile.toPath()); 141 } 142 } 143 144 @Override 145 public OptionalOrUnknown<InputStream> getStream(String key) throws IOException { 146 File cachedFile = fileCache.getFile(key); 147 if (cachedFile == null) { 148 logTrace("<--", "missing"); 149 logTrace("hnote right: " + key); 150 // fetch file from storage into the cache 151 // go through a tmp file for atomicity 152 String tmpKey = randomString(); 153 boolean found = tmpStore.copyBlob(tmpKey, store, key, false); 154 if (!found) { 155 return OptionalOrUnknown.missing(); 156 } 157 File tmp = tmpPathStrategy.getPathForKey(tmpKey).toFile(); 158 logTrace("->", "write " + tmp.length() + " bytes"); 159 logTrace("hnote right: " + key); 160 cachedFile = fileCache.putFile(key, tmp); 161 } else { 162 logTrace("<-", "read " + cachedFile.length() + " bytes"); 163 logTrace("hnote right: " + key); 164 } 165 return OptionalOrUnknown.of(new FileInputStream(cachedFile)); 166 } 167 168 @Override 169 public boolean readBlob(String key, Path dest) throws IOException { 170 OptionalOrUnknown<InputStream> streamOpt = getStream(key); 171 if (!streamOpt.isPresent()) { 172 return false; 173 } 174 try (InputStream stream = streamOpt.get()) { 175 Files.copy(stream, dest, REPLACE_EXISTING); 176 return true; 177 } 178 } 179 180 @Override 181 public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOException { 182 store.writeBlobProperties(blobUpdateContext); 183 } 184 185 @Override 186 public void deleteBlob(String key) { 187 tmpStore.deleteBlob(key); // TODO add API to FileCache to do this cleanly 188 store.deleteBlob(key); 189 } 190 191 @Override 192 public BinaryGarbageCollector getBinaryGarbageCollector() { 193 return gc; 194 } 195 196 /** 197 * Garbage collector that delegates to the underlying one, but purges the cache after an actual GC is done. 198 */ 199 public class CachingBinaryGarbageCollector implements BinaryGarbageCollector { 200 201 protected final BinaryGarbageCollector delegate; 202 203 public CachingBinaryGarbageCollector(BinaryGarbageCollector delegate) { 204 this.delegate = delegate; 205 } 206 207 @Override 208 public String getId() { 209 return delegate.getId(); 210 } 211 212 @Override 213 public void start() { 214 delegate.start(); 215 } 216 217 @Override 218 public void mark(String key) { 219 delegate.mark(key); 220 } 221 222 @Override 223 public void stop(boolean delete) { 224 delegate.stop(delete); 225 if (delete) { 226 logTrace("->", "clear"); 227 fileCache.clear(); 228 } 229 } 230 231 @Override 232 public BinaryManagerStatus getStatus() { 233 return delegate.getStatus(); 234 } 235 236 @Override 237 public boolean isInProgress() { 238 return delegate.isInProgress(); 239 } 240 } 241}