001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.api;
020
021import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
022
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.UUID;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.function.Function;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034/**
035 * A low level holder of DB cursors that manages cleaning on timeout.
036 *
037 * @param <C> The cursor type.
038 * @param <O> The cursor item type.
039 * @param <R> The result type.
040 * @since 9.1
041 */
042public class CursorService<C, O, R> {
043
044    private static final Log log = LogFactory.getLog(CursorService.class);
045
046    protected final Map<String, CursorResult<C, O>> cursorResults = new ConcurrentHashMap<>();
047
048    protected final Function<O, R> extractor;
049
050    public CursorService(Function<O, R> extractor) {
051        this.extractor = extractor;
052    }
053
054    public void checkForTimedOutScroll() {
055        cursorResults.forEach(this::isScrollTimedOut);
056    }
057
058    protected boolean isScrollTimedOut(String scrollId, CursorResult<C, O> cursorResult) {
059        if (cursorResult.timedOut()) {
060            if (unregisterCursor(scrollId)) {
061                log.warn("Scroll '" + scrollId + "' timed out");
062            }
063            return true;
064        }
065        return false;
066    }
067
068    /**
069     * Registers the input {@link C} and generates a new <code>scrollId</code> to associate with.
070     *
071     * @return the scrollId associated to the cursor.
072     */
073    @SuppressWarnings("resource") // CursorResult is being registered, must not be closed
074    public String registerCursor(C cursor, int batchSize, int keepAliveSeconds) {
075        return registerCursorResult(new CursorResult<>(cursor, batchSize, keepAliveSeconds));
076    }
077
078    /**
079     * Registers the input {@link C} associated to the input <code>scrollId</code>.
080     *
081     * @return the scrollId associated to the cursor.
082     */
083    @SuppressWarnings("resource") // CursorResult is being registered, must not be closed
084    public String registerCursor(String scrollId, C cursor, int batchSize, int keepAliveSeconds) {
085        return registerCursorResult(scrollId, new CursorResult<>(cursor, batchSize, keepAliveSeconds));
086    }
087
088    /**
089     * Registers the input {@link CursorResult} and generates a new <code>scrollId</code> to associate with.
090     *
091     * @return the scrollId associated to the cursor result.
092     */
093    public String registerCursorResult(CursorResult<C, O> cursorResult) {
094        String scrollId = UUID.randomUUID().toString();
095        return registerCursorResult(scrollId, cursorResult);
096    }
097
098    /**
099     * Registers the input {@link CursorResult} associated to the input <code>scrollId</code>.
100     *
101     * @return the scrollId associated to the cursor result.
102     */
103    public String registerCursorResult(String scrollId, CursorResult<C, O> cursorResult) {
104        cursorResults.put(scrollId, cursorResult);
105        return scrollId;
106    }
107
108    /**
109     * Unregisters cursor associated to the input <code>scrollId</code>.
110     *
111     * @param scrollId The scoll id of {@link CursorResult} to unregister
112     * @return Whether or not the cursor was unregistered.
113     */
114    public boolean unregisterCursor(String scrollId) {
115        CursorResult<C, O> cursorResult = cursorResults.remove(scrollId);
116        if (cursorResult != null) {
117            cursorResult.close();
118            return true;
119        }
120        return false;
121    }
122
123    /**
124     * @return the next batch of cursor associated to the input <code>scrollId</code>
125     */
126    @SuppressWarnings("resource") // CursorResult closed at end of scroll or after timeout
127    public ScrollResult<R> scroll(String scrollId) {
128        CursorResult<C, O> cursorResult = cursorResults.get(scrollId);
129        if (cursorResult == null) {
130            throw new NuxeoException("Unknown or timed out scrollId");
131        } else if (isScrollTimedOut(scrollId, cursorResult)) {
132            throw new NuxeoException("Timed out scrollId");
133        }
134        cursorResult.touch();
135        List<R> results = new ArrayList<>(cursorResult.getBatchSize());
136        synchronized (cursorResult) {
137            if (!cursorResult.hasNext()) {
138                unregisterCursor(scrollId);
139                return emptyResult();
140            }
141            while (results.size() < cursorResult.getBatchSize()) {
142                if (!cursorResult.hasNext()) {
143                    // Don't unregister cursor here because we don't want scroll API to throw an exception during next
144                    // call as it's a legitimate case - but close cursor
145                    cursorResult.close();
146                    break;
147                } else {
148                    O obj = cursorResult.next();
149                    R result = extractor.apply(obj);
150                    if (result == null) {
151                        log.error("Got a document without result: " + obj);
152                    } else {
153                        results.add(result);
154                    }
155                }
156            }
157        }
158        return new ScrollResultImpl<>(scrollId, results);
159    }
160
161    /**
162     * Clear and close all cursors owned by this service.
163     */
164    public void clear() {
165        Iterator<CursorResult<C, O>> values = cursorResults.values().iterator();
166        while (values.hasNext()) {
167            values.next().close();
168            values.remove();
169        }
170    }
171
172}