001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.AbstractMQManager;
028
029import java.io.Externalizable;
030import java.io.File;
031import java.io.IOException;
032import java.nio.file.Files;
033import java.nio.file.Path;
034import java.util.ArrayList;
035import java.util.Collection;
036import java.util.stream.Stream;
037
038import static org.apache.commons.io.FileUtils.deleteDirectory;
039
040/**
041 * @since 9.2
042 */
043public class ChronicleMQManager<M extends Externalizable> extends AbstractMQManager<M> {
044    private static final Log log = LogFactory.getLog(ChronicleMQManager.class);
045
046    /**
047     * Default retention duration for queue files
048     */
049    public static final String DEFAULT_RETENTION_DURATION = "4d";
050
051    private final Path basePath;
052
053    private final String retentionDuration;
054
055    public ChronicleMQManager(Path basePath) {
056        this.basePath = basePath;
057        this.retentionDuration = DEFAULT_RETENTION_DURATION;
058    }
059
060    /**
061     * Constructor
062     *
063     * @param basePath the base path.
064     * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the
065     *            retention duration expires, the older files are candidates for being purged. The property can be
066     *            expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h'
067     *            in hours and 'd' in days)
068     */
069    public ChronicleMQManager(Path basePath, String retentionDuration) {
070        this.basePath = basePath;
071        this.retentionDuration = retentionDuration == null ? DEFAULT_RETENTION_DURATION : retentionDuration;
072    }
073
074    public String getBasePath() {
075        return basePath.toAbsolutePath().toString();
076    }
077
078    @Override
079    public boolean exists(String name) {
080        File path = new File(basePath.toFile(), name);
081        return path.isDirectory() && path.list().length > 0;
082    }
083
084    @Override
085    public void create(String name, int size) {
086        ChronicleMQAppender<M> cq = ChronicleMQAppender.create(new File(basePath.toFile(), name), size, retentionDuration);
087        try {
088            cq.close();
089        } catch (Exception e) {
090            throw new RuntimeException("Can not create and close " + name, e);
091        }
092    }
093
094    @Override
095    public boolean delete(String name) {
096        File path = new File(basePath.toFile(), name);
097        if (path.isDirectory()) {
098            deleteQueueBasePath(path);
099            return true;
100        }
101        return false;
102    }
103
104
105    @Override
106    public MQAppender<M> createAppender(String name) {
107        return ChronicleMQAppender.open(new File(basePath.toFile(), name), retentionDuration);
108    }
109
110    @Override
111    protected MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group) {
112        Collection<ChronicleMQTailer<M>> pTailers = new ArrayList<>(partitions.size());
113        partitions.forEach(partition -> pTailers.add((ChronicleMQTailer<M>) ((ChronicleMQAppender<M>) getAppender(partition.name())).createTailer(partition, group)));
114        if (pTailers.size() == 1) {
115            return pTailers.iterator().next();
116        }
117        return new ChronicleCompoundMQTailer<>(pTailers, group);
118    }
119
120    @Override
121    protected MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener) {
122        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
123
124    }
125
126    private static void deleteQueueBasePath(File basePath) {
127        try {
128            log.info("Removing Chronicle Queues directory: " + basePath);
129            // Performs a recursive delete if the directory contains only chronicles files
130            try (Stream<Path> paths = Files.list(basePath.toPath())) {
131                int count = (int) paths.filter(path -> (Files.isRegularFile(path) && !path.toString().endsWith(".cq4"))).count();
132                if (count > 0) {
133                    String msg = "ChronicleMQueue basePath: " + basePath + " contains unknown files, please choose another basePath";
134                    log.error(msg);
135                    throw new IllegalArgumentException(msg);
136                }
137            }
138            deleteDirectory(basePath);
139        } catch (IOException e) {
140            String msg = "Can not remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
141            log.error(msg, e);
142            throw new IllegalArgumentException(msg, e);
143        }
144    }
145
146}