001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender; 024import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition; 025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener; 026import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer; 027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.AbstractMQManager; 028 029import java.io.Externalizable; 030import java.io.File; 031import java.io.IOException; 032import java.nio.file.Files; 033import java.nio.file.Path; 034import java.util.ArrayList; 035import java.util.Collection; 036import java.util.stream.Stream; 037 038import static org.apache.commons.io.FileUtils.deleteDirectory; 039 040/** 041 * @since 9.2 042 */ 043public class ChronicleMQManager<M extends Externalizable> extends AbstractMQManager<M> { 044 private static final Log log = LogFactory.getLog(ChronicleMQManager.class); 045 046 /** 047 * Default retention duration for queue files 048 */ 049 public static final String DEFAULT_RETENTION_DURATION = "4d"; 050 051 private final Path basePath; 052 053 private final String retentionDuration; 054 055 public ChronicleMQManager(Path basePath) { 056 this.basePath = basePath; 057 this.retentionDuration = DEFAULT_RETENTION_DURATION; 058 } 059 060 /** 061 * Constructor 062 * 063 * @param basePath the base path. 064 * @param retentionDuration the retention duration. It is the time period the queue files will be retained. Once the 065 * retention duration expires, the older files are candidates for being purged. The property can be 066 * expressed as: 15s, 30m, 1h, 4d ... (where 's' is expressing a duration in seconds, 'm' in minutes,'h' 067 * in hours and 'd' in days) 068 */ 069 public ChronicleMQManager(Path basePath, String retentionDuration) { 070 this.basePath = basePath; 071 this.retentionDuration = retentionDuration == null ? DEFAULT_RETENTION_DURATION : retentionDuration; 072 } 073 074 public String getBasePath() { 075 return basePath.toAbsolutePath().toString(); 076 } 077 078 @Override 079 public boolean exists(String name) { 080 File path = new File(basePath.toFile(), name); 081 return path.isDirectory() && path.list().length > 0; 082 } 083 084 @Override 085 public void create(String name, int size) { 086 ChronicleMQAppender<M> cq = ChronicleMQAppender.create(new File(basePath.toFile(), name), size, retentionDuration); 087 try { 088 cq.close(); 089 } catch (Exception e) { 090 throw new RuntimeException("Can not create and close " + name, e); 091 } 092 } 093 094 @Override 095 public boolean delete(String name) { 096 File path = new File(basePath.toFile(), name); 097 if (path.isDirectory()) { 098 deleteQueueBasePath(path); 099 return true; 100 } 101 return false; 102 } 103 104 105 @Override 106 public MQAppender<M> createAppender(String name) { 107 return ChronicleMQAppender.open(new File(basePath.toFile(), name), retentionDuration); 108 } 109 110 @Override 111 protected MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group) { 112 Collection<ChronicleMQTailer<M>> pTailers = new ArrayList<>(partitions.size()); 113 partitions.forEach(partition -> pTailers.add((ChronicleMQTailer<M>) ((ChronicleMQAppender<M>) getAppender(partition.name())).createTailer(partition, group))); 114 if (pTailers.size() == 1) { 115 return pTailers.iterator().next(); 116 } 117 return new ChronicleCompoundMQTailer<>(pTailers, group); 118 } 119 120 @Override 121 protected MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener) { 122 throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation"); 123 124 } 125 126 private static void deleteQueueBasePath(File basePath) { 127 try { 128 log.info("Removing Chronicle Queues directory: " + basePath); 129 // Performs a recursive delete if the directory contains only chronicles files 130 try (Stream<Path> paths = Files.list(basePath.toPath())) { 131 int count = (int) paths.filter(path -> (Files.isRegularFile(path) && !path.toString().endsWith(".cq4"))).count(); 132 if (count > 0) { 133 String msg = "ChronicleMQueue basePath: " + basePath + " contains unknown files, please choose another basePath"; 134 log.error(msg); 135 throw new IllegalArgumentException(msg); 136 } 137 } 138 deleteDirectory(basePath); 139 } catch (IOException e) { 140 String msg = "Can not remove Chronicle Queues directory: " + basePath + " " + e.getMessage(); 141 log.error(msg, e); 142 throw new IllegalArgumentException(msg, e); 143 } 144 } 145 146}