001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.apache.commons.io.FileUtils.deleteDirectory;
022import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
023
024import java.io.Externalizable;
025import java.io.IOException;
026import java.nio.file.Files;
027import java.nio.file.Path;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.List;
031import java.util.stream.Collectors;
032import java.util.stream.Stream;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.lib.stream.codec.Codec;
037import org.nuxeo.lib.stream.log.LogLag;
038import org.nuxeo.lib.stream.log.LogPartition;
039import org.nuxeo.lib.stream.log.LogTailer;
040import org.nuxeo.lib.stream.log.RebalanceListener;
041import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
042import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
043
044/**
045 * @since 9.3
046 */
047public class ChronicleLogManager extends AbstractLogManager {
048    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);
049
050    protected final Path basePath;
051
052    protected final ChronicleRetentionDuration retention;
053
054    public ChronicleLogManager(Path basePath) {
055        this(basePath, null);
056    }
057
058    /**
059     * Constructor
060     *
061     * @param basePath the base path.
062     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
063     *            retention duration expires, the older files are candidates for being purged. The property can be
064     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h'
065     *            in hours and 'd' in days)
066     */
067    public ChronicleLogManager(Path basePath, String retentionDuration) {
068        this.basePath = basePath;
069        this.retention = new ChronicleRetentionDuration(retentionDuration);
070    }
071
072    protected static void deleteQueueBasePath(Path basePath) {
073        try {
074            log.info("Removing Chronicle Queues directory: " + basePath);
075            // Performs a recursive delete if the directory contains only chronicles files
076            try (Stream<Path> paths = Files.list(basePath)) {
077                int count = (int) paths.filter(path -> (path.toFile().isFile() && !path.toString().endsWith(".cq4")))
078                                       .count();
079                if (count > 0) {
080                    String msg = "ChronicleLog basePath: " + basePath
081                            + " contains unknown files, please choose another basePath";
082                    log.error(msg);
083                    throw new IllegalArgumentException(msg);
084                }
085            }
086            deleteDirectory(basePath.toFile());
087        } catch (IOException e) {
088            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
089            log.error(msg, e);
090            throw new IllegalArgumentException(msg, e);
091        }
092    }
093
094    public String getBasePath() {
095        return basePath.toAbsolutePath().toString();
096    }
097
098    @Override
099    public boolean exists(String name) {
100        try (Stream<Path> paths = Files.list(basePath.resolve(name))) {
101            return paths.count() > 0;
102        } catch (IOException e) {
103            return false;
104        }
105    }
106
107    @SuppressWarnings("unchecked")
108    @Override
109    public void create(String name, int size) {
110        ChronicleLogAppender.create(NO_CODEC, basePath.resolve(name).toFile(), size, retention).close();
111    }
112
113    @Override
114    protected int getSize(String name) {
115        return ChronicleLogAppender.partitions(basePath.resolve(name).toFile());
116    }
117
118    @Override
119    public boolean delete(String name) {
120        Path path = basePath.resolve(name);
121        if (path.toFile().isDirectory()) {
122            deleteQueueBasePath(path);
123            return true;
124        }
125        return false;
126    }
127
128    @SuppressWarnings("unchecked")
129    protected LogLag getLagForPartition(String name, int partition, String group) {
130        long pos;
131        Path path = basePath.resolve(name);
132        if (!ChronicleLogOffsetTracker.exists(path, group)) {
133            pos = 0;
134        } else {
135            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
136                    group)) {
137                pos = offsetTracker.readLastCommittedOffset();
138            }
139        }
140        try (ChronicleLogAppender<Externalizable> appender = ChronicleLogAppender.open(NO_CODEC,
141                basePath.resolve(name).toFile())) {
142            // this trigger an acquire/release on cycle
143            long end = appender.endOffset(partition);
144            if (pos == 0) {
145                pos = appender.firstOffset(partition);
146            }
147            long lag = appender.countMessages(partition, pos, end);
148            long firstOffset = appender.firstOffset(partition);
149            long endMessages = appender.countMessages(partition, firstOffset, end);
150            return new LogLag(pos, end, lag, endMessages);
151        }
152    }
153
154    @Override
155    public List<LogLag> getLagPerPartition(String name, String group) {
156        int size = size(name);
157        List<LogLag> ret = new ArrayList<>(size);
158        for (int i = 0; i < size; i++) {
159            ret.add(getLagForPartition(name, i, group));
160        }
161        return ret;
162    }
163
164    @Override
165    public String toString() {
166        return "ChronicleLogManager{" + "basePath=" + basePath + ", retention='" + retention + '\'' + '}';
167    }
168
169    @Override
170    public List<String> listAll() {
171        try (Stream<Path> paths = Files.list(basePath)) {
172            return paths.filter(Files::isDirectory).map(Path::getFileName).map(Path::toString).collect(
173                    Collectors.toList());
174        } catch (IOException e) {
175            throw new IllegalArgumentException("Invalid base path: " + basePath, e);
176        }
177    }
178
179    @Override
180    public List<String> listConsumerGroups(String name) {
181        Path logRoot = basePath.resolve(name);
182        if (!logRoot.toFile().exists()) {
183            throw new IllegalArgumentException("Unknown Log: " + name);
184        }
185        try (Stream<Path> paths = Files.list(logRoot)) {
186            return paths.filter(Files::isDirectory)
187                        .map(Path::getFileName)
188                        .map(Path::toString)
189                        .filter(ChronicleLogOffsetTracker::isOffsetTracker)
190                        .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
191                        .collect(Collectors.toList());
192        } catch (IOException e) {
193            throw new IllegalArgumentException("Cannot access Log: " + name, e);
194        }
195    }
196
197    @Override
198    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec) {
199        return ChronicleLogAppender.open(codec, basePath.resolve(name).toFile(), retention);
200    }
201
202    @Override
203    @SuppressWarnings("unchecked")
204    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, String group,
205            Codec<M> codec) {
206        Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size());
207        partitions.forEach(partition -> pTailers.add(
208                (ChronicleLogTailer<M>) ((ChronicleLogAppender<M>) getAppender(partition.name(), codec)).createTailer(
209                        partition, group, codec)));
210        if (pTailers.size() == 1) {
211            return pTailers.iterator().next();
212        }
213        return new ChronicleCompoundLogTailer<>(pTailers, group);
214    }
215
216    @Override
217    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
218            RebalanceListener listener, Codec<M> codec) {
219        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
220
221    }
222
223}