001/* 002 * (C) Copyright 2016-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.dbs; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 024 025import java.io.Serializable; 026import java.security.SecureRandom; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.Comparator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035import java.util.concurrent.TimeUnit; 036import java.util.stream.Stream; 037 038import org.apache.commons.lang3.StringUtils; 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.ecm.core.api.Lock; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.api.PartialList; 044import org.nuxeo.ecm.core.api.ScrollResult; 045import org.nuxeo.ecm.core.api.repository.FulltextConfiguration; 046import org.nuxeo.ecm.core.blob.BlobManager; 047import org.nuxeo.ecm.core.model.LockManager; 048import org.nuxeo.ecm.core.model.Session; 049import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 050import org.nuxeo.ecm.core.storage.State; 051import org.nuxeo.ecm.core.storage.State.StateDiff; 052import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 053import org.nuxeo.runtime.metrics.MetricsService; 054 055import com.codahale.metrics.MetricRegistry; 056import com.codahale.metrics.SharedMetricRegistries; 057import com.google.common.cache.Cache; 058import com.google.common.cache.CacheBuilder; 059import com.google.common.collect.ImmutableMap; 060import com.google.common.collect.Ordering; 061 062/** 063 * The DBS Cache layer used to cache some method call of real repository 064 * 065 * @since 8.10 066 */ 067public class DBSCachingRepository implements DBSRepository { 068 069 private static final Log log = LogFactory.getLog(DBSCachingRepository.class); 070 071 private static final Random RANDOM = new SecureRandom(); 072 073 private final DBSRepository repository; 074 075 private final Cache<String, State> cache; 076 077 private final Cache<String, String> childCache; 078 079 private DBSClusterInvalidator clusterInvalidator; 080 081 private final DBSInvalidations invalidations; 082 083 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 084 085 public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) { 086 this.repository = repository; 087 // Init caches 088 cache = newCache(descriptor); 089 registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache")); 090 childCache = newCache(descriptor); 091 registry.registerAll( 092 GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache")); 093 if (log.isInfoEnabled()) { 094 log.info(String.format("DBS cache activated on '%s' repository", repository.getName())); 095 } 096 invalidations = new DBSInvalidations(); 097 if (descriptor.isClusteringEnabled()) { 098 initClusterInvalidator(descriptor); 099 } 100 } 101 102 protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) { 103 CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder(); 104 builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES).recordStats(); 105 if (descriptor.cacheConcurrencyLevel != null) { 106 builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue()); 107 } 108 if (descriptor.cacheMaxSize != null) { 109 builder = builder.maximumSize(descriptor.cacheMaxSize.longValue()); 110 } 111 return builder.build(); 112 } 113 114 protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) { 115 String nodeId = descriptor.clusterNodeId; 116 if (StringUtils.isBlank(nodeId)) { 117 nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE)); 118 log.warn("Missing cluster node id configuration, please define it explicitly " 119 + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId); 120 } else { 121 nodeId = nodeId.trim(); 122 } 123 clusterInvalidator = createClusterInvalidator(descriptor); 124 clusterInvalidator.initialize(nodeId, getName()); 125 } 126 127 protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) { 128 Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass; 129 if (klass == null) { 130 throw new NuxeoException( 131 "Unable to get cluster invalidator class from descriptor whereas clustering is enabled"); 132 } 133 try { 134 return klass.newInstance(); 135 } catch (ReflectiveOperationException e) { 136 throw new NuxeoException(e); 137 } 138 139 } 140 141 public void begin() { 142 repository.begin(); 143 processReceivedInvalidations(); 144 } 145 146 @Override 147 public void commit() { 148 repository.commit(); 149 sendInvalidationsToOther(); 150 processReceivedInvalidations(); 151 } 152 153 @Override 154 public void rollback() { 155 repository.rollback(); 156 } 157 158 @Override 159 public void shutdown() { 160 repository.shutdown(); 161 // Clear caches 162 cache.invalidateAll(); 163 childCache.invalidateAll(); 164 // Remove metrics 165 String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache"); 166 String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache"); 167 registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName)); 168 if (log.isInfoEnabled()) { 169 log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName())); 170 } 171 // Send invalidations 172 if (clusterInvalidator != null) { 173 clusterInvalidator.sendInvalidations(new DBSInvalidations(true)); 174 } 175 176 } 177 178 @Override 179 public State readState(String id) { 180 State state = cache.getIfPresent(id); 181 if (state == null) { 182 state = repository.readState(id); 183 if (state != null) { 184 putInCache(state); 185 } 186 } 187 return state; 188 } 189 190 @Override 191 public State readPartialState(String id, Collection<String> keys) { 192 // bypass caches, as the goal of this method is to not trash caches for one-shot reads 193 return repository.readPartialState(id, keys); 194 } 195 196 @Override 197 public List<State> readStates(List<String> ids) { 198 ImmutableMap<String, State> statesMap = cache.getAllPresent(ids); 199 List<String> idsToRetrieve = new ArrayList<>(ids); 200 idsToRetrieve.removeAll(statesMap.keySet()); 201 // Read missing states from repository 202 List<State> states = repository.readStates(idsToRetrieve); 203 // Cache them 204 states.forEach(this::putInCache); 205 // Add previous cached one 206 states.addAll(statesMap.values()); 207 // Sort them 208 states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids))); 209 return states; 210 } 211 212 @Override 213 public void createState(State state) { 214 repository.createState(state); 215 // don't cache new state, it is inefficient on mass import 216 } 217 218 @Override 219 public void createStates(List<State> states) { 220 repository.createStates(states); 221 // don't cache new states, it is inefficient on mass import 222 } 223 224 @Override 225 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 226 repository.updateState(id, diff, changeTokenUpdater); 227 invalidate(id); 228 } 229 230 @Override 231 public void deleteStates(Set<String> ids) { 232 repository.deleteStates(ids); 233 invalidateAll(ids); 234 } 235 236 @Override 237 public State readChildState(String parentId, String name, Set<String> ignored) { 238 processReceivedInvalidations(); 239 240 String childCacheKey = computeChildCacheKey(parentId, name); 241 String stateId = childCache.getIfPresent(childCacheKey); 242 if (stateId != null) { 243 State state = cache.getIfPresent(stateId); 244 if (state != null) { 245 // As we don't have invalidation for childCache we need to check if retrieved state is the right one 246 // and not a previous document which was moved or renamed 247 if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) { 248 return state; 249 } else { 250 // We can invalidate the entry in cache as the document seemed to be moved or renamed 251 childCache.invalidate(childCacheKey); 252 } 253 } 254 } 255 State state = repository.readChildState(parentId, name, ignored); 256 putInCache(state); 257 return state; 258 } 259 260 private void putInCache(State state) { 261 if (state != null) { 262 String stateId = state.get(KEY_ID).toString(); 263 cache.put(stateId, state); 264 Object stateParentId = state.get(KEY_PARENT_ID); 265 if (stateParentId != null) { 266 childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId); 267 } 268 } 269 } 270 271 private String computeChildCacheKey(String parentId, String name) { 272 return parentId + '_' + name; 273 } 274 275 private void invalidate(String id) { 276 invalidateAll(Collections.singleton(id)); 277 } 278 279 private void invalidateAll(Collection<String> ids) { 280 cache.invalidateAll(ids); 281 if (clusterInvalidator != null) { 282 synchronized (invalidations) { 283 invalidations.addAll(ids); 284 } 285 } 286 } 287 288 protected void sendInvalidationsToOther() { 289 synchronized (invalidations) { 290 if (!invalidations.isEmpty()) { 291 if (clusterInvalidator != null) { 292 clusterInvalidator.sendInvalidations(invalidations); 293 } 294 invalidations.clear(); 295 } 296 } 297 } 298 299 protected void processReceivedInvalidations() { 300 if (clusterInvalidator != null) { 301 DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations(); 302 if (invalidations.all) { 303 cache.invalidateAll(); 304 childCache.invalidateAll(); 305 } else if (invalidations.ids != null) { 306 cache.invalidateAll(invalidations.ids); 307 } 308 } 309 } 310 311 @Override 312 public BlobManager getBlobManager() { 313 return repository.getBlobManager(); 314 } 315 316 @Override 317 public FulltextConfiguration getFulltextConfiguration() { 318 return repository.getFulltextConfiguration(); 319 } 320 321 @Override 322 public boolean isFulltextDisabled() { 323 return repository.isFulltextDisabled(); 324 } 325 326 @Override 327 public boolean isFulltextSearchDisabled() { 328 return repository.isFulltextSearchDisabled(); 329 } 330 331 @Override 332 public boolean isChangeTokenEnabled() { 333 return repository.isChangeTokenEnabled(); 334 } 335 336 @Override 337 public String getRootId() { 338 return repository.getRootId(); 339 } 340 341 @Override 342 public String generateNewId() { 343 return repository.generateNewId(); 344 } 345 346 @Override 347 public boolean hasChild(String parentId, String name, Set<String> ignored) { 348 return repository.hasChild(parentId, name, ignored); 349 } 350 351 @Override 352 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 353 return repository.queryKeyValue(key, value, ignored); 354 } 355 356 @Override 357 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 358 return repository.queryKeyValue(key1, value1, key2, value2, ignored); 359 } 360 361 @Override 362 public Stream<State> getDescendants(String id, Set<String> keys) { 363 return repository.getDescendants(id, keys); 364 } 365 366 @Override 367 public Stream<State> getDescendants(String id, Set<String> keys, int limit) { 368 return repository.getDescendants(id, keys, limit); 369 } 370 371 @Override 372 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 373 return repository.queryKeyValuePresence(key, value, ignored); 374 } 375 376 @Override 377 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 378 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 379 return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo); 380 } 381 382 @Override 383 public LockManager getLockManager() { 384 return repository.getLockManager(); 385 } 386 387 @Override 388 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 389 return repository.scroll(evaluator, batchSize, keepAliveSeconds); 390 } 391 392 @Override 393 public ScrollResult<String> scroll(String scrollId) { 394 return repository.scroll(scrollId); 395 } 396 397 @Override 398 public Lock getLock(String id) { 399 return repository.getLock(id); 400 } 401 402 @Override 403 public Lock setLock(String id, Lock lock) { 404 return repository.setLock(id, lock); 405 } 406 407 @Override 408 public Lock removeLock(String id, String owner) { 409 return repository.removeLock(id, owner); 410 } 411 412 @Override 413 public void closeLockManager() { 414 repository.closeLockManager(); 415 } 416 417 @Override 418 public void clearLockManagerCaches() { 419 repository.clearLockManagerCaches(); 420 } 421 422 @Override 423 public String getName() { 424 return repository.getName(); 425 } 426 427 @Override 428 public Session getSession() { 429 if (repository instanceof DBSRepositoryBase) { 430 return ((DBSRepositoryBase) repository).getSession(this); 431 } 432 return repository.getSession(); 433 } 434 435 @Override 436 public int getActiveSessionsCount() { 437 return repository.getActiveSessionsCount(); 438 } 439 440 @Override 441 public void markReferencedBinaries() { 442 repository.markReferencedBinaries(); 443 } 444 445}