001/* 002 * (C) Copyright 2016-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.dbs; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 024 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.Set; 034import java.util.concurrent.TimeUnit; 035import java.util.stream.Stream; 036 037import org.apache.commons.lang.StringUtils; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.Lock; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.api.PartialList; 043import org.nuxeo.ecm.core.api.ScrollResult; 044import org.nuxeo.ecm.core.blob.BlobManager; 045import org.nuxeo.ecm.core.model.LockManager; 046import org.nuxeo.ecm.core.model.Session; 047import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 048import org.nuxeo.ecm.core.storage.FulltextConfiguration; 049import org.nuxeo.ecm.core.storage.State; 050import org.nuxeo.ecm.core.storage.State.StateDiff; 051import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 052import org.nuxeo.runtime.metrics.MetricsService; 053 054import com.codahale.metrics.MetricRegistry; 055import com.codahale.metrics.SharedMetricRegistries; 056import com.google.common.cache.Cache; 057import com.google.common.cache.CacheBuilder; 058import com.google.common.collect.ImmutableMap; 059import com.google.common.collect.Ordering; 060 061/** 062 * The DBS Cache layer used to cache some method call of real repository 063 * 064 * @since 8.10 065 */ 066public class DBSCachingRepository implements DBSRepository { 067 068 private static final Log log = LogFactory.getLog(DBSCachingRepository.class); 069 070 private static final Random RANDOM = new Random(); 071 072 private final DBSRepository repository; 073 074 private final Cache<String, State> cache; 075 076 private final Cache<String, String> childCache; 077 078 private DBSClusterInvalidator clusterInvalidator; 079 080 private final DBSInvalidations invalidations; 081 082 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 083 084 public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) { 085 this.repository = repository; 086 // Init caches 087 cache = newCache(descriptor); 088 registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache")); 089 childCache = newCache(descriptor); 090 registry.registerAll( 091 GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache")); 092 if (log.isInfoEnabled()) { 093 log.info(String.format("DBS cache activated on '%s' repository", repository.getName())); 094 } 095 invalidations = new DBSInvalidations(); 096 if (descriptor.isClusteringEnabled()) { 097 initClusterInvalidator(descriptor); 098 } 099 } 100 101 protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) { 102 CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder(); 103 builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES).recordStats(); 104 if (descriptor.cacheConcurrencyLevel != null) { 105 builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue()); 106 } 107 if (descriptor.cacheMaxSize != null) { 108 builder = builder.maximumSize(descriptor.cacheMaxSize.longValue()); 109 } 110 return builder.build(); 111 } 112 113 protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) { 114 String nodeId = descriptor.clusterNodeId; 115 if (StringUtils.isBlank(nodeId)) { 116 nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE)); 117 log.warn("Missing cluster node id configuration, please define it explicitly " 118 + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId); 119 } else { 120 nodeId = nodeId.trim(); 121 } 122 clusterInvalidator = createClusterInvalidator(descriptor); 123 clusterInvalidator.initialize(nodeId, getName()); 124 } 125 126 protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) { 127 Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass; 128 if (klass == null) { 129 throw new NuxeoException( 130 "Unable to get cluster invalidator class from descriptor whereas clustering is enabled"); 131 } 132 try { 133 return klass.newInstance(); 134 } catch (ReflectiveOperationException e) { 135 throw new NuxeoException(e); 136 } 137 138 } 139 140 public void begin() { 141 repository.begin(); 142 processReceivedInvalidations(); 143 } 144 145 @Override 146 public void commit() { 147 repository.commit(); 148 sendInvalidationsToOther(); 149 processReceivedInvalidations(); 150 } 151 152 @Override 153 public void rollback() { 154 repository.rollback(); 155 } 156 157 @Override 158 public void shutdown() { 159 repository.shutdown(); 160 // Clear caches 161 cache.invalidateAll(); 162 childCache.invalidateAll(); 163 // Remove metrics 164 String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache"); 165 String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache"); 166 registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName)); 167 if (log.isInfoEnabled()) { 168 log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName())); 169 } 170 // Send invalidations 171 if (clusterInvalidator != null) { 172 clusterInvalidator.sendInvalidations(new DBSInvalidations(true)); 173 } 174 175 } 176 177 @Override 178 public State readState(String id) { 179 State state = cache.getIfPresent(id); 180 if (state == null) { 181 state = repository.readState(id); 182 if (state != null) { 183 putInCache(state); 184 } 185 } 186 return state; 187 } 188 189 @Override 190 public State readPartialState(String id, Collection<String> keys) { 191 // bypass caches, as the goal of this method is to not trash caches for one-shot reads 192 return repository.readPartialState(id, keys); 193 } 194 195 @Override 196 public List<State> readStates(List<String> ids) { 197 ImmutableMap<String, State> statesMap = cache.getAllPresent(ids); 198 List<String> idsToRetrieve = new ArrayList<>(ids); 199 idsToRetrieve.removeAll(statesMap.keySet()); 200 // Read missing states from repository 201 List<State> states = repository.readStates(idsToRetrieve); 202 // Cache them 203 states.forEach(this::putInCache); 204 // Add previous cached one 205 states.addAll(statesMap.values()); 206 // Sort them 207 states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids))); 208 return states; 209 } 210 211 @Override 212 public void createState(State state) { 213 repository.createState(state); 214 // don't cache new state, it is inefficient on mass import 215 } 216 217 @Override 218 public void createStates(List<State> states) { 219 repository.createStates(states); 220 // don't cache new states, it is inefficient on mass import 221 } 222 223 @Override 224 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 225 repository.updateState(id, diff, changeTokenUpdater); 226 invalidate(id); 227 } 228 229 @Override 230 public void deleteStates(Set<String> ids) { 231 repository.deleteStates(ids); 232 invalidateAll(ids); 233 } 234 235 @Override 236 public State readChildState(String parentId, String name, Set<String> ignored) { 237 processReceivedInvalidations(); 238 239 String childCacheKey = computeChildCacheKey(parentId, name); 240 String stateId = childCache.getIfPresent(childCacheKey); 241 if (stateId != null) { 242 State state = cache.getIfPresent(stateId); 243 if (state != null) { 244 // As we don't have invalidation for childCache we need to check if retrieved state is the right one 245 // and not a previous document which was moved or renamed 246 if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) { 247 return state; 248 } else { 249 // We can invalidate the entry in cache as the document seemed to be moved or renamed 250 childCache.invalidate(childCacheKey); 251 } 252 } 253 } 254 State state = repository.readChildState(parentId, name, ignored); 255 putInCache(state); 256 return state; 257 } 258 259 private void putInCache(State state) { 260 if (state != null) { 261 String stateId = state.get(KEY_ID).toString(); 262 cache.put(stateId, state); 263 Object stateParentId = state.get(KEY_PARENT_ID); 264 if (stateParentId != null) { 265 childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId); 266 } 267 } 268 } 269 270 private String computeChildCacheKey(String parentId, String name) { 271 return parentId + '_' + name; 272 } 273 274 private void invalidate(String id) { 275 invalidateAll(Collections.singleton(id)); 276 } 277 278 private void invalidateAll(Collection<String> ids) { 279 cache.invalidateAll(ids); 280 if (clusterInvalidator != null) { 281 synchronized (invalidations) { 282 invalidations.addAll(ids); 283 } 284 } 285 } 286 287 protected void sendInvalidationsToOther() { 288 synchronized (invalidations) { 289 if (!invalidations.isEmpty()) { 290 if (clusterInvalidator != null) { 291 clusterInvalidator.sendInvalidations(invalidations); 292 } 293 invalidations.clear(); 294 } 295 } 296 } 297 298 protected void processReceivedInvalidations() { 299 if (clusterInvalidator != null) { 300 DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations(); 301 if (invalidations.all) { 302 cache.invalidateAll(); 303 childCache.invalidateAll(); 304 } else if (invalidations.ids != null) { 305 cache.invalidateAll(invalidations.ids); 306 } 307 } 308 } 309 310 @Override 311 public BlobManager getBlobManager() { 312 return repository.getBlobManager(); 313 } 314 315 @Override 316 public FulltextConfiguration getFulltextConfiguration() { 317 return repository.getFulltextConfiguration(); 318 } 319 320 @Override 321 public boolean isFulltextDisabled() { 322 return repository.isFulltextDisabled(); 323 } 324 325 @Override 326 public boolean isChangeTokenEnabled() { 327 return repository.isChangeTokenEnabled(); 328 } 329 330 @Override 331 public String getRootId() { 332 return repository.getRootId(); 333 } 334 335 @Override 336 public String generateNewId() { 337 return repository.generateNewId(); 338 } 339 340 @Override 341 public boolean hasChild(String parentId, String name, Set<String> ignored) { 342 return repository.hasChild(parentId, name, ignored); 343 } 344 345 @Override 346 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 347 return repository.queryKeyValue(key, value, ignored); 348 } 349 350 @Override 351 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 352 return repository.queryKeyValue(key1, value1, key2, value2, ignored); 353 } 354 355 @Override 356 public Stream<State> getDescendants(String id, Set<String> keys) { 357 return repository.getDescendants(id, keys); 358 } 359 360 @Override 361 public Stream<State> getDescendants(String id, Set<String> keys, int limit) { 362 return repository.getDescendants(id, keys, limit); 363 } 364 365 @Override 366 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 367 return repository.queryKeyValuePresence(key, value, ignored); 368 } 369 370 @Override 371 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 372 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 373 return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo); 374 } 375 376 @Override 377 public LockManager getLockManager() { 378 return repository.getLockManager(); 379 } 380 381 @Override 382 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 383 return repository.scroll(evaluator, batchSize, keepAliveSeconds); 384 } 385 386 @Override 387 public ScrollResult<String> scroll(String scrollId) { 388 return repository.scroll(scrollId); 389 } 390 391 @Override 392 public Lock getLock(String id) { 393 return repository.getLock(id); 394 } 395 396 @Override 397 public Lock setLock(String id, Lock lock) { 398 return repository.setLock(id, lock); 399 } 400 401 @Override 402 public Lock removeLock(String id, String owner) { 403 return repository.removeLock(id, owner); 404 } 405 406 @Override 407 public void closeLockManager() { 408 repository.closeLockManager(); 409 } 410 411 @Override 412 public void clearLockManagerCaches() { 413 repository.clearLockManagerCaches(); 414 } 415 416 @Override 417 public String getName() { 418 return repository.getName(); 419 } 420 421 @Override 422 public Session getSession() { 423 if (repository instanceof DBSRepositoryBase) { 424 return ((DBSRepositoryBase) repository).getSession(this); 425 } 426 return repository.getSession(); 427 } 428 429 @Override 430 public int getActiveSessionsCount() { 431 return repository.getActiveSessionsCount(); 432 } 433 434 @Override 435 public void markReferencedBinaries() { 436 repository.markReferencedBinaries(); 437 } 438 439}