001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.transientstore; 022 023import java.io.File; 024import java.io.IOException; 025import java.io.Serializable; 026import java.nio.file.DirectoryStream; 027import java.nio.file.Files; 028import java.nio.file.Path; 029import java.nio.file.Paths; 030import java.util.ArrayList; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.UUID; 036 037import org.apache.commons.codec.binary.Base64; 038import org.apache.commons.io.FileUtils; 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.common.Environment; 042import org.nuxeo.ecm.core.api.Blob; 043import org.nuxeo.ecm.core.api.NuxeoException; 044import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 045import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded; 046import org.nuxeo.ecm.core.transientstore.api.TransientStore; 047import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 048 049/** 050 * Base class for a {@link TransientStore} implementation. 051 * 052 * @since 7.2 053 */ 054public abstract class AbstractTransientStore implements TransientStore { 055 056 protected static final Log log = LogFactory.getLog(AbstractTransientStore.class); 057 058 protected TransientStoreConfig config; 059 060 protected File cacheDir; 061 062 @Override 063 public void init(TransientStoreConfig config) { 064 this.config = config; 065 066 // initialize caching directory 067 File transienStoreHome = new File(Environment.getDefault().getData(), "transientstores"); 068 File data = new File(transienStoreHome, config.getName()); 069 data.mkdirs(); 070 cacheDir = data.getAbsoluteFile(); 071 } 072 073 @Override 074 public abstract void shutdown(); 075 076 @Override 077 public abstract boolean exists(String key); 078 079 @Override 080 public abstract Set<String> keySet(); 081 082 @Override 083 public abstract void putParameter(String key, String parameter, Serializable value); 084 085 @Override 086 public abstract Serializable getParameter(String key, String parameter); 087 088 @Override 089 public abstract void putParameters(String key, Map<String, Serializable> parameters); 090 091 @Override 092 public abstract Map<String, Serializable> getParameters(String key); 093 094 @Override 095 public abstract List<Blob> getBlobs(String key); 096 097 @Override 098 public abstract long getSize(String key); 099 100 @Override 101 public abstract boolean isCompleted(String key); 102 103 @Override 104 public abstract void setCompleted(String key, boolean completed); 105 106 @Override 107 public abstract void remove(String key); 108 109 @Override 110 public abstract void release(String key); 111 112 /** 113 * Updates the total storage size and the storage size of the entry with the given {@code key} according to 114 * {@code sizeOfBlobs} and stores the blob information in this entry. 115 */ 116 protected abstract void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos); 117 118 /** 119 * Returns the size of the disk storage in bytes. 120 */ 121 public abstract long getStorageSize(); 122 123 /** 124 * Sets the size of the disk storage in bytes. 125 */ 126 protected abstract void setStorageSize(long newSize); 127 128 protected abstract long incrementStorageSize(long size); 129 130 protected abstract long decrementStorageSize(long size); 131 132 protected abstract void removeAllEntries(); 133 134 @Override 135 public void putBlobs(String key, List<Blob> blobs) { 136 if (config.getAbsoluteMaxSizeMB() < 0 || getStorageSize() < config.getAbsoluteMaxSizeMB() * (1024 * 1024)) { 137 // Store blobs on the file system 138 List<Map<String, String>> blobInfos = storeBlobs(key, blobs); 139 // Persist blob information in the store 140 persistBlobs(key, getSizeOfBlobs(blobs), blobInfos); 141 } else { 142 throw new MaximumTransientSpaceExceeded(); 143 } 144 } 145 146 protected List<Map<String, String>> storeBlobs(String key, List<Blob> blobs) { 147 if (blobs == null) { 148 return null; 149 } 150 // Store blobs on the file system and compute blob information 151 List<Map<String, String>> blobInfos = new ArrayList<>(); 152 for (Blob blob : blobs) { 153 Map<String, String> blobInfo = new HashMap<>(); 154 File cachingDir = getCachingDirectory(key); 155 String uuid = UUID.randomUUID().toString(); 156 File cachedFile = new File(cachingDir, uuid); 157 try { 158 if (blob instanceof FileBlob && ((FileBlob) blob).isTemporary()) { 159 ((FileBlob) blob).moveTo(cachedFile); 160 } else { 161 blob.transferTo(cachedFile); 162 } 163 } catch (IOException e) { 164 throw new NuxeoException(e); 165 } 166 Path cachedFileRelativePath = Paths.get(cachingDir.getName(), uuid); 167 blobInfo.put("file", cachedFileRelativePath.toString()); 168 // Redis doesn't support null values 169 if (blob.getFilename() != null) { 170 blobInfo.put("filename", blob.getFilename()); 171 } 172 if (blob.getEncoding() != null) { 173 blobInfo.put("encoding", blob.getEncoding()); 174 } 175 if (blob.getMimeType() != null) { 176 blobInfo.put("mimetype", blob.getMimeType()); 177 } 178 if (blob.getDigest() != null) { 179 blobInfo.put("digest", blob.getDigest()); 180 } 181 blobInfos.add(blobInfo); 182 } 183 log.debug("Stored blobs on the file system: " + blobInfos); 184 return blobInfos; 185 } 186 187 public File getCachingDirectory(String key) { 188 String cachingDirName = getCachingDirName(key); 189 try { 190 File cachingDir = new File(cacheDir.getCanonicalFile(), cachingDirName); 191 if (!cachingDir.getCanonicalPath().startsWith(cacheDir.getCanonicalPath())) { 192 throw new NuxeoException("Trying to traverse illegal path: " + cachingDir + " for key: " + key); 193 } 194 if (!cachingDir.exists()) { 195 cachingDir.mkdir(); 196 } 197 return cachingDir; 198 } catch (IOException e) { 199 throw new NuxeoException("Error when trying to access cache directory: " + cacheDir + "/" + cachingDirName 200 + " for key: " + key, e); 201 } 202 } 203 204 protected String getCachingDirName(String key) { 205 String dirName = Base64.encodeBase64String(key.getBytes()); 206 dirName = dirName.replaceAll("/", "_"); 207 return dirName; 208 } 209 210 protected long getSizeOfBlobs(List<Blob> blobs) { 211 int size = 0; 212 if (blobs != null) { 213 for (Blob blob : blobs) { 214 long blobLength = blob.getLength(); 215 if (blobLength > -1) { 216 size += blobLength; 217 } 218 } 219 } 220 return size; 221 } 222 223 protected List<Blob> loadBlobs(List<Map<String, String>> blobInfos) { 224 log.debug("Loading blobs from the file system: " + blobInfos); 225 List<Blob> blobs = new ArrayList<>(); 226 for (Map<String, String> info : blobInfos) { 227 File blobFile = new File(cacheDir, info.get("file")); 228 Blob blob = new FileBlob(blobFile); 229 blob.setEncoding(info.get("encoding")); 230 blob.setMimeType(info.get("mimetype")); 231 blob.setFilename(info.get("filename")); 232 blob.setDigest(info.get("digest")); 233 blobs.add(blob); 234 } 235 return blobs; 236 } 237 238 @Override 239 public int getStorageSizeMB() { 240 return (int) getStorageSize() / (1024 * 1024); 241 } 242 243 @Override 244 public void doGC() { 245 log.debug(String.format("Performing GC for TransientStore %s", config.getName())); 246 long newSize = 0; 247 try { 248 try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(cacheDir.getAbsolutePath()))) { 249 for (Path entry : stream) { 250 String key = getKeyCachingDirName(entry.getFileName().toString()); 251 try { 252 if (exists(key)) { 253 newSize += getFilePathSize(entry); 254 continue; 255 } 256 FileUtils.deleteDirectory(entry.toFile()); 257 } catch (IOException e) { 258 log.error("Error while performing GC", e); 259 } 260 } 261 } 262 } catch (IOException e) { 263 log.error("Error while performing GC", e); 264 } 265 setStorageSize(newSize); 266 } 267 268 protected String getKeyCachingDirName(String dir) { 269 String key = dir.replaceAll("_", "/"); 270 return new String(Base64.decodeBase64(key)); 271 } 272 273 protected long getFilePathSize(Path entry) { 274 long size = 0; 275 for (File file : entry.toFile().listFiles()) { 276 size += file.length(); 277 } 278 return size; 279 } 280 281 @Override 282 public void removeAll() { 283 log.debug("Removing all entries from TransientStore " + config.getName()); 284 removeAllEntries(); 285 doGC(); 286 } 287 288}