/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
019package org.nuxeo.lib.stream.log;
020
021import static java.lang.Math.max;
022import static java.lang.Math.min;
023
024import java.util.List;
025
026/**
027 * Represent the number of messages between 2 offsets
028 *
029 * @since 9.3
030 */
031public class LogLag {
032    protected final long lowerOffset;
033
034    protected final long upperOffset;
035
036    protected final long lag;
037
038    protected final long upper;
039
040    public LogLag(long lowerOffset, long upperOffset) {
041        this(lowerOffset, upperOffset, upperOffset - lowerOffset, upperOffset);
042    }
043
044    public LogLag(long lowerOffset, long upperOffset, long lag, long upper) {
045        this.lowerOffset = lowerOffset;
046        this.upperOffset = upperOffset;
047        this.upper = upper;
048        this.lag = lag;
049    }
050
051    public static LogLag of(long lowerOffset, long upperOffset) {
052        return new LogLag(lowerOffset, upperOffset);
053    }
054
055    public static LogLag of(long lag) {
056        return new LogLag(0, lag, lag, lag);
057    }
058
059    public static LogLag of(List<LogLag> lags) {
060        final long[] end = { 0 };
061        final long[] pos = { Long.MAX_VALUE };
062        final long[] lag = { 0 };
063        final long[] endMessages = { 0 };
064        lags.forEach(item -> {
065            if (item.lowerOffset() > 0) {
066                pos[0] = min(pos[0], item.lowerOffset());
067            }
068            end[0] = max(end[0], item.upperOffset());
069            endMessages[0] += item.upper();
070            lag[0] += item.lag();
071        });
072        return new LogLag(pos[0] == Long.MAX_VALUE ? 0 : pos[0], end[0], lag[0], endMessages[0]);
073    }
074
075    /**
076     * Returns the number of messages between lower and upper offsets.
077     */
078    public long lag() {
079        return lag;
080    }
081
082    /**
083     * Convert the upperOffset into a number of messages.
084     */
085    public long upper() {
086        return upper;
087    }
088
089    /**
090     * Convert the lowerOffset into a number of messages.
091     */
092    public long lower() {
093        return upper - lag;
094    }
095
096    public long upperOffset() {
097        return upperOffset;
098    }
099
100    public long lowerOffset() {
101        return lowerOffset;
102    }
103
104    @Override
105    public String toString() {
106        return "LogLag{" + "lower=" + lower() + ", upper=" + upper + ", lag=" + lag + ", lowerOffset=" + lowerOffset
107                + ", upperOffset=" + upperOffset + '}';
108    }
109
110    @Override
111    public boolean equals(Object o) {
112        if (this == o)
113            return true;
114        if (o == null || getClass() != o.getClass())
115            return false;
116
117        LogLag lag1 = (LogLag) o;
118
119        return lag == lag1.lag;
120    }
121
122    @Override
123    public int hashCode() {
124        return (int) (lag ^ (lag >>> 32));
125    }
126
127}