001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.api; 020 021import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 022 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.UUID; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.function.Function; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033 034/** 035 * A cursor service which holds cursors on DB in order to perform scroll operations. 036 * 037 * @param <C> The cursor type. 038 * @param <O> The cursor item type. 039 * @since 9.1 040 */ 041public class CursorService<C, O> { 042 043 private static final Log log = LogFactory.getLog(CursorService.class); 044 045 protected Map<String, CursorResult<C, O>> cursorResults = new ConcurrentHashMap<>(); 046 047 public void checkForTimedOutScroll() { 048 cursorResults.entrySet().stream().forEach(e -> isScrollTimedOut(e.getKey(), e.getValue())); 049 } 050 051 protected boolean isScrollTimedOut(String scrollId, CursorResult<C, O> cursorResult) { 052 if (cursorResult.timedOut()) { 053 if (unregisterCursor(scrollId)) { 054 log.warn("Scroll '" + scrollId + "' timed out"); 055 } 056 return true; 057 } 058 return false; 059 } 060 061 /** 062 * Registers the input {@link C} and generates a new <code>scrollId</code> to associate with. 063 * 064 * @return the scrollId associated to the cursor. 065 */ 066 public String registerCursor(C cursor, int batchSize, int keepAliveSeconds) { 067 return registerCursorResult(new CursorResult<>(cursor, batchSize, keepAliveSeconds)); 068 } 069 070 /** 071 * Registers the input {@link C} associated to the input <code>scrollId</code>. 072 * 073 * @return the scrollId associated to the cursor. 074 */ 075 public String registerCursor(String scrollId, C cursor, int batchSize, int keepAliveSeconds) { 076 return registerCursorResult(scrollId, new CursorResult<>(cursor, batchSize, keepAliveSeconds)); 077 } 078 079 /** 080 * Registers the input {@link CursorResult} and generates a new <code>scrollId</code> to associate with. 081 * 082 * @return the scrollId associated to the cursor result. 083 */ 084 public String registerCursorResult(CursorResult<C, O> cursorResult) { 085 String scrollId = UUID.randomUUID().toString(); 086 return registerCursorResult(scrollId, cursorResult); 087 } 088 089 /** 090 * Registers the input {@link CursorResult} associated to the input <code>scrollId</code>. 091 * 092 * @return the scrollId associated to the cursor result. 093 */ 094 public String registerCursorResult(String scrollId, CursorResult<C, O> cursorResult) { 095 cursorResults.put(scrollId, cursorResult); 096 return scrollId; 097 } 098 099 /** 100 * Unregisters cursor associated to the input <code>scrollId</code>. 101 * 102 * @param scrollId The scoll id of {@link CursorResult} to unregister 103 * @return Whether or not the cursor was unregistered. 104 */ 105 public boolean unregisterCursor(String scrollId) { 106 CursorResult<C, O> cursorResult = cursorResults.remove(scrollId); 107 if (cursorResult != null) { 108 cursorResult.close(); 109 return true; 110 } 111 return false; 112 } 113 114 /** 115 * @return the next batch of cursor associated to the input <code>scrollId</code> 116 */ 117 public ScrollResult scroll(String scrollId, Function<O, String> idExtractor) { 118 CursorResult<C, O> cursorResult = cursorResults.get(scrollId); 119 if (cursorResult == null) { 120 throw new NuxeoException("Unknown or timed out scrollId"); 121 } else if (isScrollTimedOut(scrollId, cursorResult)) { 122 throw new NuxeoException("Timed out scrollId"); 123 } 124 cursorResult.touch(); 125 List<String> ids = new ArrayList<>(cursorResult.getBatchSize()); 126 synchronized (cursorResult) { 127 if (!cursorResult.hasNext()) { 128 unregisterCursor(scrollId); 129 return emptyResult(); 130 } 131 while (ids.size() < cursorResult.getBatchSize()) { 132 if (!cursorResult.hasNext()) { 133 // Don't unregister cursor here because we don't want scroll API to throw an exception during next 134 // call as it's a legitimate case - but close cursor 135 cursorResult.close(); 136 break; 137 } else { 138 O obj = cursorResult.next(); 139 String id = idExtractor.apply(obj); 140 if (id == null) { 141 log.error("Got a document without id: " + obj); 142 } else { 143 ids.add(id); 144 } 145 } 146 } 147 } 148 return new ScrollResultImpl(scrollId, ids); 149 } 150 151 /** 152 * Clear and close all cursors owned by this service. 153 */ 154 public void clear() { 155 Iterator<CursorResult<C, O>> values = cursorResults.values().iterator(); 156 while (values.hasNext()) { 157 values.next().close(); 158 values.remove(); 159 } 160 } 161 162}