001/*
002 * (C) Copyright 2015 Nuxeo SAS (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 * Nuxeo - initial API and implementation
016 */
017
018package org.nuxeo.ecm.core.transientstore;
019
020import java.io.File;
021import java.io.IOException;
022import java.nio.file.DirectoryStream;
023import java.nio.file.Files;
024import java.nio.file.Path;
025import java.nio.file.Paths;
026
027import org.apache.commons.codec.binary.Base64;
028import org.apache.commons.io.FileUtils;
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.common.Environment;
032import org.nuxeo.ecm.core.cache.Cache;
033import org.nuxeo.ecm.core.cache.CacheDescriptor;
034import org.nuxeo.ecm.core.cache.CacheService;
035import org.nuxeo.ecm.core.cache.CacheServiceImpl;
036import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
037import org.nuxeo.ecm.core.transientstore.api.StorageEntry;
038import org.nuxeo.ecm.core.transientstore.api.TransientStore;
039import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
040import org.nuxeo.runtime.api.Framework;
041
042/**
043 * Base class for {@link TransientStore} implementation.
044 *
045 * @author <a href="mailto:tdelprat@nuxeo.com">Tiry</a>
046 * @since 7.2
047 */
048public abstract class AbstractTransientStore implements TransientStore {
049
050    protected TransientStoreConfig config;
051
052    protected static final Log log = LogFactory.getLog(AbstractTransientStore.class);
053
054    protected File cacheDir;
055
056    protected Cache l1Cache;
057
058    protected Cache l2Cache;
059
060    protected CacheDescriptor l1cd;
061
062    protected CacheDescriptor l2cd;
063
064    @Override
065    public void init(TransientStoreConfig config) {
066        this.config = config;
067        CacheService cs = Framework.getService(CacheService.class);
068        if (cs == null) {
069            throw new UnsupportedOperationException("Cache service is required");
070        }
071        // register the caches
072        l1cd = getL1CacheConfig();
073        l2cd = getL2CacheConfig();
074        ((CacheServiceImpl) cs).registerCache(l1cd);
075        ((CacheServiceImpl) cs).registerCache(l2cd);
076        l1cd.start();
077        l2cd.start();
078
079        // get caches
080        l1Cache = cs.getCache(l1cd.name);
081        l2Cache = cs.getCache(l2cd.name);
082
083        // initialize caching directory
084        File transienStoreHome = new File(Environment.getDefault().getData(), "transientstores");
085        File data = new File(transienStoreHome, config.getName());
086        data.mkdirs();
087        cacheDir = data.getAbsoluteFile();
088    }
089
090    @Override
091    public void shutdown() {
092        CacheService cs = Framework.getService(CacheService.class);
093        if (cs != null) {
094            if (l1cd != null) {
095                ((CacheServiceImpl) cs).unregisterCache(l1cd);
096            }
097            if (l2cd != null) {
098                ((CacheServiceImpl) cs).unregisterCache(l2cd);
099            }
100        }
101    }
102
103    protected abstract void incrementStorageSize(long size);
104
105    protected abstract void decrementStorageSize(long size);
106
107    protected void incrementStorageSize(StorageEntry entry) {
108        incrementStorageSize(entry.getSize());
109    }
110
111    protected void decrementStorageSize(StorageEntry entry) {
112        decrementStorageSize(entry.getSize());
113    }
114
115    public abstract long getStorageSize();
116
117    protected abstract void setStorageSize(long newSize);
118
119    public Cache getL1Cache() {
120        return l1Cache;
121    }
122
123    public Cache getL2Cache() {
124        return l2Cache;
125    }
126
127    @Override
128    public void put(StorageEntry entry) {
129        if (config.getAbsoluteMaxSizeMB() < 0 || getStorageSize() < config.getAbsoluteMaxSizeMB() * (1024 * 1024)) {
130            StorageEntry old = get(entry.getId());
131            if (old != null) {
132                decrementStorageSize(old.getLastStorageSize());
133            }
134            incrementStorageSize(entry);
135            entry = persistEntry(entry);
136            getL1Cache().put(entry.getId(), entry);
137        } else {
138            throw new MaximumTransientSpaceExceeded();
139        }
140    }
141
142    protected StorageEntry persistEntry(StorageEntry entry) {
143        entry.persist(getCachingDirectory(entry.getId()));
144        return entry;
145    }
146
147    @Override
148    public StorageEntry get(String key) {
149        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
150        if (entry == null) {
151            entry = (StorageEntry) getL2Cache().get(key);
152        }
153        if (entry != null) {
154            entry.load(getCachingDirectory(key));
155        }
156        return entry;
157    }
158
159    @Override
160    public void remove(String key) {
161        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
162        if (entry == null) {
163            entry = (StorageEntry) getL2Cache().get(key);
164            getL2Cache().invalidate(key);
165        } else {
166            getL1Cache().invalidate(key);
167        }
168        if (entry != null) {
169            decrementStorageSize(entry);
170            entry.beforeRemove();
171        }
172    }
173
174    @Override
175    public void release(String key) {
176        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
177        if (entry != null) {
178            getL1Cache().invalidate(key);
179            if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
180                getL2Cache().put(key, entry);
181            }
182        }
183    }
184
185    @Override
186    public int getStorageSizeMB() {
187        return (int) getStorageSize() / (1024 * 1024);
188    }
189
190    protected String getCachingDirName(String key) {
191        String dirName = Base64.encodeBase64String(key.getBytes());
192        dirName = dirName.replaceAll("/", "_");
193        return dirName;
194    }
195
196    protected String getKeyCachingDirName(String dir) {
197        String key = dir.replaceAll("_", "/");
198        return new String(Base64.decodeBase64(key));
199    }
200
201    public File getCachingDirectory(String key) {
202        File cachingDir = new File(cacheDir, getCachingDirName(key));
203        if (!cachingDir.exists()) {
204            cachingDir.mkdir();
205        }
206        return cachingDir;
207    }
208
209    @Override
210    public void doGC() {
211        log.debug(String.format("Performing GC for TransientStore %s", config.getName()));
212        File dir = cacheDir;
213        long newSize = 0;
214        try {
215            try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(dir.getAbsolutePath()))) {
216                for (Path entry : stream) {
217                    String key = getKeyCachingDirName(entry.getFileName().toString());
218                    try {
219                        if (getL1Cache().hasEntry(key)) {
220                            newSize += getSize(entry);
221                            continue;
222                        }
223                        if (getL2Cache().hasEntry(key)) {
224                            newSize += getSize(entry);
225                            continue;
226                        }
227                        FileUtils.deleteDirectory(entry.toFile());
228                    } catch (IOException e) {
229                        log.error("Error while performing GC", e);
230                    }
231                }
232            }
233        } catch (IOException e) {
234            log.error("Error while performing GC", e);
235        }
236        setStorageSize(newSize);
237    }
238
239    protected long getSize(Path entry) {
240        long size = 0;
241        for (File file : entry.toFile().listFiles()) {
242            size += file.length();
243        }
244        return size;
245    }
246
247    public abstract Class<? extends Cache> getCacheImplClass();
248
249    protected class TransientCacheConfig extends CacheDescriptor {
250
251        TransientCacheConfig(String name, int ttl) {
252            super();
253            super.name = name;
254            super.implClass = getCacheImplClass();
255            super.ttl = ttl;
256        }
257    }
258
259    protected CacheDescriptor getL1CacheConfig() {
260        return new TransientCacheConfig(config.getName() + "L1", config.getFistLevelTTL());
261    }
262
263    protected CacheDescriptor getL2CacheConfig() {
264        return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL());
265    }
266
267}