001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.SortedMap; 031import java.util.concurrent.atomic.AtomicInteger; 032 033import javax.management.MBeanServer; 034import javax.transaction.SystemException; 035import javax.transaction.Transaction; 036import javax.transaction.TransactionManager; 037import javax.transaction.xa.XAException; 038import javax.transaction.xa.Xid; 039 040import net.sf.ehcache.Cache; 041import net.sf.ehcache.CacheManager; 042import net.sf.ehcache.Element; 043import net.sf.ehcache.management.ManagementService; 044import net.sf.ehcache.transaction.manager.TransactionManagerLookup; 045 046import org.apache.commons.logging.Log; 047import org.apache.commons.logging.LogFactory; 048import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator; 049import org.nuxeo.runtime.api.Framework; 050import org.nuxeo.runtime.management.ServerLocator; 051import org.nuxeo.runtime.metrics.MetricsService; 052 053import com.codahale.metrics.Counter; 054import com.codahale.metrics.Gauge; 055import com.codahale.metrics.MetricRegistry; 056import com.codahale.metrics.SharedMetricRegistries; 057import com.codahale.metrics.Timer; 058import com.codahale.metrics.Timer.Context; 059 060/** 061 * A {@link RowMapper} that use an unified ehcache. 062 * <p> 063 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}. 064 */ 065public class UnifiedCachingRowMapper implements RowMapper { 066 067 private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class); 068 069 private static final String ABSENT = "__ABSENT__\0\0\0"; 070 071 private static CacheManager cacheManager = null; 072 073 protected static boolean isXA; 074 075 private Cache cache; 076 077 private Model model; 078 079 /** 080 * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated. 081 */ 082 private RowMapper rowMapper; 083 084 /** 085 * The local invalidations due to writes through this mapper that should be propagated to other sessions at 086 * post-commit time. 087 */ 088 private final Invalidations localInvalidations; 089 090 /** 091 * The queue of invalidations received from other session or from the cluster invalidator, to process at 092 * pre-transaction time. 093 */ 094 private final InvalidationsQueue invalidationsQueue; 095 096 /** 097 * The propagator of invalidations to other mappers. 098 */ 099 private InvalidationsPropagator invalidationsPropagator; 100 101 private static final String CACHE_NAME = "unifiedVCSCache"; 102 103 private static final String EHCACHE_FILE_PROP = "ehcacheFilePath"; 104 105 private static AtomicInteger rowMapperCount = new AtomicInteger(); 106 107 /** 108 * Cache statistics 109 * 110 * @since 5.7 111 */ 112 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 113 114 protected Counter cacheHitCount; 115 116 protected Timer cacheGetTimer; 117 118 // sor means system of record (database access) 119 protected Counter sorRows; 120 121 protected Timer sorGetTimer; 122 123 public UnifiedCachingRowMapper() { 124 localInvalidations = new Invalidations(); 125 invalidationsQueue = new InvalidationsQueue("mapper-" + this); 126 } 127 128 synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper, 129 InvalidationsPropagator invalidationsPropagator, Map<String, String> properties) { 130 this.model = model; 131 this.rowMapper = rowMapper; 132 this.invalidationsPropagator = invalidationsPropagator; 133 invalidationsPropagator.addQueue(invalidationsQueue); 134 if (cacheManager == null) { 135 if (properties.containsKey(EHCACHE_FILE_PROP)) { 136 String value = properties.get(EHCACHE_FILE_PROP); 137 log.info("Creating ehcache manager for VCS, using ehcache file: " + value); 138 cacheManager = CacheManager.create(value); 139 } else { 140 log.info("Creating ehcache manager for VCS, No ehcache file provided"); 141 cacheManager = CacheManager.create(); 142 } 143 isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional(); 144 // Exposes cache to JMX 145 MBeanServer mBeanServer = Framework.getLocalService(ServerLocator.class).lookupServer(); 146 ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true); 147 } 148 rowMapperCount.incrementAndGet(); 149 cache = cacheManager.getCache(CACHE_NAME); 150 setMetrics(repositoryName); 151 } 152 153 protected void setMetrics(String repositoryName) { 154 cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 155 "unified", "hits")); 156 cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 157 "unified", "get")); 158 sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 159 "sor", "rows")); 160 sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 161 "sor", "get")); 162 String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified", 163 "cache-size"); 164 SortedMap<String, Gauge> gauges = registry.getGauges(); 165 if (!gauges.containsKey(gaugeName)) { 166 registry.register(gaugeName, new Gauge<Integer>() { 167 @Override 168 public Integer getValue() { 169 if (cacheManager != null) { 170 return cacheManager.getCache(CACHE_NAME).getSize(); 171 } 172 return 0; 173 } 174 }); 175 } 176 } 177 178 public void close() { 179 invalidationsPropagator.removeQueue(invalidationsQueue); 180 rowMapperCount.decrementAndGet(); 181 } 182 183 @Override 184 public Serializable generateNewId() { 185 return rowMapper.generateNewId(); 186 } 187 188 /* 189 * ----- ehcache ----- 190 */ 191 192 protected boolean hasTransaction() { 193 TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup(); 194 if (transactionManagerLookup == null) { 195 return false; 196 } 197 TransactionManager transactionManager = transactionManagerLookup.getTransactionManager(); 198 if (transactionManager == null) { 199 return false; 200 } 201 Transaction transaction; 202 try { 203 transaction = transactionManager.getTransaction(); 204 } catch (SystemException e) { 205 throw new RuntimeException(e); 206 } 207 return transaction != null; 208 } 209 210 protected boolean useEhCache() { 211 return !isXA || hasTransaction(); 212 } 213 214 protected void ehCachePut(Element element) { 215 if (useEhCache()) { 216 cache.put(element); 217 } 218 } 219 220 protected Element ehCacheGet(Serializable key) { 221 if (useEhCache()) { 222 return cache.get(key); 223 } 224 return null; 225 } 226 227 protected int ehCacheGetSize() { 228 if (useEhCache()) { 229 return cache.getSize(); 230 } 231 return 0; 232 } 233 234 protected boolean ehCacheRemove(Serializable key) { 235 if (useEhCache()) { 236 return cache.remove(key); 237 } 238 return false; 239 } 240 241 protected void ehCacheRemoveAll() { 242 if (useEhCache()) { 243 cache.removeAll(); 244 } 245 } 246 247 /* 248 * ----- Cache ----- 249 */ 250 251 protected static boolean isAbsent(Row row) { 252 return row.tableName == ABSENT; // == is ok 253 } 254 255 protected void cachePut(Row row) { 256 row = row.clone(); 257 // for ACL collections, make sure the order is correct 258 // (without the cache, the query to get a list of collection does an 259 // ORDER BY pos, so users of the cache must get the same behavior) 260 if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) { 261 row.values = sortACLRows((ACLRow[]) row.values); 262 } 263 Element element = new Element(new RowId(row), row); 264 ehCachePut(element); 265 } 266 267 protected ACLRow[] sortACLRows(ACLRow[] acls) { 268 List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls)); 269 Collections.sort(list, ACLRowPositionComparator.INSTANCE); 270 ACLRow[] res = new ACLRow[acls.length]; 271 return list.toArray(res); 272 } 273 274 protected void cachePutAbsent(RowId rowId) { 275 Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null)); 276 ehCachePut(element); 277 } 278 279 protected void cachePutAbsentIfNull(RowId rowId, Row row) { 280 if (row != null) { 281 cachePut(row); 282 } else { 283 cachePutAbsent(rowId); 284 } 285 } 286 287 protected void cachePutAbsentIfRowId(RowId rowId) { 288 if (rowId instanceof Row) { 289 cachePut((Row) rowId); 290 } else { 291 cachePutAbsent(rowId); 292 } 293 } 294 295 protected Row cacheGet(RowId rowId) { 296 final Context context = cacheGetTimer.time(); 297 try { 298 Element element = ehCacheGet(rowId); 299 Row row = null; 300 if (element != null) { 301 row = (Row) element.getObjectValue(); 302 } 303 if (row != null && !isAbsent(row)) { 304 row = row.clone(); 305 } 306 if (row != null) { 307 cacheHitCount.inc(); 308 } 309 return row; 310 } finally { 311 context.stop(); 312 } 313 } 314 315 protected void cacheRemove(RowId rowId) { 316 ehCacheRemove(rowId); 317 } 318 319 /* 320 * ----- Invalidations / Cache Management ----- 321 */ 322 323 @Override 324 public Invalidations receiveInvalidations() { 325 // invalidations from the underlying mapper (cluster) 326 // already propagated to our invalidations queue 327 Invalidations remoteInvals = rowMapper.receiveInvalidations(); 328 329 Invalidations ret = invalidationsQueue.getInvalidations(); 330 331 if (remoteInvals != null) { 332 if (!ret.all) { 333 // only handle remote invalidations, the cache is shared and transactional 334 if (remoteInvals.modified != null) { 335 for (RowId rowId : remoteInvals.modified) { 336 cacheRemove(rowId); 337 } 338 } 339 if (remoteInvals.deleted != null) { 340 for (RowId rowId : remoteInvals.deleted) { 341 cachePutAbsent(rowId); 342 } 343 } 344 } 345 } 346 347 // invalidate our cache 348 if (ret.all) { 349 clearCache(); 350 } 351 352 return ret.isEmpty() ? null : ret; 353 } 354 355 // propagate invalidations 356 @Override 357 public void sendInvalidations(Invalidations invalidations) { 358 // add local invalidations 359 if (!localInvalidations.isEmpty()) { 360 if (invalidations == null) { 361 invalidations = new Invalidations(); 362 } 363 invalidations.add(localInvalidations); 364 localInvalidations.clear(); 365 } 366 367 if (invalidations != null && !invalidations.isEmpty()) { 368 // send to underlying mapper 369 rowMapper.sendInvalidations(invalidations); 370 371 // queue to other mappers' caches 372 invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue); 373 } 374 } 375 376 @Override 377 public void clearCache() { 378 ehCacheRemoveAll(); 379 localInvalidations.clear(); 380 rowMapper.clearCache(); 381 } 382 383 @Override 384 public void rollback(Xid xid) throws XAException { 385 try { 386 rowMapper.rollback(xid); 387 } finally { 388 ehCacheRemoveAll(); 389 localInvalidations.clear(); 390 } 391 } 392 393 /* 394 * ----- Batch ----- 395 */ 396 397 /* 398 * Use those from the cache if available, read from the mapper for the rest. 399 */ 400 @Override 401 public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) { 402 List<RowId> res = new ArrayList<RowId>(rowIds.size()); 403 // find which are in cache, and which not 404 List<RowId> todo = new LinkedList<RowId>(); 405 for (RowId rowId : rowIds) { 406 Row row = cacheGet(rowId); 407 if (row == null) { 408 if (cacheOnly) { 409 res.add(new RowId(rowId)); 410 } else { 411 todo.add(rowId); 412 } 413 } else if (isAbsent(row)) { 414 res.add(new RowId(rowId)); 415 } else { 416 res.add(row); 417 } 418 } 419 if (!todo.isEmpty()) { 420 final Context context = sorGetTimer.time(); 421 try { 422 // ask missing ones to underlying row mapper 423 List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly); 424 // add them to the cache 425 for (RowId rowId : fetched) { 426 cachePutAbsentIfRowId(rowId); 427 } 428 // merge results 429 res.addAll(fetched); 430 sorRows.inc(fetched.size()); 431 } finally { 432 context.stop(); 433 } 434 } 435 return res; 436 } 437 438 /* 439 * Save in the cache then pass all the writes to the mapper. 440 */ 441 @Override 442 public void write(RowBatch batch) { 443 // we avoid gathering invalidations for a write-only table: fulltext 444 for (Row row : batch.creates) { 445 cachePut(row); 446 if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) { 447 // we need to send modified invalidations for created 448 // fragments because other session's ABSENT fragments have 449 // to be invalidated 450 localInvalidations.addModified(new RowId(row)); 451 } 452 } 453 for (RowUpdate rowu : batch.updates) { 454 cachePut(rowu.row); 455 if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) { 456 localInvalidations.addModified(new RowId(rowu.row)); 457 } 458 } 459 for (RowId rowId : batch.deletes) { 460 if (rowId instanceof Row) { 461 throw new AssertionError(); 462 } 463 cachePutAbsent(rowId); 464 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 465 localInvalidations.addDeleted(rowId); 466 } 467 } 468 for (RowId rowId : batch.deletesDependent) { 469 if (rowId instanceof Row) { 470 throw new AssertionError(); 471 } 472 cachePutAbsent(rowId); 473 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 474 localInvalidations.addDeleted(rowId); 475 } 476 } 477 478 // propagate to underlying mapper 479 rowMapper.write(batch); 480 } 481 482 /* 483 * ----- Read ----- 484 */ 485 486 @Override 487 public Row readSimpleRow(RowId rowId) { 488 Row row = cacheGet(rowId); 489 if (row == null) { 490 row = rowMapper.readSimpleRow(rowId); 491 cachePutAbsentIfNull(rowId, row); 492 return row; 493 } else if (isAbsent(row)) { 494 return null; 495 } else { 496 return row; 497 } 498 } 499 500 @Override 501 public Map<String, String> getBinaryFulltext(RowId rowId) { 502 return rowMapper.getBinaryFulltext(rowId); 503 } 504 505 @Override 506 public Serializable[] readCollectionRowArray(RowId rowId) { 507 Row row = cacheGet(rowId); 508 if (row == null) { 509 Serializable[] array = rowMapper.readCollectionRowArray(rowId); 510 assert array != null; 511 row = new Row(rowId.tableName, rowId.id, array); 512 cachePut(row); 513 return row.values; 514 } else if (isAbsent(row)) { 515 return null; 516 } else { 517 return row.values; 518 } 519 } 520 521 @Override 522 public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter, 523 Serializable criterion, boolean limitToOne) { 524 List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne); 525 for (Row row : rows) { 526 cachePut(row); 527 } 528 return rows; 529 } 530 531 /* 532 * ----- Copy ----- 533 */ 534 535 @Override 536 public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) { 537 CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow); 538 Invalidations invalidations = result.invalidations; 539 if (invalidations.modified != null) { 540 for (RowId rowId : invalidations.modified) { 541 cacheRemove(rowId); 542 localInvalidations.addModified(new RowId(rowId)); 543 } 544 } 545 if (invalidations.deleted != null) { 546 for (RowId rowId : invalidations.deleted) { 547 cacheRemove(rowId); 548 localInvalidations.addDeleted(rowId); 549 } 550 } 551 return result; 552 } 553 554 @Override 555 public List<NodeInfo> remove(NodeInfo rootInfo) { 556 List<NodeInfo> infos = rowMapper.remove(rootInfo); 557 for (NodeInfo info : infos) { 558 for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) { 559 RowId rowId = new RowId(fragmentName, info.id); 560 cacheRemove(rowId); 561 localInvalidations.addDeleted(rowId); 562 } 563 } 564 // we only put as absent the root fragment, to avoid polluting the cache 565 // with lots of absent info. the rest is removed entirely 566 cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id)); 567 return infos; 568 } 569 570 @Override 571 public long getCacheSize() { 572 // The unified cache is reported by the cache-size gauge 573 return 0; 574 } 575 576}