001/*
002 * (C) Copyright 2019 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.time.Duration;
022import java.util.EnumSet;
023import java.util.Map;
024
025import org.apache.commons.lang3.StringUtils;
026import org.apache.logging.log4j.LogManager;
027import org.apache.logging.log4j.Logger;
028import org.nuxeo.common.utils.DurationUtils;
029import org.nuxeo.lib.stream.computation.Record;
030import org.nuxeo.lib.stream.computation.RecordFilter;
031import org.nuxeo.lib.stream.log.LogOffset;
032
/**
 * Base class for filters that save the value of a large record in an alternate storage. The record is then marked
 * with an internal flag and carries an empty value.
 *
 * @since 11.1
 */
039public abstract class BaseOverflowRecordFilter implements RecordFilter {
040    private static final Logger log = LogManager.getLogger(BaseOverflowRecordFilter.class);
041
042    public static final String STORE_NAME_OPTION = "storeName";
043
044    public static final String DEFAULT_STORE_NAME = "default";
045
046    public static final String STORE_TTL_OPTION = "storeTTL";
047
048    public static final String DEFAULT_STORE_TTL = "1h";
049
050    public static final String THRESHOLD_SIZE_OPTION = "thresholdSize";
051
052    public static final int DEFAULT_THRESHOLD_SIZE = 1_000_000;
053
054    public static final String PREFIX_OPTION = "prefix";
055
056    public static final String DEFAULT_PREFIX = "bigRecord:";
057
058    protected String prefix;
059
060    protected int thresholdSize;
061
062    protected Duration storeTTL;
063
064    protected String storeName;
065
066    /**
067     * Sets the value associated to the key.
068     */
069    protected abstract void storeValue(String key, byte[] data);
070
071    /**
072     * Fetches a value previously stored by {@link #storeValue(String, byte[])}
073     *
074     * @return the value, or {@code null} if there is no value
075     */
076    protected abstract byte[] fetchValue(String key);
077
078    @Override
079    public void init(Map<String, String> options) {
080        storeName = options.getOrDefault(STORE_NAME_OPTION, DEFAULT_STORE_NAME);
081        prefix = options.getOrDefault(PREFIX_OPTION, DEFAULT_PREFIX);
082        thresholdSize = parseIntOrDefault(options.get(THRESHOLD_SIZE_OPTION), DEFAULT_THRESHOLD_SIZE);
083        storeTTL = DurationUtils.parse(options.getOrDefault(STORE_TTL_OPTION, DEFAULT_STORE_TTL));
084    }
085
086    protected int parseIntOrDefault(String valueAsString, int defaultValue) {
087        if (StringUtils.isEmpty(valueAsString)) {
088            return defaultValue;
089        }
090        try {
091            return Integer.parseInt(valueAsString);
092        } catch (NumberFormatException e) {
093            log.error("Invalid number for RecordFilter option: " + valueAsString, e);
094            return defaultValue;
095        }
096    }
097
098    @Override
099    public Record beforeAppend(Record record) {
100        if (record.getData().length <= getThresholdSize()) {
101            return record;
102        }
103        if (log.isDebugEnabled()) {
104            log.debug(String.format("Record: %s overflow value of size: %d", record.getKey(), record.getData().length));
105        }
106        EnumSet<Record.Flag> flags = EnumSet.copyOf(record.getFlags());
107        flags.add(Record.Flag.EXTERNAL_VALUE);
108        storeValue(getUniqRecordKey(record), record.getData());
109        return new Record(record.getKey(), null, record.getWatermark(), flags);
110    }
111
112    protected String getUniqRecordKey(Record record) {
113        // this is needed to support different records using an identical key
114        return String.format("%s:%d", record.getKey(), record.getWatermark());
115    }
116
117    @Override
118    public Record afterRead(Record record, LogOffset offset) {
119        if (record.getFlags().contains(Record.Flag.EXTERNAL_VALUE) && record.getData().length == 0) {
120            byte[] value = fetchValue(getUniqRecordKey(record));
121            if (log.isDebugEnabled()) {
122                log.debug(String.format("Record: %s retrieve value of size: %d", record.getKey(),
123                        record.getData().length));
124            }
125            if (value == null || value.length == 0) {
126                log.error(String.format("Record %s offset %s value not found, the record is lost, skipping",
127                        record.toString(), offset));
128                return null;
129            }
130            EnumSet<Record.Flag> flags = record.getFlags();
131            flags.remove(Record.Flag.EXTERNAL_VALUE);
132            return new Record(record.getKey(), value, record.getWatermark(), flags);
133        }
134        return record;
135    }
136
137    public int getThresholdSize() {
138        return thresholdSize;
139    }
140
141    public void setThresholdSize(int thresholdSize) {
142        this.thresholdSize = thresholdSize;
143    }
144
145    public String getStoreName() {
146        return storeName;
147    }
148
149    public void setStoreName(String storeName) {
150        this.storeName = storeName;
151    }
152
153    public Duration getStoreTTL() {
154        return storeTTL;
155    }
156
157    public void setStoreTTL(Duration storeTTL) {
158        this.storeTTL = storeTTL;
159    }
160
161    public String getPrefix() {
162        return prefix;
163    }
164
165    public void setPrefix(String prefix) {
166        this.prefix = prefix;
167    }
168
169    protected String getPrefixedKey(String recordKey) {
170        return getPrefix() + recordKey;
171    }
172}