001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.transientstore;
022
023import java.io.File;
024import java.io.IOException;
025import java.io.Serializable;
026import java.nio.file.DirectoryStream;
027import java.nio.file.Files;
028import java.nio.file.Path;
029import java.nio.file.Paths;
030import java.util.ArrayList;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.UUID;
036
037import org.apache.commons.codec.binary.Base64;
038import org.apache.commons.io.FileUtils;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.common.Environment;
042import org.nuxeo.ecm.core.api.Blob;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
045import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
046import org.nuxeo.ecm.core.transientstore.api.TransientStore;
047import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
048
049/**
050 * Base class for a {@link TransientStore} implementation.
051 *
052 * @since 7.2
053 */
054public abstract class AbstractTransientStore implements TransientStore {
055
056    protected static final Log log = LogFactory.getLog(AbstractTransientStore.class);
057
058    protected TransientStoreConfig config;
059
060    protected File cacheDir;
061
062    @Override
063    public void init(TransientStoreConfig config) {
064        this.config = config;
065
066        // initialize caching directory
067        File transienStoreHome = new File(Environment.getDefault().getData(), "transientstores");
068        File data = new File(transienStoreHome, config.getName());
069        data.mkdirs();
070        cacheDir = data.getAbsoluteFile();
071    }
072
073    @Override
074    public abstract void shutdown();
075
076    @Override
077    public abstract boolean exists(String key);
078
079    @Override
080    public abstract Set<String> keySet();
081
082    @Override
083    public abstract void putParameter(String key, String parameter, Serializable value);
084
085    @Override
086    public abstract Serializable getParameter(String key, String parameter);
087
088    @Override
089    public abstract void putParameters(String key, Map<String, Serializable> parameters);
090
091    @Override
092    public abstract Map<String, Serializable> getParameters(String key);
093
094    @Override
095    public abstract List<Blob> getBlobs(String key);
096
097    @Override
098    public abstract long getSize(String key);
099
100    @Override
101    public abstract boolean isCompleted(String key);
102
103    @Override
104    public abstract void setCompleted(String key, boolean completed);
105
106    @Override
107    public abstract void remove(String key);
108
109    @Override
110    public abstract void release(String key);
111
112    /**
113     * Updates the total storage size and the storage size of the entry with the given {@code key} according to
114     * {@code sizeOfBlobs} and stores the blob information in this entry.
115     */
116    protected abstract void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos);
117
118    /**
119     * Returns the size of the disk storage in bytes.
120     */
121    public abstract long getStorageSize();
122
123    /**
124     * Sets the size of the disk storage in bytes.
125     */
126    protected abstract void setStorageSize(long newSize);
127
128    protected abstract long incrementStorageSize(long size);
129
130    protected abstract long decrementStorageSize(long size);
131
132    protected abstract void removeAllEntries();
133
134    @Override
135    public void putBlobs(String key, List<Blob> blobs) {
136        if (config.getAbsoluteMaxSizeMB() < 0 || getStorageSize() < config.getAbsoluteMaxSizeMB() * (1024 * 1024)) {
137            // Store blobs on the file system
138            List<Map<String, String>> blobInfos = storeBlobs(key, blobs);
139            // Persist blob information in the store
140            persistBlobs(key, getSizeOfBlobs(blobs), blobInfos);
141        } else {
142            throw new MaximumTransientSpaceExceeded();
143        }
144    }
145
146    protected List<Map<String, String>> storeBlobs(String key, List<Blob> blobs) {
147        if (blobs == null) {
148            return null;
149        }
150        // Store blobs on the file system and compute blob information
151        List<Map<String, String>> blobInfos = new ArrayList<>();
152        for (Blob blob : blobs) {
153            Map<String, String> blobInfo = new HashMap<>();
154            File cachingDir = getCachingDirectory(key);
155            String uuid = UUID.randomUUID().toString();
156            File cachedFile = new File(cachingDir, uuid);
157            try {
158                if (blob instanceof FileBlob && ((FileBlob) blob).isTemporary()) {
159                    ((FileBlob) blob).moveTo(cachedFile);
160                } else {
161                    blob.transferTo(cachedFile);
162                }
163            } catch (IOException e) {
164                throw new NuxeoException(e);
165            }
166            Path cachedFileRelativePath = Paths.get(cachingDir.getName(), uuid);
167            blobInfo.put("file", cachedFileRelativePath.toString());
168            // Redis doesn't support null values
169            if (blob.getFilename() != null) {
170                blobInfo.put("filename", blob.getFilename());
171            }
172            if (blob.getEncoding() != null) {
173                blobInfo.put("encoding", blob.getEncoding());
174            }
175            if (blob.getMimeType() != null) {
176                blobInfo.put("mimetype", blob.getMimeType());
177            }
178            if (blob.getDigest() != null) {
179                blobInfo.put("digest", blob.getDigest());
180            }
181            blobInfos.add(blobInfo);
182        }
183        log.debug("Stored blobs on the file system: " + blobInfos);
184        return blobInfos;
185    }
186
187    public File getCachingDirectory(String key) {
188        String cachingDirName = getCachingDirName(key);
189        try {
190            File cachingDir = new File(cacheDir.getCanonicalFile(), cachingDirName);
191            if (!cachingDir.getCanonicalPath().startsWith(cacheDir.getCanonicalPath())) {
192                throw new NuxeoException("Trying to traverse illegal path: " + cachingDir + " for key: " + key);
193            }
194            if (!cachingDir.exists()) {
195                cachingDir.mkdir();
196            }
197            return cachingDir;
198        } catch (IOException e) {
199            throw new NuxeoException("Error when trying to access cache directory: " + cacheDir + "/" + cachingDirName
200                    + " for key: " + key, e);
201        }
202    }
203
204    protected String getCachingDirName(String key) {
205        String dirName = Base64.encodeBase64String(key.getBytes());
206        dirName = dirName.replaceAll("/", "_");
207        return dirName;
208    }
209
210    protected long getSizeOfBlobs(List<Blob> blobs) {
211        int size = 0;
212        if (blobs != null) {
213            for (Blob blob : blobs) {
214                long blobLength = blob.getLength();
215                if (blobLength > -1) {
216                    size += blobLength;
217                }
218            }
219        }
220        return size;
221    }
222
223    protected List<Blob> loadBlobs(List<Map<String, String>> blobInfos) {
224        log.debug("Loading blobs from the file system: " + blobInfos);
225        List<Blob> blobs = new ArrayList<>();
226        for (Map<String, String> info : blobInfos) {
227            File blobFile = new File(cacheDir, info.get("file"));
228            Blob blob = new FileBlob(blobFile);
229            blob.setEncoding(info.get("encoding"));
230            blob.setMimeType(info.get("mimetype"));
231            blob.setFilename(info.get("filename"));
232            blob.setDigest(info.get("digest"));
233            blobs.add(blob);
234        }
235        return blobs;
236    }
237
238    @Override
239    public int getStorageSizeMB() {
240        return (int) getStorageSize() / (1024 * 1024);
241    }
242
243    @Override
244    public void doGC() {
245        log.debug(String.format("Performing GC for TransientStore %s", config.getName()));
246        long newSize = 0;
247        try {
248            try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(cacheDir.getAbsolutePath()))) {
249                for (Path entry : stream) {
250                    String key = getKeyCachingDirName(entry.getFileName().toString());
251                    try {
252                        if (exists(key)) {
253                            newSize += getFilePathSize(entry);
254                            continue;
255                        }
256                        FileUtils.deleteDirectory(entry.toFile());
257                    } catch (IOException e) {
258                        log.error("Error while performing GC", e);
259                    }
260                }
261            }
262        } catch (IOException e) {
263            log.error("Error while performing GC", e);
264        }
265        setStorageSize(newSize);
266    }
267
268    protected String getKeyCachingDirName(String dir) {
269        String key = dir.replaceAll("_", "/");
270        return new String(Base64.decodeBase64(key));
271    }
272
273    protected long getFilePathSize(Path entry) {
274        long size = 0;
275        for (File file : entry.toFile().listFiles()) {
276            size += file.length();
277        }
278        return size;
279    }
280
281    @Override
282    public void removeAll() {
283        log.debug("Removing all entries from TransientStore " + config.getName());
284        removeAllEntries();
285        doGC();
286    }
287
288}