001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.dbs; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 024 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.Set; 034import java.util.concurrent.TimeUnit; 035 036import org.apache.commons.lang.StringUtils; 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.nuxeo.ecm.core.api.Lock; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.api.PartialList; 042import org.nuxeo.ecm.core.api.ScrollResult; 043import org.nuxeo.ecm.core.blob.BlobManager; 044import org.nuxeo.ecm.core.model.LockManager; 045import org.nuxeo.ecm.core.model.Session; 046import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 047import org.nuxeo.ecm.core.storage.FulltextConfiguration; 048import org.nuxeo.ecm.core.storage.State; 049import org.nuxeo.ecm.core.storage.State.StateDiff; 050import org.nuxeo.runtime.metrics.MetricsService; 051 052import com.codahale.metrics.MetricRegistry; 053import com.codahale.metrics.SharedMetricRegistries; 054import com.google.common.cache.Cache; 055import com.google.common.cache.CacheBuilder; 056import com.google.common.collect.ImmutableMap; 057import com.google.common.collect.Ordering; 058 059/** 060 * The DBS Cache layer used to cache some method call of real repository 061 * 062 * @since 8.10 063 */ 064public class DBSCachingRepository implements DBSRepository { 065 066 private static final Log log = LogFactory.getLog(DBSCachingRepository.class); 067 068 private static final Random RANDOM = new Random(); 069 070 private final DBSRepository repository; 071 072 private final Cache<String, State> cache; 073 074 private final Cache<String, String> childCache; 075 076 private DBSClusterInvalidator clusterInvalidator; 077 078 private final DBSInvalidations invalidations; 079 080 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 081 082 public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) { 083 this.repository = repository; 084 // Init caches 085 cache = newCache(descriptor); 086 registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache")); 087 childCache = newCache(descriptor); 088 registry.registerAll( 089 GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache")); 090 if (log.isInfoEnabled()) { 091 log.info(String.format("DBS cache activated on '%s' repository", repository.getName())); 092 } 093 invalidations = new DBSInvalidations(); 094 if (descriptor.isClusteringEnabled()) { 095 initClusterInvalidator(descriptor); 096 } 097 } 098 099 protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) { 100 CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder(); 101 builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES); 102 if (descriptor.cacheConcurrencyLevel != null) { 103 builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue()); 104 } 105 if (descriptor.cacheMaxSize != null) { 106 builder = builder.maximumSize(descriptor.cacheMaxSize.longValue()); 107 } 108 return builder.build(); 109 } 110 111 protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) { 112 String nodeId = descriptor.clusterNodeId; 113 if (StringUtils.isBlank(nodeId)) { 114 nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE)); 115 log.warn("Missing cluster node id configuration, please define it explicitly " 116 + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId); 117 } else { 118 nodeId = nodeId.trim(); 119 } 120 clusterInvalidator = createClusterInvalidator(descriptor); 121 clusterInvalidator.initialize(nodeId, this); 122 } 123 124 protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) { 125 Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass; 126 if (klass == null) { 127 throw new NuxeoException( 128 "Unable to get cluster invalidator class from descriptor whereas clustering is enabled"); 129 } 130 try { 131 return klass.newInstance(); 132 } catch (ReflectiveOperationException e) { 133 throw new NuxeoException(e); 134 } 135 136 } 137 138 public void begin() { 139 repository.begin(); 140 processReceivedInvalidations(); 141 } 142 143 @Override 144 public void commit() { 145 repository.commit(); 146 sendInvalidationsToOther(); 147 processReceivedInvalidations(); 148 } 149 150 @Override 151 public void rollback() { 152 repository.rollback(); 153 } 154 155 @Override 156 public void shutdown() { 157 repository.shutdown(); 158 // Clear caches 159 cache.invalidateAll(); 160 childCache.invalidateAll(); 161 // Remove metrics 162 String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache"); 163 String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache"); 164 registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName)); 165 if (log.isInfoEnabled()) { 166 log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName())); 167 } 168 // Send invalidations 169 if (clusterInvalidator != null) { 170 clusterInvalidator.sendInvalidations(new DBSInvalidations(true)); 171 } 172 173 } 174 175 @Override 176 public State readState(String id) { 177 State state = cache.getIfPresent(id); 178 if (state == null) { 179 state = repository.readState(id); 180 if (state != null) { 181 putInCache(state); 182 } 183 } 184 return state; 185 } 186 187 @Override 188 public List<State> readStates(List<String> ids) { 189 ImmutableMap<String, State> statesMap = cache.getAllPresent(ids); 190 List<String> idsToRetrieve = new ArrayList<>(ids); 191 idsToRetrieve.removeAll(statesMap.keySet()); 192 // Read missing states from repository 193 List<State> states = repository.readStates(idsToRetrieve); 194 // Cache them 195 states.forEach(this::putInCache); 196 // Add previous cached one 197 states.addAll(statesMap.values()); 198 // Sort them 199 states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids))); 200 return states; 201 } 202 203 @Override 204 public void createState(State state) { 205 repository.createState(state); 206 // TODO check if it's relevant 207 putInCache(state); 208 } 209 210 @Override 211 public void createStates(List<State> states) { 212 repository.createStates(states); 213 states.forEach(this::putInCache); 214 } 215 216 @Override 217 public void updateState(String id, StateDiff diff) { 218 repository.updateState(id, diff); 219 invalidate(id); 220 } 221 222 @Override 223 public void deleteStates(Set<String> ids) { 224 repository.deleteStates(ids); 225 invalidateAll(ids); 226 } 227 228 @Override 229 public State readChildState(String parentId, String name, Set<String> ignored) { 230 processReceivedInvalidations(); 231 232 String childCacheKey = computeChildCacheKey(parentId, name); 233 String stateId = childCache.getIfPresent(childCacheKey); 234 if (stateId != null) { 235 State state = cache.getIfPresent(stateId); 236 if (state != null) { 237 // As we don't have invalidation for childCache we need to check if retrieved state is the right one 238 // and not a previous document which was moved or renamed 239 if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) { 240 return state; 241 } else { 242 // We can invalidate the entry in cache as the document seemed to be moved or renamed 243 childCache.invalidate(childCacheKey); 244 } 245 } 246 } 247 State state = repository.readChildState(parentId, name, ignored); 248 putInCache(state); 249 return state; 250 } 251 252 private void putInCache(State state) { 253 if (state != null) { 254 String stateId = state.get(KEY_ID).toString(); 255 cache.put(stateId, state); 256 Object stateParentId = state.get(KEY_PARENT_ID); 257 if (stateParentId != null) { 258 childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId); 259 } 260 } 261 } 262 263 private String computeChildCacheKey(String parentId, String name) { 264 return parentId + '_' + name; 265 } 266 267 private void invalidate(String id) { 268 invalidateAll(Collections.singleton(id)); 269 } 270 271 private void invalidateAll(Collection<String> ids) { 272 cache.invalidateAll(ids); 273 if (clusterInvalidator != null) { 274 DBSInvalidations invalidations = new DBSInvalidations(); 275 invalidations.addAll(ids); 276 } 277 } 278 279 protected void sendInvalidationsToOther() { 280 if (!invalidations.isEmpty()) { 281 synchronized (invalidations) { 282 if (clusterInvalidator != null) { 283 clusterInvalidator.sendInvalidations(invalidations); 284 } 285 invalidations.clear(); 286 } 287 } 288 } 289 290 protected void processReceivedInvalidations() { 291 if (clusterInvalidator != null) { 292 DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations(); 293 if (invalidations.all) { 294 cache.invalidateAll(); 295 childCache.invalidateAll(); 296 } else if (invalidations.ids != null) { 297 cache.invalidateAll(invalidations.ids); 298 } 299 } 300 } 301 302 @Override 303 public BlobManager getBlobManager() { 304 return repository.getBlobManager(); 305 } 306 307 @Override 308 public FulltextConfiguration getFulltextConfiguration() { 309 return repository.getFulltextConfiguration(); 310 } 311 312 @Override 313 public boolean isFulltextDisabled() { 314 return repository.isFulltextDisabled(); 315 } 316 317 @Override 318 public String getRootId() { 319 return repository.getRootId(); 320 } 321 322 @Override 323 public String generateNewId() { 324 return repository.generateNewId(); 325 } 326 327 @Override 328 public boolean hasChild(String parentId, String name, Set<String> ignored) { 329 return repository.hasChild(parentId, name, ignored); 330 } 331 332 @Override 333 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 334 return repository.queryKeyValue(key, value, ignored); 335 } 336 337 @Override 338 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 339 return repository.queryKeyValue(key1, value1, key2, value2, ignored); 340 } 341 342 @Override 343 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 344 Map<String, Object[]> targetProxies) { 345 repository.queryKeyValueArray(key, value, ids, proxyTargets, targetProxies); 346 } 347 348 @Override 349 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 350 return repository.queryKeyValuePresence(key, value, ignored); 351 } 352 353 @Override 354 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 355 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 356 return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo); 357 } 358 359 @Override 360 public LockManager getLockManager() { 361 return repository.getLockManager(); 362 } 363 364 @Override 365 public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 366 return repository.scroll(evaluator, batchSize, keepAliveSeconds); 367 } 368 369 @Override 370 public ScrollResult scroll(String scrollId) { 371 return repository.scroll(scrollId); 372 } 373 374 @Override 375 public Lock getLock(String id) { 376 return repository.getLock(id); 377 } 378 379 @Override 380 public Lock setLock(String id, Lock lock) { 381 return repository.setLock(id, lock); 382 } 383 384 @Override 385 public Lock removeLock(String id, String owner) { 386 return repository.removeLock(id, owner); 387 } 388 389 @Override 390 public void closeLockManager() { 391 repository.closeLockManager(); 392 } 393 394 @Override 395 public void clearLockManagerCaches() { 396 repository.clearLockManagerCaches(); 397 } 398 399 @Override 400 public String getName() { 401 return repository.getName(); 402 } 403 404 @Override 405 public Session getSession() { 406 if (repository instanceof DBSRepositoryBase) { 407 return ((DBSRepositoryBase) repository).getSession(this); 408 } 409 return repository.getSession(); 410 } 411 412 @Override 413 public int getActiveSessionsCount() { 414 return repository.getActiveSessionsCount(); 415 } 416 417 @Override 418 public void markReferencedBinaries() { 419 repository.markReferencedBinaries(); 420 } 421 422}