001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.apache.commons.io.FileUtils.deleteDirectory;
022import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
023import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.METADATA_FILE;
024
025import java.io.Externalizable;
026import java.io.IOException;
027import java.nio.file.Files;
028import java.nio.file.Path;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashSet;
033import java.util.List;
034import java.util.Set;
035import java.util.stream.Collectors;
036import java.util.stream.Stream;
037
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.lib.stream.codec.Codec;
041import org.nuxeo.lib.stream.log.LogConfig;
042import org.nuxeo.lib.stream.log.LogLag;
043import org.nuxeo.lib.stream.log.LogPartition;
044import org.nuxeo.lib.stream.log.LogTailer;
045import org.nuxeo.lib.stream.log.Name;
046import org.nuxeo.lib.stream.log.RebalanceListener;
047import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
048import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
049
050/**
051 * @since 9.3
052 */
053public class ChronicleLogManager extends AbstractLogManager {
054    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);
055
056    protected final List<ChronicleLogConfig> configs;
057
058    protected final ChronicleLogConfig defaultConfig;
059
060    public ChronicleLogManager(Path basePath) {
061        this(basePath, null);
062    }
063
064    /**
065     * Constructor
066     *
067     * @param basePath the base path.
068     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
069     *            retention duration expires, the older files are candidates for being purged. The property can be
070     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h'
071     *            in hours and 'd' in days)
072     */
073    public ChronicleLogManager(Path basePath, String retentionDuration) {
074        this(Collections.singletonList(
075                new ChronicleLogConfig("unknown", true, Collections.emptyList(), basePath, retentionDuration)));
076    }
077
078    public ChronicleLogManager(List<ChronicleLogConfig> configs) {
079        this.configs = configs;
080        this.defaultConfig = findDefaultConfig();
081    }
082
083    protected ChronicleLogConfig findDefaultConfig() {
084        List<ChronicleLogConfig> defaultConfigs = configs.stream()
085                                                         .filter(LogConfig::isDefault)
086                                                         .collect(Collectors.toList());
087        // use the last default config
088        if (defaultConfigs.isEmpty()) {
089            return configs.get(configs.size() - 1);
090        }
091        return defaultConfigs.get(defaultConfigs.size() - 1);
092    }
093
094    protected ChronicleLogConfig getConfig(Name name) {
095        return configs.stream().filter(config -> config.match(name)).findFirst().orElse(defaultConfig);
096    }
097
098    protected ChronicleLogConfig getConfig(Name name, Name group) {
099        return configs.stream().filter(config -> config.match(name, group)).findFirst().orElse(defaultConfig);
100    }
101
102    protected static void deleteQueueBasePath(Path basePath) {
103        try {
104            log.info("Removing Chronicle Queues directory: " + basePath);
105            // Performs a recursive delete if the directory contains only chronicles files
106            try (Stream<Path> paths = Files.list(basePath)) {
107                int count = (int) paths.filter(path -> (path.toFile().isFile() && !isChronicleLogFile(path))).count();
108                if (count > 0) {
109                    String msg = "ChronicleLog basePath: " + basePath
110                            + " contains unknown files, please choose another basePath";
111                    log.error(msg);
112                    throw new IllegalArgumentException(msg);
113                }
114            }
115            deleteDirectory(basePath.toFile());
116        } catch (IOException e) {
117            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
118            log.error(msg, e);
119            throw new IllegalArgumentException(msg, e);
120        }
121    }
122
123    protected static boolean isChronicleLogFile(Path path) {
124        String filename = path.getFileName().toString();
125        return filename.endsWith(".cq4") || filename.endsWith(".cq4t") || METADATA_FILE.equals(filename);
126    }
127
128    public String getBasePath() {
129        return defaultConfig.getBasePath().toAbsolutePath().toString();
130    }
131
132    @Override
133    public boolean exists(Name name) {
134        ChronicleLogConfig config = getConfig(name);
135        try (Stream<Path> paths = Files.list(config.getBasePath().resolve(name.getId()))) {
136            return paths.count() > 0;
137        } catch (IOException e) {
138            return false;
139        }
140    }
141
142    @SuppressWarnings("unchecked")
143    @Override
144    public void create(Name name, int size) {
145        ChronicleLogConfig config = getConfig(name);
146        ChronicleLogAppender.create(config, name, size, NO_CODEC).close();
147    }
148
149    @Override
150    protected int getSize(Name name) {
151        ChronicleLogConfig config = getConfig(name);
152        return ChronicleLogAppender.partitions(config.getBasePath().resolve(name.getId()));
153    }
154
155    @Override
156    public boolean delete(Name name) {
157        ChronicleLogConfig config = getConfig(name);
158        Path path = config.getBasePath().resolve(name.getId());
159        if (path.toFile().isDirectory()) {
160            deleteQueueBasePath(path);
161            return true;
162        }
163        return false;
164    }
165
166    @SuppressWarnings("unchecked")
167    protected LogLag getLagForPartition(Name name, int partition, Name group) {
168        ChronicleLogConfig config = getConfig(name);
169        long pos;
170        Path path = config.getBasePath().resolve(name.getId());
171        if (!ChronicleLogOffsetTracker.exists(path, group)) {
172            pos = 0;
173        } else {
174            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
175                    group, ChronicleRetentionDuration.disableOf(config.getRetention()))) {
176                pos = offsetTracker.readLastCommittedOffset();
177            }
178        }
179        try (ChronicleLogAppender<Externalizable> appender = ChronicleLogAppender.openWithoutRetention(config, name,
180                NO_CODEC)) {
181            // this trigger an acquire/release on cycle
182            long end = appender.endOffset(partition);
183            if (pos == 0) {
184                pos = appender.firstOffset(partition);
185            }
186            long lag = appender.countMessages(partition, pos, end);
187            long firstOffset = appender.firstOffset(partition);
188            long endMessages = appender.countMessages(partition, firstOffset, end);
189            return new LogLag(pos, end, lag, endMessages);
190        }
191    }
192
193    @Override
194    public List<LogLag> getLagPerPartition(Name name, Name group) {
195        int size = size(name);
196        List<LogLag> ret = new ArrayList<>(size);
197        for (int i = 0; i < size; i++) {
198            ret.add(getLagForPartition(name, i, group));
199        }
200        return ret;
201    }
202
203    @Override
204    public String toString() {
205        return "ChronicleLogManager{" + "configs=" + configs + ", defaultConfig=" + defaultConfig + '}';
206    }
207
208    @Override
209    public List<Name> listAllNames() {
210        Set<Name> names = new HashSet<>();
211        for (ChronicleLogConfig config : configs) {
212            try (Stream<Path> paths = Files.list(config.getBasePath())) {
213                paths.filter(Files::isDirectory)
214                        .map(Path::getFileName)
215                        .map(Path::toString)
216                     .forEach(name -> names.add(Name.ofId(name)));
217            } catch (IOException e) {
218                log.info("Nothing to list in CQ config: " + config);
219            } catch (IllegalArgumentException e) {
220                log.error("Invalid log name in " + config.getBasePath(), e);
221                throw e;
222            }
223        }
224        return new ArrayList<>(names);
225    }
226
227    @Override
228    public List<Name> listConsumerGroups(Name name) {
229        ChronicleLogConfig config = getConfig(name);
230        Path logRoot = config.getBasePath().resolve(name.getId());
231        if (!logRoot.toFile().exists()) {
232            throw new IllegalArgumentException("Unknown Log: " + name);
233        }
234        try (Stream<Path> paths = Files.list(logRoot)) {
235            return paths.filter(Files::isDirectory)
236                        .map(Path::getFileName)
237                        .map(Path::toString)
238                        .filter(ChronicleLogOffsetTracker::isOffsetTracker)
239                        .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
240                        .map(Name::ofId)
241                        .collect(Collectors.toList());
242        } catch (IOException e) {
243            throw new IllegalArgumentException("Cannot access Log: " + name, e);
244        }
245    }
246
247    @Override
248    public <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec) {
249        ChronicleLogConfig config = getConfig(name);
250        return ChronicleLogAppender.open(config, name, codec);
251    }
252
253    @Override
254    @SuppressWarnings("unchecked")
255    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, Name group,
256            Codec<M> codec) {
257        Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size());
258        partitions.forEach(partition -> pTailers.add(
259                (ChronicleLogTailer<M>) ((ChronicleLogAppender<M>) getAppender(partition.name(), codec)).createTailer(
260                        partition, group, codec)));
261        if (pTailers.size() == 1) {
262            return pTailers.iterator().next();
263        }
264        return new ChronicleCompoundLogTailer<>(pTailers, group);
265    }
266
267    @Override
268    protected <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names,
269            RebalanceListener listener, Codec<M> codec) {
270        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
271    }
272
273}