001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.dbs; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 024 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.Set; 034import java.util.concurrent.TimeUnit; 035 036import org.apache.commons.lang.StringUtils; 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.nuxeo.ecm.core.api.Lock; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.api.PartialList; 042import org.nuxeo.ecm.core.api.ScrollResult; 043import org.nuxeo.ecm.core.blob.BlobManager; 044import org.nuxeo.ecm.core.model.LockManager; 045import org.nuxeo.ecm.core.model.Session; 046import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 047import org.nuxeo.ecm.core.storage.FulltextConfiguration; 048import org.nuxeo.ecm.core.storage.State; 049import org.nuxeo.ecm.core.storage.State.StateDiff; 050import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 051import org.nuxeo.runtime.metrics.MetricsService; 052 053import com.codahale.metrics.MetricRegistry; 054import com.codahale.metrics.SharedMetricRegistries; 055import com.google.common.cache.Cache; 056import com.google.common.cache.CacheBuilder; 057import com.google.common.collect.ImmutableMap; 058import com.google.common.collect.Ordering; 059 060/** 061 * The DBS Cache layer used to cache some method call of real repository 062 * 063 * @since 8.10 064 */ 065public class DBSCachingRepository implements DBSRepository { 066 067 private static final Log log = LogFactory.getLog(DBSCachingRepository.class); 068 069 private static final Random RANDOM = new Random(); 070 071 private final DBSRepository repository; 072 073 private final Cache<String, State> cache; 074 075 private final Cache<String, String> childCache; 076 077 private DBSClusterInvalidator clusterInvalidator; 078 079 private final DBSInvalidations invalidations; 080 081 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 082 083 public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) { 084 this.repository = repository; 085 // Init caches 086 cache = newCache(descriptor); 087 registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache")); 088 childCache = newCache(descriptor); 089 registry.registerAll( 090 GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache")); 091 if (log.isInfoEnabled()) { 092 log.info(String.format("DBS cache activated on '%s' repository", repository.getName())); 093 } 094 invalidations = new DBSInvalidations(); 095 if (descriptor.isClusteringEnabled()) { 096 initClusterInvalidator(descriptor); 097 } 098 } 099 100 protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) { 101 CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder(); 102 builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES).recordStats(); 103 if (descriptor.cacheConcurrencyLevel != null) { 104 builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue()); 105 } 106 if (descriptor.cacheMaxSize != null) { 107 builder = builder.maximumSize(descriptor.cacheMaxSize.longValue()); 108 } 109 return builder.build(); 110 } 111 112 protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) { 113 String nodeId = descriptor.clusterNodeId; 114 if (StringUtils.isBlank(nodeId)) { 115 nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE)); 116 log.warn("Missing cluster node id configuration, please define it explicitly " 117 + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId); 118 } else { 119 nodeId = nodeId.trim(); 120 } 121 clusterInvalidator = createClusterInvalidator(descriptor); 122 clusterInvalidator.initialize(nodeId, getName()); 123 } 124 125 protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) { 126 Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass; 127 if (klass == null) { 128 throw new NuxeoException( 129 "Unable to get cluster invalidator class from descriptor whereas clustering is enabled"); 130 } 131 try { 132 return klass.newInstance(); 133 } catch (ReflectiveOperationException e) { 134 throw new NuxeoException(e); 135 } 136 137 } 138 139 public void begin() { 140 repository.begin(); 141 processReceivedInvalidations(); 142 } 143 144 @Override 145 public void commit() { 146 repository.commit(); 147 sendInvalidationsToOther(); 148 processReceivedInvalidations(); 149 } 150 151 @Override 152 public void rollback() { 153 repository.rollback(); 154 } 155 156 @Override 157 public void shutdown() { 158 repository.shutdown(); 159 // Clear caches 160 cache.invalidateAll(); 161 childCache.invalidateAll(); 162 // Remove metrics 163 String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache"); 164 String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache"); 165 registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName)); 166 if (log.isInfoEnabled()) { 167 log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName())); 168 } 169 // Send invalidations 170 if (clusterInvalidator != null) { 171 clusterInvalidator.sendInvalidations(new DBSInvalidations(true)); 172 } 173 174 } 175 176 @Override 177 public State readState(String id) { 178 State state = cache.getIfPresent(id); 179 if (state == null) { 180 state = repository.readState(id); 181 if (state != null) { 182 putInCache(state); 183 } 184 } 185 return state; 186 } 187 188 @Override 189 public List<State> readStates(List<String> ids) { 190 ImmutableMap<String, State> statesMap = cache.getAllPresent(ids); 191 List<String> idsToRetrieve = new ArrayList<>(ids); 192 idsToRetrieve.removeAll(statesMap.keySet()); 193 // Read missing states from repository 194 List<State> states = repository.readStates(idsToRetrieve); 195 // Cache them 196 states.forEach(this::putInCache); 197 // Add previous cached one 198 states.addAll(statesMap.values()); 199 // Sort them 200 states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids))); 201 return states; 202 } 203 204 @Override 205 public void createState(State state) { 206 repository.createState(state); 207 // don't cache new state, it is inefficient on mass import 208 } 209 210 @Override 211 public void createStates(List<State> states) { 212 repository.createStates(states); 213 // don't cache new states, it is inefficient on mass import 214 } 215 216 @Override 217 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 218 repository.updateState(id, diff, changeTokenUpdater); 219 invalidate(id); 220 } 221 222 @Override 223 public void deleteStates(Set<String> ids) { 224 repository.deleteStates(ids); 225 invalidateAll(ids); 226 } 227 228 @Override 229 public State readChildState(String parentId, String name, Set<String> ignored) { 230 processReceivedInvalidations(); 231 232 String childCacheKey = computeChildCacheKey(parentId, name); 233 String stateId = childCache.getIfPresent(childCacheKey); 234 if (stateId != null) { 235 State state = cache.getIfPresent(stateId); 236 if (state != null) { 237 // As we don't have invalidation for childCache we need to check if retrieved state is the right one 238 // and not a previous document which was moved or renamed 239 if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) { 240 return state; 241 } else { 242 // We can invalidate the entry in cache as the document seemed to be moved or renamed 243 childCache.invalidate(childCacheKey); 244 } 245 } 246 } 247 State state = repository.readChildState(parentId, name, ignored); 248 putInCache(state); 249 return state; 250 } 251 252 private void putInCache(State state) { 253 if (state != null) { 254 String stateId = state.get(KEY_ID).toString(); 255 cache.put(stateId, state); 256 Object stateParentId = state.get(KEY_PARENT_ID); 257 if (stateParentId != null) { 258 childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId); 259 } 260 } 261 } 262 263 private String computeChildCacheKey(String parentId, String name) { 264 return parentId + '_' + name; 265 } 266 267 private void invalidate(String id) { 268 invalidateAll(Collections.singleton(id)); 269 } 270 271 private void invalidateAll(Collection<String> ids) { 272 cache.invalidateAll(ids); 273 if (clusterInvalidator != null) { 274 synchronized (invalidations) { 275 invalidations.addAll(ids); 276 } 277 } 278 } 279 280 protected void sendInvalidationsToOther() { 281 synchronized (invalidations) { 282 if (!invalidations.isEmpty()) { 283 if (clusterInvalidator != null) { 284 clusterInvalidator.sendInvalidations(invalidations); 285 } 286 invalidations.clear(); 287 } 288 } 289 } 290 291 protected void processReceivedInvalidations() { 292 if (clusterInvalidator != null) { 293 DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations(); 294 if (invalidations.all) { 295 cache.invalidateAll(); 296 childCache.invalidateAll(); 297 } else if (invalidations.ids != null) { 298 cache.invalidateAll(invalidations.ids); 299 } 300 } 301 } 302 303 @Override 304 public BlobManager getBlobManager() { 305 return repository.getBlobManager(); 306 } 307 308 @Override 309 public FulltextConfiguration getFulltextConfiguration() { 310 return repository.getFulltextConfiguration(); 311 } 312 313 @Override 314 public boolean isFulltextDisabled() { 315 return repository.isFulltextDisabled(); 316 } 317 318 @Override 319 public boolean isChangeTokenEnabled() { 320 return repository.isChangeTokenEnabled(); 321 } 322 323 @Override 324 public String getRootId() { 325 return repository.getRootId(); 326 } 327 328 @Override 329 public String generateNewId() { 330 return repository.generateNewId(); 331 } 332 333 @Override 334 public boolean hasChild(String parentId, String name, Set<String> ignored) { 335 return repository.hasChild(parentId, name, ignored); 336 } 337 338 @Override 339 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 340 return repository.queryKeyValue(key, value, ignored); 341 } 342 343 @Override 344 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 345 return repository.queryKeyValue(key1, value1, key2, value2, ignored); 346 } 347 348 @Override 349 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 350 Map<String, Object[]> targetProxies) { 351 repository.queryKeyValueArray(key, value, ids, proxyTargets, targetProxies); 352 } 353 354 @Override 355 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 356 return repository.queryKeyValuePresence(key, value, ignored); 357 } 358 359 @Override 360 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 361 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 362 return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo); 363 } 364 365 @Override 366 public LockManager getLockManager() { 367 return repository.getLockManager(); 368 } 369 370 @Override 371 public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 372 return repository.scroll(evaluator, batchSize, keepAliveSeconds); 373 } 374 375 @Override 376 public ScrollResult scroll(String scrollId) { 377 return repository.scroll(scrollId); 378 } 379 380 @Override 381 public Lock getLock(String id) { 382 return repository.getLock(id); 383 } 384 385 @Override 386 public Lock setLock(String id, Lock lock) { 387 return repository.setLock(id, lock); 388 } 389 390 @Override 391 public Lock removeLock(String id, String owner) { 392 return repository.removeLock(id, owner); 393 } 394 395 @Override 396 public void closeLockManager() { 397 repository.closeLockManager(); 398 } 399 400 @Override 401 public void clearLockManagerCaches() { 402 repository.clearLockManagerCaches(); 403 } 404 405 @Override 406 public String getName() { 407 return repository.getName(); 408 } 409 410 @Override 411 public Session getSession() { 412 if (repository instanceof DBSRepositoryBase) { 413 return ((DBSRepositoryBase) repository).getSession(this); 414 } 415 return repository.getSession(); 416 } 417 418 @Override 419 public int getActiveSessionsCount() { 420 return repository.getActiveSessionsCount(); 421 } 422 423 @Override 424 public void markReferencedBinaries() { 425 repository.markReferencedBinaries(); 426 } 427 428}