001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 * 
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *  
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.api;
020
021import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
022
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.UUID;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.function.Function;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034/**
035 * A cursor service which holds cursors on DB in order to perform scroll operations.
036 *
037 * @param <C> The cursor type.
038 * @param <O> The cursor item type.
039 * @since 9.1
040 */
041public class CursorService<C, O> {
042
043    private static final Log log = LogFactory.getLog(CursorService.class);
044
045    protected Map<String, CursorResult<C, O>> cursorResults = new ConcurrentHashMap<>();
046
047    public void checkForTimedOutScroll() {
048        cursorResults.entrySet().stream().forEach(e -> isScrollTimedOut(e.getKey(), e.getValue()));
049    }
050
051    protected boolean isScrollTimedOut(String scrollId, CursorResult<C, O> cursorResult) {
052        if (cursorResult.timedOut()) {
053            if (unregisterCursor(scrollId)) {
054                log.warn("Scroll '" + scrollId + "' timed out");
055            }
056            return true;
057        }
058        return false;
059    }
060
061    /**
062     * Registers the input {@link C} and generates a new <code>scrollId</code> to associate with.
063     *
064     * @return the scrollId associated to the cursor.
065     */
066    public String registerCursor(C cursor, int batchSize, int keepAliveSeconds) {
067        return registerCursorResult(new CursorResult<>(cursor, batchSize, keepAliveSeconds));
068    }
069
070    /**
071     * Registers the input {@link C} associated to the input <code>scrollId</code>.
072     *
073     * @return the scrollId associated to the cursor.
074     */
075    public String registerCursor(String scrollId, C cursor, int batchSize, int keepAliveSeconds) {
076        return registerCursorResult(scrollId, new CursorResult<>(cursor, batchSize, keepAliveSeconds));
077    }
078
079    /**
080     * Registers the input {@link CursorResult} and generates a new <code>scrollId</code> to associate with.
081     *
082     * @return the scrollId associated to the cursor result.
083     */
084    public String registerCursorResult(CursorResult<C, O> cursorResult) {
085        String scrollId = UUID.randomUUID().toString();
086        return registerCursorResult(scrollId, cursorResult);
087    }
088
089    /**
090     * Registers the input {@link CursorResult} associated to the input <code>scrollId</code>.
091     *
092     * @return the scrollId associated to the cursor result.
093     */
094    public String registerCursorResult(String scrollId, CursorResult<C, O> cursorResult) {
095        cursorResults.put(scrollId, cursorResult);
096        return scrollId;
097    }
098
099    /**
100     * Unregisters cursor associated to the input <code>scrollId</code>.
101     *
102     * @param scrollId The scoll id of {@link CursorResult} to unregister
103     * @return Whether or not the cursor was unregistered.
104     */
105    public boolean unregisterCursor(String scrollId) {
106        CursorResult<C, O> cursorResult = cursorResults.remove(scrollId);
107        if (cursorResult != null) {
108            cursorResult.close();
109            return true;
110        }
111        return false;
112    }
113
114    /**
115     * @return the next batch of cursor associated to the input <code>scrollId</code>
116     */
117    public ScrollResult scroll(String scrollId, Function<O, String> idExtractor) {
118        CursorResult<C, O> cursorResult = cursorResults.get(scrollId);
119        if (cursorResult == null) {
120            throw new NuxeoException("Unknown or timed out scrollId");
121        } else if (isScrollTimedOut(scrollId, cursorResult)) {
122            throw new NuxeoException("Timed out scrollId");
123        }
124        cursorResult.touch();
125        List<String> ids = new ArrayList<>(cursorResult.getBatchSize());
126        synchronized (cursorResult) {
127            if (!cursorResult.hasNext()) {
128                unregisterCursor(scrollId);
129                return emptyResult();
130            }
131            while (ids.size() < cursorResult.getBatchSize()) {
132                if (!cursorResult.hasNext()) {
133                    // Don't unregister cursor here because we don't want scroll API to throw an exception during next
134                    // call as it's a legitimate case - but close cursor
135                    cursorResult.close();
136                    break;
137                } else {
138                    O obj = cursorResult.next();
139                    String id = idExtractor.apply(obj);
140                    if (id == null) {
141                        log.error("Got a document without id: " + obj);
142                    } else {
143                        ids.add(id);
144                    }
145                }
146            }
147        }
148        return new ScrollResultImpl(scrollId, ids);
149    }
150
151    /**
152     * Clear and close all cursors owned by this service.
153     */
154    public void clear() {
155        Iterator<CursorResult<C, O>> values = cursorResults.values().iterator();
156        while (values.hasNext()) {
157            values.next().close();
158            values.remove();
159        }
160    }
161
162}