001/* 002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 * Benoit Delbosc 012 */ 013package org.nuxeo.ecm.core.storage.sql; 014 015import java.io.Serializable; 016import java.util.ArrayList; 017import java.util.Arrays; 018import java.util.Collection; 019import java.util.Collections; 020import java.util.LinkedList; 021import java.util.List; 022import java.util.Map; 023import java.util.SortedMap; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import javax.management.MBeanServer; 027import javax.transaction.SystemException; 028import javax.transaction.Transaction; 029import javax.transaction.TransactionManager; 030import javax.transaction.xa.XAException; 031import javax.transaction.xa.Xid; 032 033import net.sf.ehcache.Cache; 034import net.sf.ehcache.CacheManager; 035import net.sf.ehcache.Element; 036import net.sf.ehcache.management.ManagementService; 037import net.sf.ehcache.transaction.manager.TransactionManagerLookup; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.management.ServerLocator; 044import org.nuxeo.runtime.metrics.MetricsService; 045 046import com.codahale.metrics.Counter; 047import com.codahale.metrics.Gauge; 048import com.codahale.metrics.MetricRegistry; 049import com.codahale.metrics.SharedMetricRegistries; 050import com.codahale.metrics.Timer; 051import com.codahale.metrics.Timer.Context; 052 053/** 054 * A {@link RowMapper} that use an unified ehcache. 055 * <p> 056 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}. 057 */ 058public class UnifiedCachingRowMapper implements RowMapper { 059 060 private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class); 061 062 private static final String ABSENT = "__ABSENT__\0\0\0"; 063 064 private static CacheManager cacheManager = null; 065 066 protected static boolean isXA; 067 068 private Cache cache; 069 070 private Model model; 071 072 /** 073 * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated. 074 */ 075 private RowMapper rowMapper; 076 077 /** 078 * The local invalidations due to writes through this mapper that should be propagated to other sessions at 079 * post-commit time. 080 */ 081 private final Invalidations localInvalidations; 082 083 /** 084 * The queue of invalidations received from other session or from the cluster invalidator, to process at 085 * pre-transaction time. 086 */ 087 private final InvalidationsQueue invalidationsQueue; 088 089 /** 090 * The propagator of invalidations to other mappers. 091 */ 092 private InvalidationsPropagator invalidationsPropagator; 093 094 private static final String CACHE_NAME = "unifiedVCSCache"; 095 096 private static final String EHCACHE_FILE_PROP = "ehcacheFilePath"; 097 098 private static AtomicInteger rowMapperCount = new AtomicInteger(); 099 100 /** 101 * Cache statistics 102 * 103 * @since 5.7 104 */ 105 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 106 107 protected Counter cacheHitCount; 108 109 protected Timer cacheGetTimer; 110 111 // sor means system of record (database access) 112 protected Counter sorRows; 113 114 protected Timer sorGetTimer; 115 116 public UnifiedCachingRowMapper() { 117 localInvalidations = new Invalidations(); 118 invalidationsQueue = new InvalidationsQueue("mapper-" + this); 119 } 120 121 synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper, 122 InvalidationsPropagator invalidationsPropagator, Map<String, String> properties) { 123 this.model = model; 124 this.rowMapper = rowMapper; 125 this.invalidationsPropagator = invalidationsPropagator; 126 invalidationsPropagator.addQueue(invalidationsQueue); 127 if (cacheManager == null) { 128 if (properties.containsKey(EHCACHE_FILE_PROP)) { 129 String value = properties.get(EHCACHE_FILE_PROP); 130 log.info("Creating ehcache manager for VCS, using ehcache file: " + value); 131 cacheManager = CacheManager.create(value); 132 } else { 133 log.info("Creating ehcache manager for VCS, No ehcache file provided"); 134 cacheManager = CacheManager.create(); 135 } 136 isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional(); 137 // Exposes cache to JMX 138 MBeanServer mBeanServer = Framework.getLocalService(ServerLocator.class).lookupServer(); 139 ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true); 140 } 141 rowMapperCount.incrementAndGet(); 142 cache = cacheManager.getCache(CACHE_NAME); 143 setMetrics(repositoryName); 144 } 145 146 protected void setMetrics(String repositoryName) { 147 cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 148 "unified", "hits")); 149 cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 150 "unified", "get")); 151 sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 152 "sor", "rows")); 153 sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 154 "sor", "get")); 155 String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 156 "cache-size"); 157 SortedMap<String, Gauge> gauges = registry.getGauges(); 158 if (!gauges.containsKey(gaugeName)) { 159 registry.register(gaugeName, new Gauge<Integer>() { 160 @Override 161 public Integer getValue() { 162 if (cacheManager != null) { 163 return cacheManager.getCache(CACHE_NAME).getSize(); 164 } 165 return 0; 166 } 167 }); 168 } 169 } 170 171 public void close() { 172 invalidationsPropagator.removeQueue(invalidationsQueue); 173 rowMapperCount.decrementAndGet(); 174 } 175 176 @Override 177 public Serializable generateNewId() { 178 return rowMapper.generateNewId(); 179 } 180 181 /* 182 * ----- ehcache ----- 183 */ 184 185 protected boolean hasTransaction() { 186 TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup(); 187 if (transactionManagerLookup == null) { 188 return false; 189 } 190 TransactionManager transactionManager = transactionManagerLookup.getTransactionManager(); 191 if (transactionManager == null) { 192 return false; 193 } 194 Transaction transaction; 195 try { 196 transaction = transactionManager.getTransaction(); 197 } catch (SystemException e) { 198 throw new RuntimeException(e); 199 } 200 return transaction != null; 201 } 202 203 protected boolean useEhCache() { 204 return !isXA || hasTransaction(); 205 } 206 207 protected void ehCachePut(Element element) { 208 if (useEhCache()) { 209 cache.put(element); 210 } 211 } 212 213 protected Element ehCacheGet(Serializable key) { 214 if (useEhCache()) { 215 return cache.get(key); 216 } 217 return null; 218 } 219 220 protected int ehCacheGetSize() { 221 if (useEhCache()) { 222 return cache.getSize(); 223 } 224 return 0; 225 } 226 227 protected boolean ehCacheRemove(Serializable key) { 228 if (useEhCache()) { 229 return cache.remove(key); 230 } 231 return false; 232 } 233 234 protected void ehCacheRemoveAll() { 235 if (useEhCache()) { 236 cache.removeAll(); 237 } 238 } 239 240 /* 241 * ----- Cache ----- 242 */ 243 244 protected static boolean isAbsent(Row row) { 245 return row.tableName == ABSENT; // == is ok 246 } 247 248 protected void cachePut(Row row) { 249 row = row.clone(); 250 // for ACL collections, make sure the order is correct 251 // (without the cache, the query to get a list of collection does an 252 // ORDER BY pos, so users of the cache must get the same behavior) 253 if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) { 254 row.values = sortACLRows((ACLRow[]) row.values); 255 } 256 Element element = new Element(new RowId(row), row); 257 ehCachePut(element); 258 } 259 260 protected ACLRow[] sortACLRows(ACLRow[] acls) { 261 List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls)); 262 Collections.sort(list, ACLRowPositionComparator.INSTANCE); 263 ACLRow[] res = new ACLRow[acls.length]; 264 return list.toArray(res); 265 } 266 267 protected void cachePutAbsent(RowId rowId) { 268 Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null)); 269 ehCachePut(element); 270 } 271 272 protected void cachePutAbsentIfNull(RowId rowId, Row row) { 273 if (row != null) { 274 cachePut(row); 275 } else { 276 cachePutAbsent(rowId); 277 } 278 } 279 280 protected void cachePutAbsentIfRowId(RowId rowId) { 281 if (rowId instanceof Row) { 282 cachePut((Row) rowId); 283 } else { 284 cachePutAbsent(rowId); 285 } 286 } 287 288 protected Row cacheGet(RowId rowId) { 289 final Context context = cacheGetTimer.time(); 290 try { 291 Element element = ehCacheGet(rowId); 292 Row row = null; 293 if (element != null) { 294 row = (Row) element.getObjectValue(); 295 } 296 if (row != null && !isAbsent(row)) { 297 row = row.clone(); 298 } 299 if (row != null) { 300 cacheHitCount.inc(); 301 } 302 return row; 303 } finally { 304 context.stop(); 305 } 306 } 307 308 protected void cacheRemove(RowId rowId) { 309 ehCacheRemove(rowId); 310 } 311 312 /* 313 * ----- Invalidations / Cache Management ----- 314 */ 315 316 @Override 317 public Invalidations receiveInvalidations() { 318 // invalidations from the underlying mapper (cluster) 319 // already propagated to our invalidations queue 320 Invalidations remoteInvals = rowMapper.receiveInvalidations(); 321 322 Invalidations ret = invalidationsQueue.getInvalidations(); 323 324 if (remoteInvals != null) { 325 if (!ret.all) { 326 // only handle remote invalidations, the cache is shared and transactional 327 if (remoteInvals.modified != null) { 328 for (RowId rowId : remoteInvals.modified) { 329 cacheRemove(rowId); 330 } 331 } 332 if (remoteInvals.deleted != null) { 333 for (RowId rowId : remoteInvals.deleted) { 334 cachePutAbsent(rowId); 335 } 336 } 337 } 338 } 339 340 // invalidate our cache 341 if (ret.all) { 342 clearCache(); 343 } 344 345 return ret.isEmpty() ? null : ret; 346 } 347 348 // propagate invalidations 349 @Override 350 public void sendInvalidations(Invalidations invalidations) { 351 // add local invalidations 352 if (!localInvalidations.isEmpty()) { 353 if (invalidations == null) { 354 invalidations = new Invalidations(); 355 } 356 invalidations.add(localInvalidations); 357 localInvalidations.clear(); 358 } 359 360 if (invalidations != null && !invalidations.isEmpty()) { 361 // send to underlying mapper 362 rowMapper.sendInvalidations(invalidations); 363 364 // queue to other mappers' caches 365 invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue); 366 } 367 } 368 369 @Override 370 public void clearCache() { 371 ehCacheRemoveAll(); 372 localInvalidations.clear(); 373 rowMapper.clearCache(); 374 } 375 376 @Override 377 public void rollback(Xid xid) throws XAException { 378 try { 379 rowMapper.rollback(xid); 380 } finally { 381 ehCacheRemoveAll(); 382 localInvalidations.clear(); 383 } 384 } 385 386 /* 387 * ----- Batch ----- 388 */ 389 390 /* 391 * Use those from the cache if available, read from the mapper for the rest. 392 */ 393 @Override 394 public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) { 395 List<RowId> res = new ArrayList<RowId>(rowIds.size()); 396 // find which are in cache, and which not 397 List<RowId> todo = new LinkedList<RowId>(); 398 for (RowId rowId : rowIds) { 399 Row row = cacheGet(rowId); 400 if (row == null) { 401 if (cacheOnly) { 402 res.add(new RowId(rowId)); 403 } else { 404 todo.add(rowId); 405 } 406 } else if (isAbsent(row)) { 407 res.add(new RowId(rowId)); 408 } else { 409 res.add(row); 410 } 411 } 412 if (!todo.isEmpty()) { 413 final Context context = sorGetTimer.time(); 414 try { 415 // ask missing ones to underlying row mapper 416 List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly); 417 // add them to the cache 418 for (RowId rowId : fetched) { 419 cachePutAbsentIfRowId(rowId); 420 } 421 // merge results 422 res.addAll(fetched); 423 sorRows.inc(fetched.size()); 424 } finally { 425 context.stop(); 426 } 427 } 428 return res; 429 } 430 431 /* 432 * Save in the cache then pass all the writes to the mapper. 433 */ 434 @Override 435 public void write(RowBatch batch) { 436 // we avoid gathering invalidations for a write-only table: fulltext 437 for (Row row : batch.creates) { 438 cachePut(row); 439 if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) { 440 // we need to send modified invalidations for created 441 // fragments because other session's ABSENT fragments have 442 // to be invalidated 443 localInvalidations.addModified(new RowId(row)); 444 } 445 } 446 for (RowUpdate rowu : batch.updates) { 447 cachePut(rowu.row); 448 if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) { 449 localInvalidations.addModified(new RowId(rowu.row)); 450 } 451 } 452 for (RowId rowId : batch.deletes) { 453 if (rowId instanceof Row) { 454 throw new AssertionError(); 455 } 456 cachePutAbsent(rowId); 457 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 458 localInvalidations.addDeleted(rowId); 459 } 460 } 461 for (RowId rowId : batch.deletesDependent) { 462 if (rowId instanceof Row) { 463 throw new AssertionError(); 464 } 465 cachePutAbsent(rowId); 466 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 467 localInvalidations.addDeleted(rowId); 468 } 469 } 470 471 // propagate to underlying mapper 472 rowMapper.write(batch); 473 } 474 475 /* 476 * ----- Read ----- 477 */ 478 479 @Override 480 public Row readSimpleRow(RowId rowId) { 481 Row row = cacheGet(rowId); 482 if (row == null) { 483 row = rowMapper.readSimpleRow(rowId); 484 cachePutAbsentIfNull(rowId, row); 485 return row; 486 } else if (isAbsent(row)) { 487 return null; 488 } else { 489 return row; 490 } 491 } 492 493 @Override 494 public Map<String, String> getBinaryFulltext(RowId rowId) { 495 return rowMapper.getBinaryFulltext(rowId); 496 } 497 498 @Override 499 public Serializable[] readCollectionRowArray(RowId rowId) { 500 Row row = cacheGet(rowId); 501 if (row == null) { 502 Serializable[] array = rowMapper.readCollectionRowArray(rowId); 503 assert array != null; 504 row = new Row(rowId.tableName, rowId.id, array); 505 cachePut(row); 506 return row.values; 507 } else if (isAbsent(row)) { 508 return null; 509 } else { 510 return row.values; 511 } 512 } 513 514 @Override 515 public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter, 516 Serializable criterion, boolean limitToOne) { 517 List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne); 518 for (Row row : rows) { 519 cachePut(row); 520 } 521 return rows; 522 } 523 524 /* 525 * ----- Copy ----- 526 */ 527 528 @Override 529 public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) { 530 CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow); 531 Invalidations invalidations = result.invalidations; 532 if (invalidations.modified != null) { 533 for (RowId rowId : invalidations.modified) { 534 cacheRemove(rowId); 535 localInvalidations.addModified(new RowId(rowId)); 536 } 537 } 538 if (invalidations.deleted != null) { 539 for (RowId rowId : invalidations.deleted) { 540 cacheRemove(rowId); 541 localInvalidations.addDeleted(rowId); 542 } 543 } 544 return result; 545 } 546 547 @Override 548 public List<NodeInfo> remove(NodeInfo rootInfo) { 549 List<NodeInfo> infos = rowMapper.remove(rootInfo); 550 for (NodeInfo info : infos) { 551 for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) { 552 RowId rowId = new RowId(fragmentName, info.id); 553 cacheRemove(rowId); 554 localInvalidations.addDeleted(rowId); 555 } 556 } 557 // we only put as absent the root fragment, to avoid polluting the cache 558 // with lots of absent info. the rest is removed entirely 559 cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id)); 560 return infos; 561 } 562 563 @Override 564 public long getCacheSize() { 565 // The unified cache is reported by the cache-size gauge 566 return 0; 567 } 568 569}