001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.transientstore;
022
023import java.io.File;
024import java.io.IOException;
025import java.io.Serializable;
026import java.nio.file.DirectoryStream;
027import java.nio.file.Files;
028import java.nio.file.Path;
029import java.nio.file.Paths;
030import java.util.ArrayList;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.UUID;
036
037import org.apache.commons.codec.binary.Base64;
038import org.apache.commons.io.FileUtils;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.nuxeo.common.Environment;
043import org.nuxeo.ecm.core.api.Blob;
044import org.nuxeo.ecm.core.api.NuxeoException;
045import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
046import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
047import org.nuxeo.ecm.core.transientstore.api.TransientStore;
048import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
049
050/**
051 * Base class for a {@link TransientStore} implementation.
052 *
053 * @since 7.2
054 */
055public abstract class AbstractTransientStore implements TransientStore {
056
057    protected static final Log log = LogFactory.getLog(AbstractTransientStore.class);
058
059    protected TransientStoreConfig config;
060
061    protected File cacheDir;
062
063    @Override
064    public void init(TransientStoreConfig config) {
065        this.config = config;
066        File data = getDataDir(config);
067        data.mkdirs();
068        cacheDir = data.getAbsoluteFile();
069    }
070
071    private File getDataDir(TransientStoreConfig config) {
072        String dataDirPath = config.getDataDir();
073        if (StringUtils.isBlank(dataDirPath)) {
074            File transienStoreHome = new File(Environment.getDefault().getData(), "transientstores");
075            return new File(transienStoreHome, config.getName());
076        } else {
077            return new File(dataDirPath);
078        }
079    }
080
081    @Override
082    public abstract void shutdown();
083
084    @Override
085    public abstract boolean exists(String key);
086
087    @Override
088    public abstract Set<String> keySet();
089
090    @Override
091    public abstract void putParameter(String key, String parameter, Serializable value);
092
093    @Override
094    public abstract Serializable getParameter(String key, String parameter);
095
096    @Override
097    public abstract void putParameters(String key, Map<String, Serializable> parameters);
098
099    @Override
100    public abstract Map<String, Serializable> getParameters(String key);
101
102    @Override
103    public abstract List<Blob> getBlobs(String key);
104
105    @Override
106    public abstract long getSize(String key);
107
108    @Override
109    public abstract boolean isCompleted(String key);
110
111    @Override
112    public abstract void setCompleted(String key, boolean completed);
113
114    @Override
115    public abstract void remove(String key);
116
117    @Override
118    public abstract void release(String key);
119
120    /**
121     * Updates the total storage size and the storage size of the entry with the given {@code key} according to
122     * {@code sizeOfBlobs} and stores the blob information in this entry.
123     */
124    protected abstract void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos);
125
126    /**
127     * Returns the size of the disk storage in bytes.
128     */
129    public abstract long getStorageSize();
130
131    /**
132     * Sets the size of the disk storage in bytes.
133     */
134    protected abstract void setStorageSize(long newSize);
135
136    protected abstract long incrementStorageSize(long size);
137
138    protected abstract long decrementStorageSize(long size);
139
140    protected abstract void removeAllEntries();
141
142    @Override
143    public void putBlobs(String key, List<Blob> blobs) {
144        if (config.getAbsoluteMaxSizeMB() < 0 || getStorageSize() < config.getAbsoluteMaxSizeMB() * (1024 * 1024)) {
145            // Store blobs on the file system
146            List<Map<String, String>> blobInfos = storeBlobs(key, blobs);
147            // Persist blob information in the store
148            persistBlobs(key, getSizeOfBlobs(blobs), blobInfos);
149        } else {
150            throw new MaximumTransientSpaceExceeded();
151        }
152    }
153
154    protected List<Map<String, String>> storeBlobs(String key, List<Blob> blobs) {
155        if (blobs == null) {
156            return null;
157        }
158        // Store blobs on the file system and compute blob information
159        List<Map<String, String>> blobInfos = new ArrayList<>();
160        for (Blob blob : blobs) {
161            Map<String, String> blobInfo = new HashMap<>();
162            File cachingDir = getCachingDirectory(key);
163            String uuid = UUID.randomUUID().toString();
164            File cachedFile = new File(cachingDir, uuid);
165            try {
166                if (blob instanceof FileBlob && ((FileBlob) blob).isTemporary()) {
167                    ((FileBlob) blob).moveTo(cachedFile);
168                } else {
169                    blob.transferTo(cachedFile);
170                }
171            } catch (IOException e) {
172                throw new NuxeoException(e);
173            }
174            Path cachedFileRelativePath = Paths.get(cachingDir.getName(), uuid);
175            blobInfo.put("file", cachedFileRelativePath.toString());
176            // Redis doesn't support null values
177            if (blob.getFilename() != null) {
178                blobInfo.put("filename", blob.getFilename());
179            }
180            if (blob.getEncoding() != null) {
181                blobInfo.put("encoding", blob.getEncoding());
182            }
183            if (blob.getMimeType() != null) {
184                blobInfo.put("mimetype", blob.getMimeType());
185            }
186            if (blob.getDigest() != null) {
187                blobInfo.put("digest", blob.getDigest());
188            }
189            blobInfos.add(blobInfo);
190        }
191        log.debug("Stored blobs on the file system: " + blobInfos);
192        return blobInfos;
193    }
194
195    public File getCachingDirectory(String key) {
196        String cachingDirName = getCachingDirName(key);
197        try {
198            File cachingDir = new File(cacheDir.getCanonicalFile(), cachingDirName);
199            if (!cachingDir.getCanonicalPath().startsWith(cacheDir.getCanonicalPath())) {
200                throw new NuxeoException("Trying to traverse illegal path: " + cachingDir + " for key: " + key);
201            }
202            if (!cachingDir.exists()) {
203                cachingDir.mkdir();
204            }
205            return cachingDir;
206        } catch (IOException e) {
207            throw new NuxeoException("Error when trying to access cache directory: " + cacheDir + "/" + cachingDirName
208                    + " for key: " + key, e);
209        }
210    }
211
212    protected String getCachingDirName(String key) {
213        String dirName = Base64.encodeBase64String(key.getBytes());
214        dirName = dirName.replaceAll("/", "_");
215        return dirName;
216    }
217
218    protected long getSizeOfBlobs(List<Blob> blobs) {
219        int size = 0;
220        if (blobs != null) {
221            for (Blob blob : blobs) {
222                long blobLength = blob.getLength();
223                if (blobLength > -1) {
224                    size += blobLength;
225                }
226            }
227        }
228        return size;
229    }
230
231    protected List<Blob> loadBlobs(List<Map<String, String>> blobInfos) {
232        log.debug("Loading blobs from the file system: " + blobInfos);
233        List<Blob> blobs = new ArrayList<>();
234        for (Map<String, String> info : blobInfos) {
235            File blobFile = new File(cacheDir, info.get("file"));
236            Blob blob = new FileBlob(blobFile);
237            blob.setEncoding(info.get("encoding"));
238            blob.setMimeType(info.get("mimetype"));
239            blob.setFilename(info.get("filename"));
240            blob.setDigest(info.get("digest"));
241            blobs.add(blob);
242        }
243        return blobs;
244    }
245
246    @Override
247    public int getStorageSizeMB() {
248        return (int) getStorageSize() / (1024 * 1024);
249    }
250
251    @Override
252    public void doGC() {
253        log.debug(String.format("Performing GC for TransientStore %s", config.getName()));
254        long newSize = 0;
255        try {
256            try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(cacheDir.getAbsolutePath()))) {
257                for (Path entry : stream) {
258                    String key = getKeyCachingDirName(entry.getFileName().toString());
259                    if (exists(key)) {
260                        newSize += getFilePathSize(entry);
261                        continue;
262                    }
263                    FileUtils.deleteQuietly(entry.toFile());
264                }
265            }
266        } catch (IOException e) {
267            log.error("Error while performing GC", e);
268        }
269        setStorageSize(newSize);
270    }
271
272    protected String getKeyCachingDirName(String dir) {
273        String key = dir.replaceAll("_", "/");
274        return new String(Base64.decodeBase64(key));
275    }
276
277    protected long getFilePathSize(Path entry) {
278        long size = 0;
279        for (File file : entry.toFile().listFiles()) {
280            size += file.length();
281        }
282        return size;
283    }
284
285    @Override
286    public void removeAll() {
287        log.debug("Removing all entries from TransientStore " + config.getName());
288        removeAllEntries();
289        doGC();
290    }
291
292    public File getCacheDir() {
293        return cacheDir;
294    }
295
296}