001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.api; 020 021import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 022 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.UUID; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.function.Function; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033 034/** 035 * A low level holder of DB cursors that manages cleaning on timeout. 036 * 037 * @param <C> The cursor type. 038 * @param <O> The cursor item type. 039 * @param <R> The result type. 040 * @since 9.1 041 */ 042public class CursorService<C, O, R> { 043 044 private static final Log log = LogFactory.getLog(CursorService.class); 045 046 protected final Map<String, CursorResult<C, O>> cursorResults = new ConcurrentHashMap<>(); 047 048 protected final Function<O, R> extractor; 049 050 public CursorService(Function<O, R> extractor) { 051 this.extractor = extractor; 052 } 053 054 public void checkForTimedOutScroll() { 055 cursorResults.forEach(this::isScrollTimedOut); 056 } 057 058 protected boolean isScrollTimedOut(String scrollId, CursorResult<C, O> cursorResult) { 059 if (cursorResult.timedOut()) { 060 if (unregisterCursor(scrollId)) { 061 log.warn("Scroll '" + scrollId + "' timed out"); 062 } 063 return true; 064 } 065 return false; 066 } 067 068 /** 069 * Registers the input {@link C} and generates a new <code>scrollId</code> to associate with. 070 * 071 * @return the scrollId associated to the cursor. 072 */ 073 @SuppressWarnings("resource") // CursorResult is being registered, must not be closed 074 public String registerCursor(C cursor, int batchSize, int keepAliveSeconds) { 075 return registerCursorResult(new CursorResult<>(cursor, batchSize, keepAliveSeconds)); 076 } 077 078 /** 079 * Registers the input {@link C} associated to the input <code>scrollId</code>. 080 * 081 * @return the scrollId associated to the cursor. 082 */ 083 @SuppressWarnings("resource") // CursorResult is being registered, must not be closed 084 public String registerCursor(String scrollId, C cursor, int batchSize, int keepAliveSeconds) { 085 return registerCursorResult(scrollId, new CursorResult<>(cursor, batchSize, keepAliveSeconds)); 086 } 087 088 /** 089 * Registers the input {@link CursorResult} and generates a new <code>scrollId</code> to associate with. 090 * 091 * @return the scrollId associated to the cursor result. 092 */ 093 public String registerCursorResult(CursorResult<C, O> cursorResult) { 094 String scrollId = UUID.randomUUID().toString(); 095 return registerCursorResult(scrollId, cursorResult); 096 } 097 098 /** 099 * Registers the input {@link CursorResult} associated to the input <code>scrollId</code>. 100 * 101 * @return the scrollId associated to the cursor result. 102 */ 103 public String registerCursorResult(String scrollId, CursorResult<C, O> cursorResult) { 104 cursorResults.put(scrollId, cursorResult); 105 return scrollId; 106 } 107 108 /** 109 * Unregisters cursor associated to the input <code>scrollId</code>. 110 * 111 * @param scrollId The scoll id of {@link CursorResult} to unregister 112 * @return Whether or not the cursor was unregistered. 113 */ 114 public boolean unregisterCursor(String scrollId) { 115 CursorResult<C, O> cursorResult = cursorResults.remove(scrollId); 116 if (cursorResult != null) { 117 cursorResult.close(); 118 return true; 119 } 120 return false; 121 } 122 123 /** 124 * @return the next batch of cursor associated to the input <code>scrollId</code> 125 */ 126 @SuppressWarnings("resource") // CursorResult closed at end of scroll or after timeout 127 public ScrollResult<R> scroll(String scrollId) { 128 CursorResult<C, O> cursorResult = cursorResults.get(scrollId); 129 if (cursorResult == null) { 130 throw new NuxeoException("Unknown or timed out scrollId"); 131 } else if (isScrollTimedOut(scrollId, cursorResult)) { 132 throw new NuxeoException("Timed out scrollId"); 133 } 134 cursorResult.touch(); 135 List<R> results = new ArrayList<>(cursorResult.getBatchSize()); 136 synchronized (cursorResult) { 137 if (!cursorResult.hasNext()) { 138 unregisterCursor(scrollId); 139 return emptyResult(); 140 } 141 while (results.size() < cursorResult.getBatchSize()) { 142 if (!cursorResult.hasNext()) { 143 // Don't unregister cursor here because we don't want scroll API to throw an exception during next 144 // call as it's a legitimate case - but close cursor 145 cursorResult.close(); 146 break; 147 } else { 148 O obj = cursorResult.next(); 149 R result = extractor.apply(obj); 150 if (result == null) { 151 log.error("Got a document without result: " + obj); 152 } else { 153 results.add(result); 154 } 155 } 156 } 157 } 158 return new ScrollResultImpl<>(scrollId, results); 159 } 160 161 /** 162 * Clear and close all cursors owned by this service. 163 */ 164 public void clear() { 165 Iterator<CursorResult<C, O>> values = cursorResults.values().iterator(); 166 while (values.hasNext()) { 167 values.next().close(); 168 values.remove(); 169 } 170 } 171 172}