001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.api; 020 021import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 022 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.UUID; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.function.Function; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033 034/** 035 * A cursor service which holds cursors on DB in order to perform scroll operations. 036 * 037 * @param <C> The cursor type. 038 * @param <O> The cursor item type. 039 * @param <R> The result type. 040 * @since 9.1 041 */ 042public class CursorService<C, O, R> { 043 044 private static final Log log = LogFactory.getLog(CursorService.class); 045 046 protected final Map<String, CursorResult<C, O>> cursorResults = new ConcurrentHashMap<>(); 047 048 protected final Function<O, R> extractor; 049 050 public CursorService(Function<O, R> extractor) { 051 this.extractor = extractor; 052 } 053 054 public void checkForTimedOutScroll() { 055 cursorResults.forEach(this::isScrollTimedOut); 056 } 057 058 protected boolean isScrollTimedOut(String scrollId, CursorResult<C, O> cursorResult) { 059 if (cursorResult.timedOut()) { 060 if (unregisterCursor(scrollId)) { 061 log.warn("Scroll '" + scrollId + "' timed out"); 062 } 063 return true; 064 } 065 return false; 066 } 067 068 /** 069 * Registers the input {@link C} and generates a new <code>scrollId</code> to associate with. 070 * 071 * @return the scrollId associated to the cursor. 072 */ 073 public String registerCursor(C cursor, int batchSize, int keepAliveSeconds) { 074 return registerCursorResult(new CursorResult<>(cursor, batchSize, keepAliveSeconds)); 075 } 076 077 /** 078 * Registers the input {@link C} associated to the input <code>scrollId</code>. 079 * 080 * @return the scrollId associated to the cursor. 081 */ 082 public String registerCursor(String scrollId, C cursor, int batchSize, int keepAliveSeconds) { 083 return registerCursorResult(scrollId, new CursorResult<>(cursor, batchSize, keepAliveSeconds)); 084 } 085 086 /** 087 * Registers the input {@link CursorResult} and generates a new <code>scrollId</code> to associate with. 088 * 089 * @return the scrollId associated to the cursor result. 090 */ 091 public String registerCursorResult(CursorResult<C, O> cursorResult) { 092 String scrollId = UUID.randomUUID().toString(); 093 return registerCursorResult(scrollId, cursorResult); 094 } 095 096 /** 097 * Registers the input {@link CursorResult} associated to the input <code>scrollId</code>. 098 * 099 * @return the scrollId associated to the cursor result. 100 */ 101 public String registerCursorResult(String scrollId, CursorResult<C, O> cursorResult) { 102 cursorResults.put(scrollId, cursorResult); 103 return scrollId; 104 } 105 106 /** 107 * Unregisters cursor associated to the input <code>scrollId</code>. 108 * 109 * @param scrollId The scoll id of {@link CursorResult} to unregister 110 * @return Whether or not the cursor was unregistered. 111 */ 112 public boolean unregisterCursor(String scrollId) { 113 CursorResult<C, O> cursorResult = cursorResults.remove(scrollId); 114 if (cursorResult != null) { 115 cursorResult.close(); 116 return true; 117 } 118 return false; 119 } 120 121 /** 122 * @return the next batch of cursor associated to the input <code>scrollId</code> 123 */ 124 public ScrollResult<R> scroll(String scrollId) { 125 CursorResult<C, O> cursorResult = cursorResults.get(scrollId); 126 if (cursorResult == null) { 127 throw new NuxeoException("Unknown or timed out scrollId"); 128 } else if (isScrollTimedOut(scrollId, cursorResult)) { 129 throw new NuxeoException("Timed out scrollId"); 130 } 131 cursorResult.touch(); 132 List<R> results = new ArrayList<>(cursorResult.getBatchSize()); 133 synchronized (cursorResult) { 134 if (!cursorResult.hasNext()) { 135 unregisterCursor(scrollId); 136 return emptyResult(); 137 } 138 while (results.size() < cursorResult.getBatchSize()) { 139 if (!cursorResult.hasNext()) { 140 // Don't unregister cursor here because we don't want scroll API to throw an exception during next 141 // call as it's a legitimate case - but close cursor 142 cursorResult.close(); 143 break; 144 } else { 145 O obj = cursorResult.next(); 146 R result = extractor.apply(obj); 147 if (result == null) { 148 log.error("Got a document without result: " + obj); 149 } else { 150 results.add(result); 151 } 152 } 153 } 154 } 155 return new ScrollResultImpl<>(scrollId, results); 156 } 157 158 /** 159 * Clear and close all cursors owned by this service. 160 */ 161 public void clear() { 162 Iterator<CursorResult<C, O>> values = cursorResults.values().iterator(); 163 while (values.hasNext()) { 164 values.next().close(); 165 values.remove(); 166 } 167 } 168 169}