001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.SortedMap; 032import java.util.concurrent.atomic.AtomicInteger; 033 034import javax.management.MBeanServer; 035import javax.transaction.SystemException; 036import javax.transaction.Transaction; 037import javax.transaction.TransactionManager; 038import javax.transaction.xa.XAException; 039import javax.transaction.xa.Xid; 040 041import net.sf.ehcache.Cache; 042import net.sf.ehcache.CacheManager; 043import net.sf.ehcache.Element; 044import net.sf.ehcache.management.ManagementService; 045import net.sf.ehcache.transaction.manager.TransactionManagerLookup; 046 047import org.apache.commons.logging.Log; 048import org.apache.commons.logging.LogFactory; 049import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator; 050import org.nuxeo.runtime.api.Framework; 051import org.nuxeo.runtime.management.ServerLocator; 052import org.nuxeo.runtime.metrics.MetricsService; 053 054import com.codahale.metrics.Counter; 055import com.codahale.metrics.Gauge; 056import com.codahale.metrics.MetricRegistry; 057import com.codahale.metrics.SharedMetricRegistries; 058import com.codahale.metrics.Timer; 059import com.codahale.metrics.Timer.Context; 060 061/** 062 * A {@link RowMapper} that use an unified ehcache. 063 * <p> 064 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}. 065 */ 066public class UnifiedCachingRowMapper implements RowMapper { 067 068 private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class); 069 070 private static final String ABSENT = "__ABSENT__\0\0\0"; 071 072 private static CacheManager cacheManager = null; 073 074 protected static boolean isXA; 075 076 private Cache cache; 077 078 private Model model; 079 080 /** 081 * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated. 082 */ 083 private RowMapper rowMapper; 084 085 /** 086 * The local invalidations due to writes through this mapper that should be propagated to other sessions at 087 * post-commit time. 088 */ 089 private final Invalidations localInvalidations; 090 091 /** 092 * The queue of invalidations received from other session or from the cluster invalidator, to process at 093 * pre-transaction time. 094 */ 095 private final InvalidationsQueue invalidationsQueue; 096 097 /** 098 * The propagator of invalidations to other mappers. 099 */ 100 private InvalidationsPropagator invalidationsPropagator; 101 102 private static final String CACHE_NAME = "unifiedVCSCache"; 103 104 private static final String EHCACHE_FILE_PROP = "ehcacheFilePath"; 105 106 private static AtomicInteger rowMapperCount = new AtomicInteger(); 107 108 /** 109 * Cache statistics 110 * 111 * @since 5.7 112 */ 113 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 114 115 protected Counter cacheHitCount; 116 117 protected Timer cacheGetTimer; 118 119 // sor means system of record (database access) 120 protected Counter sorRows; 121 122 protected Timer sorGetTimer; 123 124 public UnifiedCachingRowMapper() { 125 localInvalidations = new Invalidations(); 126 invalidationsQueue = new InvalidationsQueue("mapper-" + this); 127 } 128 129 synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper, 130 InvalidationsPropagator invalidationsPropagator, Map<String, String> properties) { 131 this.model = model; 132 this.rowMapper = rowMapper; 133 this.invalidationsPropagator = invalidationsPropagator; 134 invalidationsPropagator.addQueue(invalidationsQueue); 135 if (cacheManager == null) { 136 if (properties.containsKey(EHCACHE_FILE_PROP)) { 137 String value = properties.get(EHCACHE_FILE_PROP); 138 log.info("Creating ehcache manager for VCS, using ehcache file: " + value); 139 cacheManager = CacheManager.create(value); 140 } else { 141 log.info("Creating ehcache manager for VCS, No ehcache file provided"); 142 cacheManager = CacheManager.create(); 143 } 144 isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional(); 145 // Exposes cache to JMX 146 MBeanServer mBeanServer = Framework.getLocalService(ServerLocator.class).lookupServer(); 147 ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true); 148 } 149 rowMapperCount.incrementAndGet(); 150 cache = cacheManager.getCache(CACHE_NAME); 151 setMetrics(repositoryName); 152 } 153 154 protected void setMetrics(String repositoryName) { 155 cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 156 "unified", "hits")); 157 cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 158 "unified", "get")); 159 sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 160 "sor", "rows")); 161 sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 162 "sor", "get")); 163 String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 164 "cache-size"); 165 SortedMap<String, Gauge> gauges = registry.getGauges(); 166 if (!gauges.containsKey(gaugeName)) { 167 registry.register(gaugeName, new Gauge<Integer>() { 168 @Override 169 public Integer getValue() { 170 if (cacheManager != null) { 171 return cacheManager.getCache(CACHE_NAME).getSize(); 172 } 173 return 0; 174 } 175 }); 176 } 177 } 178 179 public void close() { 180 invalidationsPropagator.removeQueue(invalidationsQueue); 181 rowMapperCount.decrementAndGet(); 182 } 183 184 @Override 185 public Serializable generateNewId() { 186 return rowMapper.generateNewId(); 187 } 188 189 /* 190 * ----- ehcache ----- 191 */ 192 193 protected boolean hasTransaction() { 194 TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup(); 195 if (transactionManagerLookup == null) { 196 return false; 197 } 198 TransactionManager transactionManager = transactionManagerLookup.getTransactionManager(); 199 if (transactionManager == null) { 200 return false; 201 } 202 Transaction transaction; 203 try { 204 transaction = transactionManager.getTransaction(); 205 } catch (SystemException e) { 206 throw new RuntimeException(e); 207 } 208 return transaction != null; 209 } 210 211 protected boolean useEhCache() { 212 return !isXA || hasTransaction(); 213 } 214 215 protected void ehCachePut(Element element) { 216 if (useEhCache()) { 217 cache.put(element); 218 } 219 } 220 221 protected Element ehCacheGet(Serializable key) { 222 if (useEhCache()) { 223 return cache.get(key); 224 } 225 return null; 226 } 227 228 protected int ehCacheGetSize() { 229 if (useEhCache()) { 230 return cache.getSize(); 231 } 232 return 0; 233 } 234 235 protected boolean ehCacheRemove(Serializable key) { 236 if (useEhCache()) { 237 return cache.remove(key); 238 } 239 return false; 240 } 241 242 protected void ehCacheRemoveAll() { 243 if (useEhCache()) { 244 cache.removeAll(); 245 } 246 } 247 248 /* 249 * ----- Cache ----- 250 */ 251 252 protected static boolean isAbsent(Row row) { 253 return row.tableName == ABSENT; // == is ok 254 } 255 256 protected void cachePut(Row row) { 257 row = row.clone(); 258 // for ACL collections, make sure the order is correct 259 // (without the cache, the query to get a list of collection does an 260 // ORDER BY pos, so users of the cache must get the same behavior) 261 if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) { 262 row.values = sortACLRows((ACLRow[]) row.values); 263 } 264 Element element = new Element(new RowId(row), row); 265 ehCachePut(element); 266 } 267 268 protected ACLRow[] sortACLRows(ACLRow[] acls) { 269 List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls)); 270 Collections.sort(list, ACLRowPositionComparator.INSTANCE); 271 ACLRow[] res = new ACLRow[acls.length]; 272 return list.toArray(res); 273 } 274 275 protected void cachePutAbsent(RowId rowId) { 276 Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null)); 277 ehCachePut(element); 278 } 279 280 protected void cachePutAbsentIfNull(RowId rowId, Row row) { 281 if (row != null) { 282 cachePut(row); 283 } else { 284 cachePutAbsent(rowId); 285 } 286 } 287 288 protected void cachePutAbsentIfRowId(RowId rowId) { 289 if (rowId instanceof Row) { 290 cachePut((Row) rowId); 291 } else { 292 cachePutAbsent(rowId); 293 } 294 } 295 296 protected Row cacheGet(RowId rowId) { 297 final Context context = cacheGetTimer.time(); 298 try { 299 Element element = ehCacheGet(rowId); 300 Row row = null; 301 if (element != null) { 302 row = (Row) element.getObjectValue(); 303 } 304 if (row != null && !isAbsent(row)) { 305 row = row.clone(); 306 } 307 if (row != null) { 308 cacheHitCount.inc(); 309 } 310 return row; 311 } finally { 312 context.stop(); 313 } 314 } 315 316 protected void cacheRemove(RowId rowId) { 317 ehCacheRemove(rowId); 318 } 319 320 /* 321 * ----- Invalidations / Cache Management ----- 322 */ 323 324 @Override 325 public Invalidations receiveInvalidations() { 326 // invalidations from the underlying mapper (cluster) 327 // already propagated to our invalidations queue 328 Invalidations remoteInvals = rowMapper.receiveInvalidations(); 329 330 Invalidations ret = invalidationsQueue.getInvalidations(); 331 332 if (remoteInvals != null) { 333 if (!ret.all) { 334 // only handle remote invalidations, the cache is shared and transactional 335 if (remoteInvals.modified != null) { 336 for (RowId rowId : remoteInvals.modified) { 337 cacheRemove(rowId); 338 } 339 } 340 if (remoteInvals.deleted != null) { 341 for (RowId rowId : remoteInvals.deleted) { 342 cachePutAbsent(rowId); 343 } 344 } 345 } 346 } 347 348 // invalidate our cache 349 if (ret.all) { 350 clearCache(); 351 } 352 353 return ret.isEmpty() ? null : ret; 354 } 355 356 // propagate invalidations 357 @Override 358 public void sendInvalidations(Invalidations invalidations) { 359 // add local invalidations 360 if (!localInvalidations.isEmpty()) { 361 if (invalidations == null) { 362 invalidations = new Invalidations(); 363 } 364 invalidations.add(localInvalidations); 365 localInvalidations.clear(); 366 } 367 368 if (invalidations != null && !invalidations.isEmpty()) { 369 // send to underlying mapper 370 rowMapper.sendInvalidations(invalidations); 371 372 // queue to other mappers' caches 373 invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue); 374 } 375 } 376 377 @Override 378 public void clearCache() { 379 ehCacheRemoveAll(); 380 localInvalidations.clear(); 381 rowMapper.clearCache(); 382 } 383 384 @Override 385 public void rollback(Xid xid) throws XAException { 386 try { 387 rowMapper.rollback(xid); 388 } finally { 389 ehCacheRemoveAll(); 390 localInvalidations.clear(); 391 } 392 } 393 394 /* 395 * ----- Batch ----- 396 */ 397 398 /* 399 * Use those from the cache if available, read from the mapper for the rest. 400 */ 401 @Override 402 public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) { 403 List<RowId> res = new ArrayList<RowId>(rowIds.size()); 404 // find which are in cache, and which not 405 List<RowId> todo = new LinkedList<RowId>(); 406 for (RowId rowId : rowIds) { 407 Row row = cacheGet(rowId); 408 if (row == null) { 409 if (cacheOnly) { 410 res.add(new RowId(rowId)); 411 } else { 412 todo.add(rowId); 413 } 414 } else if (isAbsent(row)) { 415 res.add(new RowId(rowId)); 416 } else { 417 res.add(row); 418 } 419 } 420 if (!todo.isEmpty()) { 421 final Context context = sorGetTimer.time(); 422 try { 423 // ask missing ones to underlying row mapper 424 List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly); 425 // add them to the cache 426 for (RowId rowId : fetched) { 427 cachePutAbsentIfRowId(rowId); 428 } 429 // merge results 430 res.addAll(fetched); 431 sorRows.inc(fetched.size()); 432 } finally { 433 context.stop(); 434 } 435 } 436 return res; 437 } 438 439 /* 440 * Save in the cache then pass all the writes to the mapper. 441 */ 442 @Override 443 public void write(RowBatch batch) { 444 // we avoid gathering invalidations for a write-only table: fulltext 445 for (Row row : batch.creates) { 446 cachePut(row); 447 if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) { 448 // we need to send modified invalidations for created 449 // fragments because other session's ABSENT fragments have 450 // to be invalidated 451 localInvalidations.addModified(new RowId(row)); 452 } 453 } 454 for (RowUpdate rowu : batch.updates) { 455 cachePut(rowu.row); 456 if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) { 457 localInvalidations.addModified(new RowId(rowu.row)); 458 } 459 } 460 for (RowId rowId : batch.deletes) { 461 if (rowId instanceof Row) { 462 throw new AssertionError(); 463 } 464 cachePutAbsent(rowId); 465 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 466 localInvalidations.addDeleted(rowId); 467 } 468 } 469 for (RowId rowId : batch.deletesDependent) { 470 if (rowId instanceof Row) { 471 throw new AssertionError(); 472 } 473 cachePutAbsent(rowId); 474 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 475 localInvalidations.addDeleted(rowId); 476 } 477 } 478 479 // propagate to underlying mapper 480 rowMapper.write(batch); 481 } 482 483 /* 484 * ----- Read ----- 485 */ 486 487 @Override 488 public Row readSimpleRow(RowId rowId) { 489 Row row = cacheGet(rowId); 490 if (row == null) { 491 row = rowMapper.readSimpleRow(rowId); 492 cachePutAbsentIfNull(rowId, row); 493 return row; 494 } else if (isAbsent(row)) { 495 return null; 496 } else { 497 return row; 498 } 499 } 500 501 @Override 502 public Map<String, String> getBinaryFulltext(RowId rowId) { 503 return rowMapper.getBinaryFulltext(rowId); 504 } 505 506 @Override 507 public Serializable[] readCollectionRowArray(RowId rowId) { 508 Row row = cacheGet(rowId); 509 if (row == null) { 510 Serializable[] array = rowMapper.readCollectionRowArray(rowId); 511 assert array != null; 512 row = new Row(rowId.tableName, rowId.id, array); 513 cachePut(row); 514 return row.values; 515 } else if (isAbsent(row)) { 516 return null; 517 } else { 518 return row.values; 519 } 520 } 521 522 @Override 523 public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter, 524 Serializable criterion, boolean limitToOne) { 525 List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne); 526 for (Row row : rows) { 527 cachePut(row); 528 } 529 return rows; 530 } 531 532 @Override 533 public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) { 534 return rowMapper.readSelectionsIds(selType, values); 535 } 536 537 /* 538 * ----- Copy ----- 539 */ 540 541 @Override 542 public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) { 543 CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow); 544 Invalidations invalidations = result.invalidations; 545 if (invalidations.modified != null) { 546 for (RowId rowId : invalidations.modified) { 547 cacheRemove(rowId); 548 localInvalidations.addModified(new RowId(rowId)); 549 } 550 } 551 if (invalidations.deleted != null) { 552 for (RowId rowId : invalidations.deleted) { 553 cacheRemove(rowId); 554 localInvalidations.addDeleted(rowId); 555 } 556 } 557 return result; 558 } 559 560 @Override 561 public List<NodeInfo> getDescendantsInfo(Serializable rootId) { 562 return rowMapper.getDescendantsInfo(rootId); 563 } 564 565 @Override 566 public void remove(Serializable rootId, List<NodeInfo> nodeInfos) { 567 rowMapper.remove(rootId, nodeInfos); 568 for (NodeInfo info : nodeInfos) { 569 for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) { 570 RowId rowId = new RowId(fragmentName, info.id); 571 cacheRemove(rowId); 572 localInvalidations.addDeleted(rowId); 573 } 574 } 575 // we only put as absent the root fragment, to avoid polluting the cache 576 // with lots of absent info. the rest is removed entirely 577 cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId)); 578 } 579 580 @Override 581 public long getCacheSize() { 582 // The unified cache is reported by the cache-size gauge 583 return 0; 584 } 585 586}