001/* 002 * (C) Copyright 2016-2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.dbs; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 024 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.stream.Stream; 032 033import org.nuxeo.ecm.core.api.Lock; 034import org.nuxeo.ecm.core.api.PartialList; 035import org.nuxeo.ecm.core.api.ScrollResult; 036import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 037import org.nuxeo.ecm.core.storage.State; 038import org.nuxeo.ecm.core.storage.State.StateDiff; 039import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 040 041import com.google.common.cache.Cache; 042import com.google.common.collect.ImmutableMap; 043 044/** 045 * The DBS Cache layer used to cache some method call of real repository 046 * 047 * @since 11.1 (introduced in 8.10 as DBSCachingRepository) 048 */ 049public class DBSCachingConnection implements DBSConnection { 050 051 protected final DBSConnection connection; 052 053 private final Cache<String, State> cache; 054 055 private final Cache<String, String> childCache; 056 057 /** 058 * The local invalidations, due to writes to this connection, that should be propagated to other connections (and 059 * other cluster nodes) at post-commit time. 060 * <p> 061 * {@code null} if the repository is not transactional and there is no cluster. 062 */ 063 private final DBSInvalidations invalidations; 064 065 /** 066 * The cluster invalidator, sending invalidations to other cluster nodes. 067 * <p> 068 * {@code null} if there is no cluster. 069 */ 070 private final DBSClusterInvalidator clusterInvalidator; 071 072 /** 073 * The queue of invalidations received from other connections, to be processed at pre-transaction time. 074 * <p> 075 * {@code null} if the repository is not transactional. 076 */ 077 private final DBSInvalidationsQueue invalidationsQueue; 078 079 /** 080 * The propagator of invalidations to other connections. 081 * <p> 082 * {@code null} if the repository is not transactional. 083 */ 084 private final DBSInvalidationsPropagator invalidationsPropagator; 085 086 public DBSCachingConnection(DBSConnection connection, DBSCachingRepository repository) { 087 this.connection = connection; 088 // Init caches 089 if (repository.supportsTransactions()) { 090 // connection-local cache 091 cache = repository.newCache(false); 092 childCache = repository.newChildCache(false); 093 } else { 094 // no transaction, use a repository-wide cache 095 cache = repository.getCache(); 096 childCache = repository.getChildCache(); 097 } 098 // local invalidations 099 invalidationsPropagator = repository.getInvalidationsPropagator(); 100 if (invalidationsPropagator == null) { 101 invalidationsQueue = null; 102 } else { 103 invalidationsQueue = new DBSInvalidationsQueue("dbs-" + this); 104 invalidationsPropagator.addQueue(invalidationsQueue); 105 } 106 // cluster invalidations 107 clusterInvalidator = repository.getClusterInvalidator(); 108 // collected invalidations 109 if (invalidationsPropagator == null && clusterInvalidator == null) { 110 // no transactional backend and no cluster 111 invalidations = null; 112 } else { 113 invalidations = new DBSInvalidations(); 114 } 115 } 116 117 @Override 118 public void close() { 119 connection.close(); 120 if (invalidationsPropagator != null) { 121 invalidationsPropagator.removeQueue(invalidationsQueue); 122 } 123 if (cache != null) { 124 // Clear caches 125 cache.invalidateAll(); 126 childCache.invalidateAll(); 127 } 128 // Send invalidations 129 if (clusterInvalidator != null) { 130 clusterInvalidator.sendInvalidations(new DBSInvalidations(true)); 131 } 132 } 133 134 @Override 135 public void begin() { 136 connection.begin(); 137 processReceivedInvalidations(); 138 } 139 140 @Override 141 public void commit() { 142 connection.commit(); 143 sendInvalidationsToOthers(); 144 processReceivedInvalidations(); 145 } 146 147 @Override 148 public void rollback() { 149 connection.rollback(); 150 } 151 152 @Override 153 public State readState(String id) { 154 State state = cache.getIfPresent(id); 155 if (state == null) { 156 state = connection.readState(id); 157 if (state != null) { 158 putInCache(state); 159 } 160 } 161 return state; 162 } 163 164 @Override 165 public State readPartialState(String id, Collection<String> keys) { 166 // bypass caches, as the goal of this method is to not trash caches for one-shot reads 167 return connection.readPartialState(id, keys); 168 } 169 170 @Override 171 public List<State> readStates(List<String> ids) { 172 ImmutableMap<String, State> statesMap = cache.getAllPresent(ids); 173 List<String> idsToRetrieve = new ArrayList<>(ids); 174 idsToRetrieve.removeAll(statesMap.keySet()); 175 // Read missing states from repository 176 List<State> states = connection.readStates(idsToRetrieve); 177 // Cache them 178 states.forEach(this::putInCache); 179 // Add previous cached one 180 states.addAll(statesMap.values()); 181 return states; 182 } 183 184 @Override 185 public void createState(State state) { 186 connection.createState(state); 187 // don't cache new state, it is inefficient on mass import 188 } 189 190 @Override 191 public void createStates(List<State> states) { 192 connection.createStates(states); 193 // don't cache new states, it is inefficient on mass import 194 } 195 196 @Override 197 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 198 connection.updateState(id, diff, changeTokenUpdater); 199 invalidate(id); 200 } 201 202 @Override 203 public void deleteStates(Set<String> ids) { 204 connection.deleteStates(ids); 205 invalidate(ids); 206 } 207 208 @Override 209 public State readChildState(String parentId, String name, Set<String> ignored) { 210 processReceivedInvalidations(); 211 212 String childCacheKey = computeChildCacheKey(parentId, name); 213 String stateId = childCache.getIfPresent(childCacheKey); 214 if (stateId != null) { 215 State state = cache.getIfPresent(stateId); 216 if (state != null) { 217 // As we don't have invalidation for childCache we need to check if retrieved state is the right one 218 // and not a previous document which was moved or renamed 219 if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) { 220 return state; 221 } else { 222 // We can invalidate the entry in cache as the document seemed to be moved or renamed 223 childCache.invalidate(childCacheKey); 224 } 225 } 226 } 227 State state = connection.readChildState(parentId, name, ignored); 228 putInCache(state); 229 return state; 230 } 231 232 private void putInCache(State state) { 233 if (state != null) { 234 String stateId = state.get(KEY_ID).toString(); 235 cache.put(stateId, state); 236 Object stateParentId = state.get(KEY_PARENT_ID); 237 if (stateParentId != null) { 238 childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId); 239 } 240 } 241 } 242 243 private String computeChildCacheKey(String parentId, String name) { 244 return parentId + '_' + name; 245 } 246 247 private void invalidate(String id) { 248 invalidate(List.of(id)); 249 } 250 251 private void invalidate(Collection<String> ids) { 252 cache.invalidateAll(ids); 253 if (invalidations != null) { 254 invalidations.addAll(ids); 255 } 256 } 257 258 protected void sendInvalidationsToOthers() { 259 if (invalidations != null && !invalidations.isEmpty()) { 260 if (clusterInvalidator != null) { 261 // send to other cluster nodes 262 clusterInvalidator.sendInvalidations(invalidations); 263 } 264 if (invalidationsPropagator != null) { 265 // send to other connections 266 invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue); 267 } 268 invalidations.clear(); 269 } 270 } 271 272 protected void processReceivedInvalidations() { 273 DBSInvalidations invals; 274 // invalidations from other cluster nodes 275 if (clusterInvalidator != null) { 276 invals = clusterInvalidator.receiveInvalidations(); 277 // send cluster invalidations to all other connections 278 if (invals != null && !invals.isEmpty() && invalidationsPropagator != null) { 279 invalidationsPropagator.propagateInvalidations(invals, invalidationsQueue); 280 } 281 } else { 282 invals = null; 283 } 284 // invalidations from other connections 285 if (invalidationsQueue != null) { 286 DBSInvalidations inv = invalidationsQueue.getInvalidations(); 287 if (invals == null) { 288 invals = inv; 289 } else { 290 invals.add(inv); 291 } 292 } 293 // apply invalidations to the cache (connection-local or repository-wide) 294 if (invals != null && !invals.isEmpty()) { 295 if (invals.all) { 296 cache.invalidateAll(); 297 childCache.invalidateAll(); 298 } else if (invals.ids != null) { 299 cache.invalidateAll(invals.ids); 300 } 301 } 302 } 303 304 @Override 305 public String getRootId() { 306 return connection.getRootId(); 307 } 308 309 @Override 310 public String generateNewId() { 311 return connection.generateNewId(); 312 } 313 314 @Override 315 public boolean hasChild(String parentId, String name, Set<String> ignored) { 316 return connection.hasChild(parentId, name, ignored); 317 } 318 319 @Override 320 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 321 return connection.queryKeyValue(key, value, ignored); 322 } 323 324 @Override 325 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 326 return connection.queryKeyValue(key1, value1, key2, value2, ignored); 327 } 328 329 @Override 330 public Stream<State> getDescendants(String id, Set<String> keys) { 331 return connection.getDescendants(id, keys); 332 } 333 334 @Override 335 public Stream<State> getDescendants(String id, Set<String> keys, int limit) { 336 return connection.getDescendants(id, keys, limit); 337 } 338 339 @Override 340 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 341 return connection.queryKeyValuePresence(key, value, ignored); 342 } 343 344 @Override 345 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 346 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 347 return connection.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo); 348 } 349 350 @Override 351 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 352 return connection.scroll(evaluator, batchSize, keepAliveSeconds); 353 } 354 355 @Override 356 public ScrollResult<String> scroll(String scrollId) { 357 return connection.scroll(scrollId); 358 } 359 360 @Override 361 public Lock getLock(String id) { 362 return connection.getLock(id); 363 } 364 365 @Override 366 public Lock setLock(String id, Lock lock) { 367 return connection.setLock(id, lock); 368 } 369 370 @Override 371 public Lock removeLock(String id, String owner) { 372 return connection.removeLock(id, owner); 373 } 374 375 @Override 376 public List<State> queryKeyValueWithOperator(String key1, Object value1, String key2, DBSQueryOperator operator, 377 Object value2, Set<String> ignored) { 378 return connection.queryKeyValueWithOperator(key1, value1, key2, operator, value2, ignored); 379 } 380 381}