001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 * 
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *  
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.api;
020
021import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
022
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.UUID;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.function.Function;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034/**
035 * A cursor service which holds cursors on DB in order to perform scroll operations.
036 *
037 * @param <C> The cursor type.
038 * @param <O> The cursor item type.
039 * @param <R> The result type.
040 * @since 9.1
041 */
042public class CursorService<C, O, R> {
043
044    private static final Log log = LogFactory.getLog(CursorService.class);
045
046    protected final Map<String, CursorResult<C, O>> cursorResults = new ConcurrentHashMap<>();
047
048    protected final Function<O, R> extractor;
049
050    public CursorService(Function<O, R> extractor) {
051        this.extractor = extractor;
052    }
053
054    public void checkForTimedOutScroll() {
055        cursorResults.forEach(this::isScrollTimedOut);
056    }
057
058    protected boolean isScrollTimedOut(String scrollId, CursorResult<C, O> cursorResult) {
059        if (cursorResult.timedOut()) {
060            if (unregisterCursor(scrollId)) {
061                log.warn("Scroll '" + scrollId + "' timed out");
062            }
063            return true;
064        }
065        return false;
066    }
067
068    /**
069     * Registers the input {@link C} and generates a new <code>scrollId</code> to associate with.
070     *
071     * @return the scrollId associated to the cursor.
072     */
073    public String registerCursor(C cursor, int batchSize, int keepAliveSeconds) {
074        return registerCursorResult(new CursorResult<>(cursor, batchSize, keepAliveSeconds));
075    }
076
077    /**
078     * Registers the input {@link C} associated to the input <code>scrollId</code>.
079     *
080     * @return the scrollId associated to the cursor.
081     */
082    public String registerCursor(String scrollId, C cursor, int batchSize, int keepAliveSeconds) {
083        return registerCursorResult(scrollId, new CursorResult<>(cursor, batchSize, keepAliveSeconds));
084    }
085
086    /**
087     * Registers the input {@link CursorResult} and generates a new <code>scrollId</code> to associate with.
088     *
089     * @return the scrollId associated to the cursor result.
090     */
091    public String registerCursorResult(CursorResult<C, O> cursorResult) {
092        String scrollId = UUID.randomUUID().toString();
093        return registerCursorResult(scrollId, cursorResult);
094    }
095
096    /**
097     * Registers the input {@link CursorResult} associated to the input <code>scrollId</code>.
098     *
099     * @return the scrollId associated to the cursor result.
100     */
101    public String registerCursorResult(String scrollId, CursorResult<C, O> cursorResult) {
102        cursorResults.put(scrollId, cursorResult);
103        return scrollId;
104    }
105
106    /**
107     * Unregisters cursor associated to the input <code>scrollId</code>.
108     *
109     * @param scrollId The scoll id of {@link CursorResult} to unregister
110     * @return Whether or not the cursor was unregistered.
111     */
112    public boolean unregisterCursor(String scrollId) {
113        CursorResult<C, O> cursorResult = cursorResults.remove(scrollId);
114        if (cursorResult != null) {
115            cursorResult.close();
116            return true;
117        }
118        return false;
119    }
120
121    /**
122     * @return the next batch of cursor associated to the input <code>scrollId</code>
123     */
124    public ScrollResult<R> scroll(String scrollId) {
125        CursorResult<C, O> cursorResult = cursorResults.get(scrollId);
126        if (cursorResult == null) {
127            throw new NuxeoException("Unknown or timed out scrollId");
128        } else if (isScrollTimedOut(scrollId, cursorResult)) {
129            throw new NuxeoException("Timed out scrollId");
130        }
131        cursorResult.touch();
132        List<R> results = new ArrayList<>(cursorResult.getBatchSize());
133        synchronized (cursorResult) {
134            if (!cursorResult.hasNext()) {
135                unregisterCursor(scrollId);
136                return emptyResult();
137            }
138            while (results.size() < cursorResult.getBatchSize()) {
139                if (!cursorResult.hasNext()) {
140                    // Don't unregister cursor here because we don't want scroll API to throw an exception during next
141                    // call as it's a legitimate case - but close cursor
142                    cursorResult.close();
143                    break;
144                } else {
145                    O obj = cursorResult.next();
146                    R result = extractor.apply(obj);
147                    if (result == null) {
148                        log.error("Got a document without result: " + obj);
149                    } else {
150                        results.add(result);
151                    }
152                }
153            }
154        }
155        return new ScrollResultImpl<>(scrollId, results);
156    }
157
158    /**
159     * Clear and close all cursors owned by this service.
160     */
161    public void clear() {
162        Iterator<CursorResult<C, O>> values = cursorResults.values().iterator();
163        while (values.hasNext()) {
164            values.next().close();
165            values.remove();
166        }
167    }
168
169}