/*
 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.blob;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.ALLOW_BYTE_RANGE;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.runtime.api.Framework;

/**
 * Blob storage in memory, mostly for unit tests.
 * <p>
 * Blob contents are kept as byte arrays in a map indexed by blob key. A second map records the legal hold flag set
 * through {@link #writeBlobProperties}.
 *
 * @since 11.1
 */
public class InMemoryBlobStore extends AbstractBlobStore {

    private static final Logger log = LogManager.getLogger(InMemoryBlobStore.class);

    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)

    /** Blob contents, by key. */
    protected Map<String, byte[]> map = new ConcurrentHashMap<>();

    /** Legal hold flag, by key. */
    protected Map<String, Boolean> legalHold = new ConcurrentHashMap<>();

    protected final InMemoryBlobGarbageCollector gc = new InMemoryBlobGarbageCollector();

    // used by unit tests to emulate absence of stream, to test copy
    protected final boolean emulateNoStream;

    // used by unit tests to emulate presence of a local file, to test copy
    protected final boolean emulateLocalFile;

    // used by unit tests to emulate versioning
    protected final boolean emulateVersioning;

    // when true, a byte range is extracted from the key before the lookup (see getStreamInternal)
    protected final boolean allowByteRange;

    /**
     * Creates a store without configuration and without stream / local file emulation.
     *
     * @param name the store name
     * @param keyStrategy the key strategy
     */
    public InMemoryBlobStore(String name, KeyStrategy keyStrategy) {
        this(name, null, keyStrategy, false, false);
    }

    /**
     * Creates a store without stream / local file emulation.
     *
     * @param name the store name
     * @param config the configuration, may be {@code null}
     * @param keyStrategy the key strategy
     */
    public InMemoryBlobStore(String name, PropertyBasedConfiguration config, KeyStrategy keyStrategy) {
        this(name, config, keyStrategy, false, false);
    }

    /**
     * Creates a store without configuration.
     *
     * @param name the store name
     * @param keyStrategy the key strategy
     * @param emulateNoStream if {@code true}, {@link #getStream} always answers "unknown"
     * @param emulateLocalFile if {@code true}, {@link #getFile} materializes the blob in a temporary file
     */
    protected InMemoryBlobStore(String name, KeyStrategy keyStrategy, boolean emulateNoStream,
            boolean emulateLocalFile) {
        this(name, null, keyStrategy, emulateNoStream, emulateLocalFile);
    }

    /**
     * Creates a store.
     * <p>
     * Versioning emulation and byte range support are read from the {@code emulateVersioning} and
     * {@link BlobProviderDescriptor#ALLOW_BYTE_RANGE} boolean properties of {@code config}; both are disabled when
     * {@code config} is {@code null}.
     *
     * @param name the store name
     * @param config the configuration, may be {@code null}
     * @param keyStrategy the key strategy
     * @param emulateNoStream if {@code true}, {@link #getStream} always answers "unknown"
     * @param emulateLocalFile if {@code true}, {@link #getFile} materializes the blob in a temporary file
     */
    protected InMemoryBlobStore(String name, PropertyBasedConfiguration config, KeyStrategy keyStrategy,
            boolean emulateNoStream, boolean emulateLocalFile) {
        super(name, keyStrategy);
        this.emulateNoStream = emulateNoStream;
        this.emulateLocalFile = emulateLocalFile;
        emulateVersioning = config != null && config.getBooleanProperty("emulateVersioning");
        allowByteRange = config != null && config.getBooleanProperty(ALLOW_BYTE_RANGE);
    }

    /** Returns whether this store was configured to emulate versioning. */
    @Override
    public boolean hasVersioning() {
        return emulateVersioning;
    }

    /**
     * Stores the content described by the write context and returns the key it was stored under.
     * <p>
     * When versioning is emulated, {@code "@"} followed by a random long is appended to the key.
     *
     * @param blobWriteContext the write context
     * @return the key under which the bytes were stored
     */
    @Override
    public String writeBlob(BlobWriteContext blobWriteContext) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        transfer(blobWriteContext, baos);
        String key = blobWriteContext.getKey(); // may depend on WriteObserver, for example for digests
        if (hasVersioning()) {
            key += "@" + RANDOM.nextLong();
        }
        map.put(key, baos.toByteArray());
        return key;
    }

    /**
     * Records the legal hold flag carried by the update context, if any. Other updates are ignored.
     *
     * @param blobUpdateContext the update context
     */
    @Override
    public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOException {
        String key = blobUpdateContext.key;
        if (blobUpdateContext.updateLegalHold != null) {
            boolean hold = blobUpdateContext.updateLegalHold.hold;
            legalHold.put(key, hold);
        }
        // other updates not implemented for in-memory blob store
    }

    @Override
    public boolean copyBlobIsOptimized(BlobStore sourceStore) {
        // this allows us to test "optimized copy" code paths
        return sourceStore instanceof InMemoryBlobStore;
    }

    /**
     * Copies a blob from another store into this one, deleting it from the source afterwards if a move was
     * requested and the blob was found.
     *
     * @param key the key to store the blob under in this store
     * @param sourceStore the store to read from
     * @param sourceKey the key of the blob in the source store
     * @param atomicMove if {@code true}, the source blob is deleted after a successful copy
     * @return {@code true} if the source blob was found and copied, {@code false} otherwise
     */
    @Override
    public boolean copyBlob(String key, BlobStore sourceStore, String sourceKey, boolean atomicMove)
            throws IOException {
        boolean found = copyBlob(key, sourceStore, sourceKey);
        if (found && atomicMove) {
            sourceStore.deleteBlob(sourceKey);
        }
        return found;
    }

    /**
     * Copies a blob from another store into this one.
     * <p>
     * The source is read through the first of these that the source store answers for: a stream, a local file, or
     * as a last resort {@link BlobStore#readBlob} into a temporary file.
     *
     * @param key the key to store the blob under in this store
     * @param sourceStore the store to read from
     * @param sourceKey the key of the blob in the source store
     * @return {@code true} if the source blob was found and copied, {@code false} otherwise
     */
    protected boolean copyBlob(String key, BlobStore sourceStore, String sourceKey) throws IOException {
        // try with a stream
        OptionalOrUnknown<InputStream> streamOpt = sourceStore.getStream(sourceKey);
        if (streamOpt.isKnown()) {
            if (!streamOpt.isPresent()) {
                return false;
            }
            byte[] bytes;
            try (InputStream stream = streamOpt.get()) {
                bytes = IOUtils.toByteArray(stream);
            }
            return storeBytes(key, bytes);
        }
        // try with a local file
        OptionalOrUnknown<Path> fileOpt = sourceStore.getFile(sourceKey);
        if (fileOpt.isKnown()) {
            if (!fileOpt.isPresent()) {
                return false;
            }
            return storeBytes(key, Files.readAllBytes(fileOpt.get()));
        }
        // else use readBlobTo
        Path tmp = Files.createTempFile("bin_", ".tmp");
        try {
            boolean found = sourceStore.readBlob(sourceKey, tmp);
            if (!found) {
                return false;
            }
            return storeBytes(key, Files.readAllBytes(tmp));
        } finally {
            try {
                Files.delete(tmp);
            } catch (IOException e) {
                // best effort: a leftover temporary file must not fail the copy
                log.warn(e, e);
            }
        }
    }

    /** Stores the bytes under the key; always returns {@code true} so copy paths can return it directly. */
    private boolean storeBytes(String key, byte[] bytes) {
        map.put(key, bytes);
        return true;
    }

    /**
     * Returns a stream on the stored bytes for the key, or {@code null} if there is no such blob.
     * <p>
     * When byte ranges are allowed, the key is first passed through {@code getByteRangeFromKey}, which
     * (presumably; it is defined outside this class) extracts the range and leaves the plain key in the holder.
     * The returned stream is then restricted to that range.
     *
     * @param key the blob key, possibly carrying a byte range
     * @return the stream, or {@code null} if the blob is not in the store
     */
    protected ByteArrayInputStream getStreamInternal(String key) {
        ByteRange byteRange;
        if (allowByteRange) {
            MutableObject<String> keyHolder = new MutableObject<>(key);
            byteRange = getByteRangeFromKey(keyHolder);
            key = keyHolder.getValue();
        } else {
            byteRange = null;
        }
        byte[] bytes = map.get(key);
        if (bytes == null) {
            return null;
        } else if (byteRange == null) {
            return new ByteArrayInputStream(bytes);
        } else {
            return new ByteArrayInputStream(bytes, (int) byteRange.getStart(), (int) byteRange.getLength());
        }
    }

    /**
     * Returns the blob as a local file, only when local file emulation is enabled.
     * <p>
     * The content is copied to a new temporary file which is registered with {@link Framework#trackFile}.
     *
     * @param key the blob key
     * @return "unknown" when emulation is disabled, "missing" when the blob is absent, otherwise the temporary file
     * @throws UnsupportedOperationException if the temporary file cannot be created or written; the underlying
     *             {@link IOException} is attached as the cause
     */
    @Override
    public OptionalOrUnknown<Path> getFile(String key) {
        if (!emulateLocalFile) {
            return OptionalOrUnknown.unknown();
        }
        InputStream stream = getStreamInternal(key);
        if (stream == null) {
            return OptionalOrUnknown.missing();
        }
        try {
            Path tmp = Files.createTempFile("tmp_", ".tmp");
            Framework.trackFile(tmp.toFile(), tmp);
            FileUtils.copyToFile(stream, tmp.toFile());
            return OptionalOrUnknown.of(tmp);
        } catch (IOException e) {
            // keep the exception type, but do not lose the cause
            throw new UnsupportedOperationException(e);
        }
    }

    /**
     * Returns a stream on the blob, unless absence of stream is emulated.
     *
     * @param key the blob key
     * @return "unknown" when absence of stream is emulated, "missing" when the blob is absent, otherwise the stream
     */
    @Override
    public OptionalOrUnknown<InputStream> getStream(String key) throws IOException {
        if (emulateNoStream) {
            return OptionalOrUnknown.unknown();
        }
        InputStream stream = getStreamInternal(key);
        if (stream == null) {
            return OptionalOrUnknown.missing();
        }
        return OptionalOrUnknown.of(stream);
    }

    /**
     * Copies the blob content to the given file, replacing it if it exists.
     *
     * @param key the blob key
     * @param dest the destination file
     * @return {@code true} if the blob was found and copied, {@code false} otherwise
     */
    @Override
    public boolean readBlob(String key, Path dest) throws IOException {
        InputStream stream = getStreamInternal(key);
        if (stream == null) {
            return false;
        }
        Files.copy(stream, dest, REPLACE_EXISTING);
        return true;
    }

    /** Removes the blob content and its legal hold flag, if present. */
    @Override
    public void deleteBlob(String key) {
        map.remove(key);
        legalHold.remove(key);
    }

    @Override
    public BinaryGarbageCollector getBinaryGarbageCollector() {
        return gc;
    }

    /**
     * Garbage collector working directly on the enclosing store's in-memory maps.
     */
    public class InMemoryBlobGarbageCollector extends AbstractBlobGarbageCollector {

        @Override
        public String getId() {
            return toString();
        }

        /**
         * Walks all stored blobs, accounting marked ones as kept and unmarked ones as collected in {@code status},
         * and removing unmarked ones (with their legal hold flag) when {@code delete} is set.
         * <p>
         * {@code marked} and {@code status} are not declared in this class (presumably inherited from
         * {@link AbstractBlobGarbageCollector}).
         *
         * @param delete if {@code true}, unmarked blobs are removed from the store
         */
        @Override
        public void removeUnmarkedBlobsAndUpdateStatus(boolean delete) {
            for (Iterator<Entry<String, byte[]>> it = map.entrySet().iterator(); it.hasNext();) {
                Entry<String, byte[]> es = it.next();
                String key = es.getKey();
                byte[] bytes = es.getValue();
                if (marked.contains(key)) {
                    status.sizeBinaries += bytes.length;
                    status.numBinaries++;
                } else {
                    status.sizeBinariesGC += bytes.length;
                    status.numBinariesGC++;
                    if (delete) {
                        it.remove();
                        legalHold.remove(key);
                    }
                }
            }
        }
    }

}