/*
 * (C) Copyright 2019 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.core.work;

import java.time.Duration;
import java.util.EnumSet;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.common.utils.DurationUtils;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.RecordFilter;
import org.nuxeo.lib.stream.log.LogOffset;

/**
 * Base class for record filters that move large record values to an alternate storage.
 * <p>
 * Before append, a record whose value is larger than the threshold size has its value saved through
 * {@link #storeValue(String, byte[])}. It is then replaced by a record carrying the
 * {@link Record.Flag#EXTERNAL_VALUE} flag and no value. After read, such a record has its value fetched back through
 * {@link #fetchValue(String)} and the flag removed.
 * <p>
 * Supported options: {@value #STORE_NAME_OPTION}, {@value #STORE_TTL_OPTION}, {@value #THRESHOLD_SIZE_OPTION} and
 * {@value #PREFIX_OPTION}.
 *
 * @since 11.1
 */
public abstract class BaseOverflowRecordFilter implements RecordFilter {
    private static final Logger log = LogManager.getLogger(BaseOverflowRecordFilter.class);

    public static final String STORE_NAME_OPTION = "storeName";

    public static final String DEFAULT_STORE_NAME = "default";

    public static final String STORE_TTL_OPTION = "storeTTL";

    public static final String DEFAULT_STORE_TTL = "1h";

    public static final String THRESHOLD_SIZE_OPTION = "thresholdSize";

    public static final int DEFAULT_THRESHOLD_SIZE = 1_000_000;

    public static final String PREFIX_OPTION = "prefix";

    public static final String DEFAULT_PREFIX = "bigRecord:";

    /** Key prefix, applied by {@link #getPrefixedKey(String)}. */
    protected String prefix;

    /** Values whose length in bytes is strictly greater than this are stored externally. */
    protected int thresholdSize;

    /**
     * Time to live configured for stored values. It is not used by this base class; presumably implementations apply it
     * in {@link #storeValue(String, byte[])}.
     */
    protected Duration storeTTL;

    /** Name of the storage configured for this filter; not used by this base class itself. */
    protected String storeName;

    /**
     * Sets the value associated to the key.
     *
     * @param key the unique record key, as computed by {@link #getUniqRecordKey(Record)}
     * @param data the record value to save
     */
    protected abstract void storeValue(String key, byte[] data);

    /**
     * Fetches a value previously stored by {@link #storeValue(String, byte[])}.
     *
     * @param key the unique record key, as computed by {@link #getUniqRecordKey(Record)}
     * @return the value, or {@code null} if there is no value
     */
    protected abstract byte[] fetchValue(String key);

    /**
     * Initializes the filter from its options, falling back to the {@code DEFAULT_*} constants for missing options.
     *
     * @param options the filter options
     */
    @Override
    public void init(Map<String, String> options) {
        storeName = options.getOrDefault(STORE_NAME_OPTION, DEFAULT_STORE_NAME);
        prefix = options.getOrDefault(PREFIX_OPTION, DEFAULT_PREFIX);
        thresholdSize = parseIntOrDefault(options.get(THRESHOLD_SIZE_OPTION), DEFAULT_THRESHOLD_SIZE);
        storeTTL = DurationUtils.parse(options.getOrDefault(STORE_TTL_OPTION, DEFAULT_STORE_TTL));
    }

    /**
     * Parses an integer option value.
     *
     * @param valueAsString the raw option value, may be {@code null}; surrounding whitespace is ignored
     * @param defaultValue the value returned when the option is missing, blank or not a valid integer
     * @return the parsed value, or {@code defaultValue}
     */
    protected int parseIntOrDefault(String valueAsString, int defaultValue) {
        if (StringUtils.isBlank(valueAsString)) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(valueAsString.trim());
        } catch (NumberFormatException e) {
            log.error("Invalid number for RecordFilter option: {}", valueAsString, e);
            return defaultValue;
        }
    }

    /**
     * Moves the value of a record exceeding the threshold size to the external storage.
     *
     * @param record the record about to be appended
     * @return the record unchanged if its value is {@code null} or within the threshold, otherwise a new record with
     *         the same key and watermark, no value, and the {@link Record.Flag#EXTERNAL_VALUE} flag added
     */
    @Override
    public Record beforeAppend(Record record) {
        byte[] data = record.getData();
        if (data == null || data.length <= getThresholdSize()) {
            return record;
        }
        log.debug("Record: {} overflow value of size: {}", record.getKey(), data.length);
        // copy the flags so the caller's record is left untouched
        EnumSet<Record.Flag> flags = EnumSet.copyOf(record.getFlags());
        flags.add(Record.Flag.EXTERNAL_VALUE);
        storeValue(getUniqRecordKey(record), data);
        return new Record(record.getKey(), null, record.getWatermark(), flags);
    }

    /**
     * Builds the key under which a record value is stored.
     * <p>
     * The watermark is appended because different records can share an identical key.
     *
     * @param record the record
     * @return {@code <record key>:<watermark>}
     */
    protected String getUniqRecordKey(Record record) {
        // this is needed to support different records using an identical key
        return String.format("%s:%d", record.getKey(), record.getWatermark());
    }

    /**
     * Restores the value of a record whose value was moved to the external storage by
     * {@link #beforeAppend(Record)}.
     *
     * @param record the record read from the log
     * @param offset the offset of the record, used for error reporting
     * @return the record unchanged if it does not reference an external value; a new record with the fetched value and
     *         without the {@link Record.Flag#EXTERNAL_VALUE} flag otherwise; or {@code null} (record skipped) when the
     *         external value cannot be found
     */
    @Override
    public Record afterRead(Record record, LogOffset offset) {
        byte[] data = record.getData();
        boolean hasNoValue = data == null || data.length == 0;
        if (!record.getFlags().contains(Record.Flag.EXTERNAL_VALUE) || !hasNoValue) {
            return record;
        }
        byte[] value = fetchValue(getUniqRecordKey(record));
        if (value == null || value.length == 0) {
            log.error("Record {} offset {} value not found, the record is lost, skipping", record, offset);
            return null;
        }
        log.debug("Record: {} retrieve value of size: {}", record.getKey(), value.length);
        // copy the flags instead of mutating the flags of the record we were given
        EnumSet<Record.Flag> flags = EnumSet.copyOf(record.getFlags());
        flags.remove(Record.Flag.EXTERNAL_VALUE);
        return new Record(record.getKey(), value, record.getWatermark(), flags);
    }

    /** @return the size in bytes above which record values are stored externally */
    public int getThresholdSize() {
        return thresholdSize;
    }

    /** @param thresholdSize the size in bytes above which record values are stored externally */
    public void setThresholdSize(int thresholdSize) {
        this.thresholdSize = thresholdSize;
    }

    /** @return the configured store name */
    public String getStoreName() {
        return storeName;
    }

    /** @param storeName the store name */
    public void setStoreName(String storeName) {
        this.storeName = storeName;
    }

    /** @return the configured time to live of stored values */
    public Duration getStoreTTL() {
        return storeTTL;
    }

    /** @param storeTTL the time to live of stored values */
    public void setStoreTTL(Duration storeTTL) {
        this.storeTTL = storeTTL;
    }

    /** @return the key prefix */
    public String getPrefix() {
        return prefix;
    }

    /** @param prefix the key prefix */
    public void setPrefix(String prefix) {
        this.prefix = prefix;
    }

    /**
     * Prepends the configured prefix to a key.
     *
     * @param recordKey the key to prefix
     * @return {@link #getPrefix()} followed by {@code recordKey}
     */
    protected String getPrefixedKey(String recordKey) {
        return getPrefix() + recordKey;
    }
}