001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.log.chronicle;
020
021import static org.apache.commons.io.FileUtils.deleteDirectory;
022import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
023import static org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender.METADATA_FILE;
024
025import java.io.Externalizable;
026import java.io.IOException;
027import java.nio.file.Files;
028import java.nio.file.Path;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.List;
032import java.util.stream.Collectors;
033import java.util.stream.Stream;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.lib.stream.codec.Codec;
038import org.nuxeo.lib.stream.log.LogLag;
039import org.nuxeo.lib.stream.log.LogPartition;
040import org.nuxeo.lib.stream.log.LogTailer;
041import org.nuxeo.lib.stream.log.RebalanceListener;
042import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
043import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
044
045/**
046 * @since 9.3
047 */
048public class ChronicleLogManager extends AbstractLogManager {
049    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);
050
051    protected final Path basePath;
052
053    protected final ChronicleRetentionDuration retention;
054
055    public ChronicleLogManager(Path basePath) {
056        this(basePath, null);
057    }
058
059    /**
060     * Constructor
061     *
062     * @param basePath the base path.
063     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
064     *            retention duration expires, the older files are candidates for being purged. The property can be
065     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h'
066     *            in hours and 'd' in days)
067     */
068    public ChronicleLogManager(Path basePath, String retentionDuration) {
069        this.basePath = basePath;
070        this.retention = new ChronicleRetentionDuration(retentionDuration);
071    }
072
073    protected static void deleteQueueBasePath(Path basePath) {
074        try {
075            log.info("Removing Chronicle Queues directory: " + basePath);
076            // Performs a recursive delete if the directory contains only chronicles files
077            try (Stream<Path> paths = Files.list(basePath)) {
078                int count = (int) paths.filter(path -> (path.toFile().isFile() && !isChronicleLogFile(path))).count();
079                if (count > 0) {
080                    String msg = "ChronicleLog basePath: " + basePath
081                            + " contains unknown files, please choose another basePath";
082                    log.error(msg);
083                    throw new IllegalArgumentException(msg);
084                }
085            }
086            deleteDirectory(basePath.toFile());
087        } catch (IOException e) {
088            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
089            log.error(msg, e);
090            throw new IllegalArgumentException(msg, e);
091        }
092    }
093
094    protected static boolean isChronicleLogFile(Path path) {
095        String filename = path.getFileName().toString();
096        return filename.endsWith(".cq4") || filename.endsWith(".cq4t") || METADATA_FILE.equals(filename);
097    }
098
099    public String getBasePath() {
100        return basePath.toAbsolutePath().toString();
101    }
102
103    @Override
104    public boolean exists(String name) {
105        try (Stream<Path> paths = Files.list(basePath.resolve(name))) {
106            return paths.count() > 0;
107        } catch (IOException e) {
108            return false;
109        }
110    }
111
112    @SuppressWarnings("unchecked")
113    @Override
114    public void create(String name, int size) {
115        ChronicleLogAppender.create(NO_CODEC, basePath.resolve(name).toFile(), size, retention).close();
116    }
117
118    @Override
119    protected int getSize(String name) {
120        return ChronicleLogAppender.partitions(basePath.resolve(name));
121    }
122
123    @Override
124    public boolean delete(String name) {
125        Path path = basePath.resolve(name);
126        if (path.toFile().isDirectory()) {
127            deleteQueueBasePath(path);
128            return true;
129        }
130        return false;
131    }
132
133    @SuppressWarnings("unchecked")
134    protected LogLag getLagForPartition(String name, int partition, String group) {
135        long pos;
136        Path path = basePath.resolve(name);
137        if (!ChronicleLogOffsetTracker.exists(path, group)) {
138            pos = 0;
139        } else {
140            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
141                    group, ChronicleRetentionDuration.disableOf(retention))) {
142                pos = offsetTracker.readLastCommittedOffset();
143            }
144        }
145        try (ChronicleLogAppender<Externalizable> appender = ChronicleLogAppender.open(NO_CODEC,
146                basePath.resolve(name).toFile())) {
147            // this trigger an acquire/release on cycle
148            long end = appender.endOffset(partition);
149            if (pos == 0) {
150                pos = appender.firstOffset(partition);
151            }
152            long lag = appender.countMessages(partition, pos, end);
153            long firstOffset = appender.firstOffset(partition);
154            long endMessages = appender.countMessages(partition, firstOffset, end);
155            return new LogLag(pos, end, lag, endMessages);
156        }
157    }
158
159    @Override
160    public List<LogLag> getLagPerPartition(String name, String group) {
161        int size = size(name);
162        List<LogLag> ret = new ArrayList<>(size);
163        for (int i = 0; i < size; i++) {
164            ret.add(getLagForPartition(name, i, group));
165        }
166        return ret;
167    }
168
169    @Override
170    public String toString() {
171        return "ChronicleLogManager{" + "basePath=" + basePath + ", retention='" + retention + '\'' + '}';
172    }
173
174    @Override
175    public List<String> listAll() {
176        try (Stream<Path> paths = Files.list(basePath)) {
177            return paths.filter(Files::isDirectory)
178                        .map(Path::getFileName)
179                        .map(Path::toString)
180                        .collect(Collectors.toList());
181        } catch (IOException e) {
182            throw new IllegalArgumentException("Invalid base path: " + basePath, e);
183        }
184    }
185
186    @Override
187    public List<String> listConsumerGroups(String name) {
188        Path logRoot = basePath.resolve(name);
189        if (!logRoot.toFile().exists()) {
190            throw new IllegalArgumentException("Unknown Log: " + name);
191        }
192        try (Stream<Path> paths = Files.list(logRoot)) {
193            return paths.filter(Files::isDirectory)
194                        .map(Path::getFileName)
195                        .map(Path::toString)
196                        .filter(ChronicleLogOffsetTracker::isOffsetTracker)
197                        .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
198                        .collect(Collectors.toList());
199        } catch (IOException e) {
200            throw new IllegalArgumentException("Cannot access Log: " + name, e);
201        }
202    }
203
204    @Override
205    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name, Codec<M> codec) {
206        return ChronicleLogAppender.open(codec, basePath.resolve(name).toFile(), retention);
207    }
208
209    @Override
210    @SuppressWarnings("unchecked")
211    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, String group,
212            Codec<M> codec) {
213        Collection<ChronicleLogTailer<M>> pTailers = new ArrayList<>(partitions.size());
214        partitions.forEach(partition -> pTailers.add(
215                (ChronicleLogTailer<M>) ((ChronicleLogAppender<M>) getAppender(partition.name(), codec)).createTailer(
216                        partition, group, codec)));
217        if (pTailers.size() == 1) {
218            return pTailers.iterator().next();
219        }
220        return new ChronicleCompoundLogTailer<>(pTailers, group);
221    }
222
223    @Override
224    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names,
225            RebalanceListener listener, Codec<M> codec) {
226        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
227
228    }
229
230}