/*
 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Thierry Delprat <tdelprat@nuxeo.com>
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 */

package org.nuxeo.ecm.core.transientstore;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
import org.nuxeo.ecm.core.transientstore.api.TransientStore;
import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;

/**
 * Base class for a {@link TransientStore} implementation.
 * <p>
 * This class handles the file system side of the store (one caching directory per entry key, located under
 * {@link #getCacheDir()}), while subclasses provide the entry bookkeeping through the abstract methods.
 *
 * @since 7.2
 */
public abstract class AbstractTransientStore implements TransientStore {

    protected static final Log log = LogFactory.getLog(AbstractTransientStore.class);

    /** Number of bytes in one megabyte, as a {@code long} so that size computations never overflow {@code int}. */
    private static final long BYTES_PER_MB = 1024L * 1024L;

    protected TransientStoreConfig config;

    protected File cacheDir;

    /**
     * Keeps a reference to the given configuration, creates the data directory if needed and uses its absolute
     * path as the cache directory.
     */
    @Override
    public void init(TransientStoreConfig config) {
        this.config = config;
        File data = getDataDir(config);
        data.mkdirs();
        cacheDir = data.getAbsoluteFile();
    }

    /**
     * Resolves the data directory of the store: the configured one if not blank, otherwise
     * {@code <environment data dir>/transientstores/<store name>}.
     */
    private File getDataDir(TransientStoreConfig config) {
        String dataDirPath = config.getDataDir();
        if (StringUtils.isBlank(dataDirPath)) {
            File transientStoreHome = new File(Environment.getDefault().getData(), "transientstores");
            return new File(transientStoreHome, config.getName());
        } else {
            return new File(dataDirPath);
        }
    }

    @Override
    public abstract void shutdown();

    @Override
    public abstract boolean exists(String key);

    @Override
    public abstract Set<String> keySet();

    @Override
    public abstract void putParameter(String key, String parameter, Serializable value);

    @Override
    public abstract Serializable getParameter(String key, String parameter);

    @Override
    public abstract void putParameters(String key, Map<String, Serializable> parameters);

    @Override
    public abstract Map<String, Serializable> getParameters(String key);

    @Override
    public abstract List<Blob> getBlobs(String key);

    @Override
    public abstract long getSize(String key);

    @Override
    public abstract boolean isCompleted(String key);

    @Override
    public abstract void setCompleted(String key, boolean completed);

    @Override
    public abstract void remove(String key);

    @Override
    public abstract void release(String key);

    /**
     * Updates the total storage size and the storage size of the entry with the given {@code key} according to
     * {@code sizeOfBlobs} and stores the blob information in this entry.
     */
    protected abstract void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos);

    /**
     * Returns the size of the disk storage in bytes.
     */
    public abstract long getStorageSize();

    /**
     * Sets the size of the disk storage in bytes.
     */
    protected abstract void setStorageSize(long newSize);

    // NOTE(review): presumably adds {@code size} bytes to the storage size and returns the new total - confirm
    // against the implementations.
    protected abstract long incrementStorageSize(long size);

    // NOTE(review): presumably subtracts {@code size} bytes from the storage size and returns the new total -
    // confirm against the implementations.
    protected abstract long decrementStorageSize(long size);

    protected abstract void removeAllEntries();

    /**
     * Stores the given blobs on the file system then persists their information in the entry with the given
     * {@code key}.
     * <p>
     * A negative absolute maximum size in the configuration disables the size check.
     *
     * @throws MaximumTransientSpaceExceeded if the current storage size has reached the configured absolute
     *             maximum size
     */
    @Override
    public void putBlobs(String key, List<Blob> blobs) {
        // Widen to long before multiplying: an int product overflows from 2048 MB upwards
        long absoluteMaxSizeMB = config.getAbsoluteMaxSizeMB();
        if (absoluteMaxSizeMB >= 0 && getStorageSize() >= absoluteMaxSizeMB * BYTES_PER_MB) {
            throw new MaximumTransientSpaceExceeded();
        }
        // Store blobs on the file system
        List<Map<String, String>> blobInfos = storeBlobs(key, blobs);
        // Persist blob information in the store
        persistBlobs(key, getSizeOfBlobs(blobs), blobInfos);
    }

    /**
     * Writes each blob to a randomly named file in the caching directory of the given {@code key} and returns,
     * for each blob, a map holding the path of this file relative to the cache directory ({@code file}) along with
     * the non null blob attributes among {@code filename}, {@code encoding}, {@code mimetype} and {@code digest}.
     * <p>
     * Temporary {@link FileBlob}s are moved to the caching directory, other blobs are copied.
     *
     * @return the blob information, or {@code null} if {@code blobs} is {@code null}
     * @throws NuxeoException if a blob cannot be written to the file system
     */
    protected List<Map<String, String>> storeBlobs(String key, List<Blob> blobs) {
        if (blobs == null) {
            return null;
        }
        // Store blobs on the file system and compute blob information
        List<Map<String, String>> blobInfos = new ArrayList<>();
        for (Blob blob : blobs) {
            Map<String, String> blobInfo = new HashMap<>();
            // Resolved inside the loop so that no directory is created for an empty list
            File cachingDir = getCachingDirectory(key);
            String uuid = UUID.randomUUID().toString();
            File cachedFile = new File(cachingDir, uuid);
            try {
                if (blob instanceof FileBlob && ((FileBlob) blob).isTemporary()) {
                    ((FileBlob) blob).moveTo(cachedFile);
                } else {
                    blob.transferTo(cachedFile);
                }
            } catch (IOException e) {
                throw new NuxeoException(e);
            }
            Path cachedFileRelativePath = Paths.get(cachingDir.getName(), uuid);
            blobInfo.put("file", cachedFileRelativePath.toString());
            // Redis doesn't support null values
            if (blob.getFilename() != null) {
                blobInfo.put("filename", blob.getFilename());
            }
            if (blob.getEncoding() != null) {
                blobInfo.put("encoding", blob.getEncoding());
            }
            if (blob.getMimeType() != null) {
                blobInfo.put("mimetype", blob.getMimeType());
            }
            if (blob.getDigest() != null) {
                blobInfo.put("digest", blob.getDigest());
            }
            blobInfos.add(blobInfo);
        }
        log.debug("Stored blobs on the file system: " + blobInfos);
        return blobInfos;
    }

    /**
     * Returns the caching directory of the entry with the given {@code key}, creating it if it doesn't exist.
     *
     * @throws NuxeoException if the resolved directory is not located under the cache directory or if the
     *             canonical paths cannot be computed
     */
    public File getCachingDirectory(String key) {
        String cachingDirName = getCachingDirName(key);
        try {
            File cachingDir = new File(cacheDir.getCanonicalFile(), cachingDirName);
            // Guard against a directory name escaping the cache directory
            if (!cachingDir.getCanonicalPath().startsWith(cacheDir.getCanonicalPath())) {
                throw new NuxeoException("Trying to traverse illegal path: " + cachingDir + " for key: " + key);
            }
            if (!cachingDir.exists()) {
                cachingDir.mkdir();
            }
            return cachingDir;
        } catch (IOException e) {
            throw new NuxeoException("Error when trying to access cache directory: " + cacheDir + "/" + cachingDirName
                    + " for key: " + key, e);
        }
    }

    /**
     * Returns the name of the caching directory for the given {@code key}: the Base64 encoding of the key where
     * {@code /} is replaced by {@code _} so that the result is a single path segment.
     * <p>
     * NOTE(review): the key is encoded using the platform default charset, consistently with
     * {@link #getKeyCachingDirName(String)}; changing it would change the name of existing directories.
     */
    protected String getCachingDirName(String key) {
        String dirName = Base64.encodeBase64String(key.getBytes());
        return dirName.replace('/', '_');
    }

    /**
     * Returns the sum of the lengths of the given blobs in bytes, ignoring blobs whose length is unknown
     * (negative).
     *
     * @return the total size, {@code 0} if {@code blobs} is {@code null}
     */
    protected long getSizeOfBlobs(List<Blob> blobs) {
        // Accumulate in a long: an int accumulator overflows beyond 2 GiB
        long size = 0;
        if (blobs != null) {
            for (Blob blob : blobs) {
                long blobLength = blob.getLength();
                if (blobLength > -1) {
                    size += blobLength;
                }
            }
        }
        return size;
    }

    /**
     * Builds {@link FileBlob}s from the given blob information, resolving the {@code file} value of each map
     * against the cache directory and setting the {@code encoding}, {@code mimetype}, {@code filename} and
     * {@code digest} values on the blob.
     */
    protected List<Blob> loadBlobs(List<Map<String, String>> blobInfos) {
        log.debug("Loading blobs from the file system: " + blobInfos);
        List<Blob> blobs = new ArrayList<>();
        for (Map<String, String> info : blobInfos) {
            File blobFile = new File(cacheDir, info.get("file"));
            Blob blob = new FileBlob(blobFile);
            blob.setEncoding(info.get("encoding"));
            blob.setMimeType(info.get("mimetype"));
            blob.setFilename(info.get("filename"));
            blob.setDigest(info.get("digest"));
            blobs.add(blob);
        }
        return blobs;
    }

    /**
     * Returns the size of the disk storage in megabytes, rounded down.
     */
    @Override
    public int getStorageSizeMB() {
        // Divide as a long first, then narrow: casting before dividing truncates sizes beyond 2 GiB
        return (int) (getStorageSize() / BYTES_PER_MB);
    }

    /**
     * Walks through the children of the cache directory: the ones matching an existing entry are kept and their
     * size is summed up, the other ones are deleted. The storage size is then set to the computed sum, including
     * when the walk failed with an {@link IOException}, which is only logged.
     */
    @Override
    public void doGC() {
        log.debug(String.format("Performing GC for TransientStore %s", config.getName()));
        long newSize = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(cacheDir.getAbsolutePath()))) {
            for (Path entry : stream) {
                String key = getKeyCachingDirName(entry.getFileName().toString());
                if (exists(key)) {
                    newSize += getFilePathSize(entry);
                    continue;
                }
                FileUtils.deleteQuietly(entry.toFile());
            }
        } catch (IOException e) {
            log.error("Error while performing GC", e);
        }
        setStorageSize(newSize);
    }

    /**
     * Returns the entry key matching the given caching directory name, reverting
     * {@link #getCachingDirName(String)}: {@code _} is replaced by {@code /} then the result is Base64 decoded.
     */
    protected String getKeyCachingDirName(String dir) {
        String key = dir.replace('_', '/');
        return new String(Base64.decodeBase64(key));
    }

    /**
     * Returns the sum of the lengths of the files directly contained in the given directory.
     *
     * @return the computed size, {@code 0} if {@code entry} cannot be listed (not a directory or I/O error)
     */
    protected long getFilePathSize(Path entry) {
        File[] files = entry.toFile().listFiles();
        if (files == null) {
            // Not a directory or I/O error: nothing to account for
            return 0;
        }
        long size = 0;
        for (File file : files) {
            size += file.length();
        }
        return size;
    }

    /**
     * Removes all the entries from the store then runs {@link #doGC()} to clean up the file system and reset the
     * storage size.
     */
    @Override
    public void removeAll() {
        log.debug("Removing all entries from TransientStore " + config.getName());
        removeAllEntries();
        doGC();
    }

    /**
     * Returns the directory under which the caching directories of the entries are created.
     */
    public File getCacheDir() {
        return cacheDir;
    }

}