001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.mqueues;
020
021import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
022
023import java.time.Duration;
024
025/**
026 * A MQueues (for Multiple Queues) is a set of unbounded persisted queues.
027 *
028 * Producers can dispatch {@link Message} on different queues.
029 *
030 * Consumer read {@link Message} using a {@link Tailer}, the position of the tailer can be persisted.
031 *
032 * @since 9.1
033 */
034public interface MQueues<M extends Message> extends AutoCloseable {
035
036    /**
037     * Returns the size of the mqueues: the number of queues.
038     *
039     */
040    int size();
041
042    /**
043     * Append a message into a queue, returns an {@link Offset}.
044     *
045     * This method is thread safe, a queue can be shared by multiple producers.
046     *
047     * @param queue index lower than {@link #size()}
048     */
049    Offset append(int queue, M message);
050
051    /**
052     * Create a new {@link Tailer} associed with the queue index.
053     *
054     * The committed offset is presisted in the default namespace.
055     *
056     * There can be one and only one consumer for queue in a namespace.
057     *
058     * A tailer is not thread safe.
059     *
060     */
061    Tailer<M> createTailer(int queue);
062
063    /**
064     * Create a new {@link Tailer} associed to a queue index, using a specified offset name space.
065     *
066     * The committed offset position is shared by all tailers of the same queue with the same name.
067     *
068     * There can be one and only one consumer for queue in a namespace
069     *
070     * A tailer is not thread safe.
071     *
072     */
073    Tailer<M> createTailer(int queue, String name);
074
075    /**
076     * Wait for consumer to process a message up to the offset.
077     *
078     * The message is processed if a consumer commit its offset (or a bigger one) in the default name space.
079     *
080     * Return true if the message has been consumed, false in case of timeout.
081     */
082    boolean waitFor(Offset offset, Duration timeout) throws InterruptedException;
083
084    /**
085     * Sequential reader for a queue.
086     *
087     * A tailer is not thread safe and should not be shared by multiple threads.
088     *
089     */
090    interface Tailer<M> extends AutoCloseable {
091
092        /**
093         * Read a message from the queue within the timeout.
094         *
095         * @return null if there is no message in the queue after the timeout.
096         */
097        M read(Duration timeout) throws InterruptedException;
098
099        /**
100         * Go to the end of the queue.
101         */
102        void toEnd();
103
104        /**
105         * Go to the beginning of the queue.
106         */
107        void toStart();
108
109        /**
110         * Go just after the last committed message.
111         */
112        void toLastCommitted();
113
114        /**
115         * Commit the offset of the last message returned by read.
116         */
117        Offset commit();
118
119        /**
120         * Returns the associated queue index.
121         */
122        int getQueue();
123    }
124
125}