001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.apache.commons.io.FileUtils.deleteDirectory;
022
023import java.io.Externalizable;
024import java.io.IOException;
025import java.nio.file.Files;
026import java.nio.file.Path;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.List;
030import java.util.stream.Collectors;
031import java.util.stream.Stream;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.lib.stream.log.LogLag;
036import org.nuxeo.lib.stream.log.LogPartition;
037import org.nuxeo.lib.stream.log.LogTailer;
038import org.nuxeo.lib.stream.log.RebalanceListener;
039import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
040import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
041
042/**
043 * @since 9.3
044 */
045public class ChronicleLogManager extends AbstractLogManager {
046    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);
047
048    protected final Path basePath;
049
050    protected final ChronicleRetentionDuration retention;
051
052    public ChronicleLogManager(Path basePath) {
053        this(basePath, null);
054    }
055
056    /**
057     * Constructor
058     *
059     * @param basePath the base path.
060     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
061     *            retention duration expires, the older files are candidates for being purged. The property can be
062     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h'
063     *            in hours and 'd' in days)
064     */
065    public ChronicleLogManager(Path basePath, String retentionDuration) {
066        this.basePath = basePath;
067        this.retention = new ChronicleRetentionDuration(retentionDuration);
068    }
069
070    protected static void deleteQueueBasePath(Path basePath) {
071        try {
072            log.info("Removing Chronicle Queues directory: " + basePath);
073            // Performs a recursive delete if the directory contains only chronicles files
074            try (Stream<Path> paths = Files.list(basePath)) {
075                int count = (int) paths.filter(path -> (Files.isRegularFile(path) && !path.toString().endsWith(".cq4")))
076                                       .count();
077                if (count > 0) {
078                    String msg = "ChronicleLog basePath: " + basePath
079                            + " contains unknown files, please choose another basePath";
080                    log.error(msg);
081                    throw new IllegalArgumentException(msg);
082                }
083            }
084            deleteDirectory(basePath.toFile());
085        } catch (IOException e) {
086            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
087            log.error(msg, e);
088            throw new IllegalArgumentException(msg, e);
089        }
090    }
091
092    public String getBasePath() {
093        return basePath.toAbsolutePath().toString();
094    }
095
096    @Override
097    public boolean exists(String name) {
098        try {
099            return Files.list(basePath.resolve(name)).count() > 0;
100        } catch (IOException e) {
101            return false;
102        }
103    }
104
105    @Override
106    public void create(String name, int size) {
107        ChronicleLogAppender.create(basePath.resolve(name).toFile(), size, retention).close();
108    }
109
110    @Override
111    public boolean delete(String name) {
112        Path path = basePath.resolve(name);
113        if (Files.isDirectory(path)) {
114            deleteQueueBasePath(path);
115            return true;
116        }
117        return false;
118    }
119
120    protected LogLag getLagForPartition(String name, int partition, String group) {
121        long pos;
122        Path path = basePath.resolve(name);
123        ChronicleLogAppender appender = (ChronicleLogAppender) getAppender(name);
124        if (!ChronicleLogOffsetTracker.exists(path, group)) {
125            pos = 0;
126        } else {
127            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
128                    group)) {
129                pos = offsetTracker.readLastCommittedOffset();
130            }
131        }
132        // this trigger an acquire/release on cycle
133        long end = appender.endOffset(partition);
134        if (pos == 0) {
135            pos = appender.firstOffset(partition);
136        }
137        long lag = appender.countMessages(partition, pos, end);
138        long firstOffset = appender.firstOffset(partition);
139        long endMessages = appender.countMessages(partition, firstOffset, end);
140        return new LogLag(pos, end, lag, endMessages);
141    }
142
143    @Override
144    public List<LogLag> getLagPerPartition(String name, String group) {
145        int size = getAppender(name).size();
146        List<LogLag> ret = new ArrayList<>(size);
147        for (int i = 0; i < size; i++) {
148            ret.add(getLagForPartition(name, i, group));
149        }
150        return ret;
151    }
152
153    @Override
154    public String toString() {
155        return "ChronicleLogManager{" + "basePath=" + basePath + ", retention='" + retention + '\'' + '}';
156    }
157
158    @Override
159    public List<String> listAll() {
160        try {
161            return Files.list(basePath).filter(Files::isDirectory).map(Path::getFileName).map(Path::toString).collect(
162                    Collectors.toList());
163        } catch (IOException e) {
164            throw new IllegalArgumentException("Invalid base path: " + basePath, e);
165        }
166    }
167
168    @Override
169    public List<String> listConsumerGroups(String name) {
170        Path logRoot = basePath.resolve(name);
171        if (!Files.exists(logRoot)) {
172            throw new IllegalArgumentException("Unknown Log: " + name);
173        }
174        try {
175            return Files.list(logRoot)
176                        .filter(Files::isDirectory)
177                        .map(Path::getFileName)
178                        .map(Path::toString)
179                        .filter(ChronicleLogOffsetTracker::isOffsetTracker)
180                        .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
181                        .collect(Collectors.toList());
182        } catch (IOException e) {
183            throw new IllegalArgumentException("Cannot access Log: " + name, e);
184        }
185    }
186
187    @Override
188    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name) {
189        return ChronicleLogAppender.open(basePath.resolve(name).toFile(), retention);
190    }
191
192    @Override
193    @SuppressWarnings("unchecked")
194    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
195            String group) {
196        Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size());
197        partitions.forEach(partition -> pTailers.add(
198                (ChronicleLogTailer<M>) ((ChronicleLogAppender<M>) getAppender(partition.name())).createTailer(
199                        partition, group)));
200        if (pTailers.size() == 1) {
201            return pTailers.iterator().next();
202        }
203        return new ChronicleCompoundLogTailer<>(pTailers, group);
204    }
205
206    @Override
207    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
208            RebalanceListener listener) {
209        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
210
211    }
212
213}