001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.SortedMap; 032import java.util.concurrent.atomic.AtomicInteger; 033 034import javax.management.MBeanServer; 035import javax.transaction.SystemException; 036import javax.transaction.Transaction; 037import javax.transaction.TransactionManager; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.management.ServerLocator; 044import org.nuxeo.runtime.metrics.MetricsService; 045 046import io.dropwizard.metrics5.Counter; 047import io.dropwizard.metrics5.Gauge; 048import io.dropwizard.metrics5.MetricName; 049import io.dropwizard.metrics5.MetricRegistry; 050import io.dropwizard.metrics5.SharedMetricRegistries; 051import io.dropwizard.metrics5.Timer; 052import io.dropwizard.metrics5.Timer.Context; 053 054import net.sf.ehcache.Cache; 055import net.sf.ehcache.CacheManager; 056import net.sf.ehcache.Element; 057import net.sf.ehcache.management.ManagementService; 058import net.sf.ehcache.transaction.manager.TransactionManagerLookup; 059 060/** 061 * A {@link RowMapper} that use an unified ehcache. 062 * <p> 063 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}. 064 */ 065public class UnifiedCachingRowMapper implements RowMapper { 066 067 private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class); 068 069 private static final String ABSENT = "__ABSENT__\0\0\0"; 070 071 private static CacheManager cacheManager = null; 072 073 protected static boolean isXA; 074 075 private Cache cache; 076 077 private Model model; 078 079 /** 080 * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated. 081 */ 082 private RowMapper rowMapper; 083 084 /** 085 * The local invalidations due to writes through this mapper that should be propagated to other sessions at 086 * post-commit time. 087 */ 088 private final VCSInvalidations localInvalidations; 089 090 /** 091 * The queue of invalidations received from other session or from the cluster invalidator, to process at 092 * pre-transaction time. 093 */ 094 private final VCSInvalidationsQueue invalidationsQueue; 095 096 /** 097 * The propagator of invalidations to other mappers. 098 */ 099 private VCSInvalidationsPropagator invalidationsPropagator; 100 101 private static final String CACHE_NAME = "unifiedVCSCache"; 102 103 private static final String EHCACHE_FILE_PROP = "ehcacheFilePath"; 104 105 private static AtomicInteger rowMapperCount = new AtomicInteger(); 106 107 /** 108 * Cache statistics 109 * 110 * @since 5.7 111 */ 112 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 113 114 protected Counter cacheHitCount; 115 116 protected Timer cacheGetTimer; 117 118 // sor means system of record (database access) 119 protected Counter sorRows; 120 121 protected Timer sorGetTimer; 122 123 public UnifiedCachingRowMapper() { 124 localInvalidations = new VCSInvalidations(); 125 invalidationsQueue = new VCSInvalidationsQueue("mapper-" + this); 126 } 127 128 synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper, 129 VCSInvalidationsPropagator invalidationsPropagator, Map<String, String> properties) { 130 this.model = model; 131 this.rowMapper = rowMapper; 132 this.invalidationsPropagator = invalidationsPropagator; 133 invalidationsPropagator.addQueue(invalidationsQueue); 134 if (cacheManager == null) { 135 if (properties.containsKey(EHCACHE_FILE_PROP)) { 136 String value = properties.get(EHCACHE_FILE_PROP); 137 log.info("Creating ehcache manager for VCS, using ehcache file: " + value); 138 cacheManager = CacheManager.create(value); 139 } else { 140 log.info("Creating ehcache manager for VCS, No ehcache file provided"); 141 cacheManager = CacheManager.create(); 142 } 143 isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional(); 144 // Exposes cache to JMX 145 MBeanServer mBeanServer = Framework.getService(ServerLocator.class).lookupServer(); 146 ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true); 147 } 148 rowMapperCount.incrementAndGet(); 149 cache = cacheManager.getCache(CACHE_NAME); 150 setMetrics(repositoryName); 151 } 152 153 protected void setMetrics(String repositoryName) { 154 cacheHitCount = registry.counter( 155 MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "hit") 156 .tagged("repository", repositoryName)); 157 cacheGetTimer = registry.timer( 158 MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "timer") 159 .tagged("repository", repositoryName)); 160 sorRows = registry.counter( 161 MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "sor", "rows") 162 .tagged("repository", repositoryName)); 163 sorGetTimer = registry.timer( 164 MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "sor", "timer") 165 .tagged("repository", repositoryName)); 166 MetricName gaugeName = MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "size"); 167 @SuppressWarnings("rawtypes") 168 SortedMap<MetricName, Gauge> gauges = registry.getGauges(); 169 if (!gauges.containsKey(gaugeName)) { 170 registry.register(gaugeName, new Gauge<Integer>() { 171 @Override 172 public Integer getValue() { 173 if (cacheManager != null) { 174 return cacheManager.getCache(CACHE_NAME).getSize(); 175 } 176 return 0; 177 } 178 }); 179 } 180 } 181 182 public void close() { 183 invalidationsPropagator.removeQueue(invalidationsQueue); 184 rowMapperCount.decrementAndGet(); 185 } 186 187 @Override 188 public Serializable generateNewId() { 189 return rowMapper.generateNewId(); 190 } 191 192 /* 193 * ----- ehcache ----- 194 */ 195 196 protected boolean hasTransaction() { 197 TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup(); 198 if (transactionManagerLookup == null) { 199 return false; 200 } 201 TransactionManager transactionManager = transactionManagerLookup.getTransactionManager(); 202 if (transactionManager == null) { 203 return false; 204 } 205 Transaction transaction; 206 try { 207 transaction = transactionManager.getTransaction(); 208 } catch (SystemException e) { 209 throw new RuntimeException(e); 210 } 211 return transaction != null; 212 } 213 214 protected boolean useEhCache() { 215 return !isXA || hasTransaction(); 216 } 217 218 protected void ehCachePut(Element element) { 219 if (useEhCache()) { 220 cache.put(element); 221 } 222 } 223 224 protected Element ehCacheGet(Serializable key) { 225 if (useEhCache()) { 226 return cache.get(key); 227 } 228 return null; 229 } 230 231 protected int ehCacheGetSize() { 232 if (useEhCache()) { 233 return cache.getSize(); 234 } 235 return 0; 236 } 237 238 protected boolean ehCacheRemove(Serializable key) { 239 if (useEhCache()) { 240 return cache.remove(key); 241 } 242 return false; 243 } 244 245 protected void ehCacheRemoveAll() { 246 if (useEhCache()) { 247 cache.removeAll(); 248 } 249 } 250 251 /* 252 * ----- Cache ----- 253 */ 254 255 protected static boolean isAbsent(Row row) { 256 return row.tableName == ABSENT; // == is ok 257 } 258 259 protected void cachePut(Row row) { 260 row = row.clone(); 261 // for ACL collections, make sure the order is correct 262 // (without the cache, the query to get a list of collection does an 263 // ORDER BY pos, so users of the cache must get the same behavior) 264 if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) { 265 row.values = sortACLRows((ACLRow[]) row.values); 266 } 267 Element element = new Element(new RowId(row), row); 268 ehCachePut(element); 269 } 270 271 protected ACLRow[] sortACLRows(ACLRow[] acls) { 272 List<ACLRow> list = new ArrayList<>(Arrays.asList(acls)); 273 Collections.sort(list, ACLRowPositionComparator.INSTANCE); 274 ACLRow[] res = new ACLRow[acls.length]; 275 return list.toArray(res); 276 } 277 278 protected void cachePutAbsent(RowId rowId) { 279 Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null)); 280 ehCachePut(element); 281 } 282 283 protected void cachePutAbsentIfNull(RowId rowId, Row row) { 284 if (row != null) { 285 cachePut(row); 286 } else { 287 cachePutAbsent(rowId); 288 } 289 } 290 291 protected void cachePutAbsentIfRowId(RowId rowId) { 292 if (rowId instanceof Row) { 293 cachePut((Row) rowId); 294 } else { 295 cachePutAbsent(rowId); 296 } 297 } 298 299 @SuppressWarnings("resource") // Time.Context closed by stop() 300 protected Row cacheGet(RowId rowId) { 301 final Context context = cacheGetTimer.time(); 302 try { 303 Element element = ehCacheGet(rowId); 304 Row row = null; 305 if (element != null) { 306 row = (Row) element.getObjectValue(); 307 } 308 if (row != null && !isAbsent(row)) { 309 row = row.clone(); 310 } 311 if (row != null) { 312 cacheHitCount.inc(); 313 } 314 return row; 315 } finally { 316 context.stop(); 317 } 318 } 319 320 protected void cacheRemove(RowId rowId) { 321 ehCacheRemove(rowId); 322 } 323 324 /* 325 * ----- Invalidations / Cache Management ----- 326 */ 327 328 @Override 329 public VCSInvalidations receiveInvalidations() { 330 // invalidations from the underlying mapper (cluster) 331 // already propagated to our invalidations queue 332 VCSInvalidations remoteInvals = rowMapper.receiveInvalidations(); 333 334 VCSInvalidations ret = invalidationsQueue.getInvalidations(); 335 336 if (remoteInvals != null) { 337 if (!ret.all) { 338 // only handle remote invalidations, the cache is shared and transactional 339 if (remoteInvals.modified != null) { 340 for (RowId rowId : remoteInvals.modified) { 341 cacheRemove(rowId); 342 } 343 } 344 if (remoteInvals.deleted != null) { 345 for (RowId rowId : remoteInvals.deleted) { 346 cachePutAbsent(rowId); 347 } 348 } 349 } 350 } 351 352 // invalidate our cache 353 if (ret.all) { 354 clearCache(); 355 } 356 357 return ret.isEmpty() ? null : ret; 358 } 359 360 // propagate invalidations 361 @Override 362 public void sendInvalidations(VCSInvalidations invalidations) { 363 // add local invalidations 364 if (!localInvalidations.isEmpty()) { 365 if (invalidations == null) { 366 invalidations = new VCSInvalidations(); 367 } 368 invalidations.add(localInvalidations); 369 localInvalidations.clear(); 370 } 371 372 if (invalidations != null && !invalidations.isEmpty()) { 373 // send to underlying mapper 374 rowMapper.sendInvalidations(invalidations); 375 376 // queue to other mappers' caches 377 invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue); 378 } 379 } 380 381 @Override 382 public void clearCache() { 383 ehCacheRemoveAll(); 384 localInvalidations.clear(); 385 rowMapper.clearCache(); 386 } 387 388 @Override 389 public void rollback() { 390 try { 391 rowMapper.rollback(); 392 } finally { 393 ehCacheRemoveAll(); 394 localInvalidations.clear(); 395 } 396 } 397 398 /* 399 * ----- Batch ----- 400 */ 401 402 /* 403 * Use those from the cache if available, read from the mapper for the rest. 404 */ 405 @Override 406 public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) { 407 List<RowId> res = new ArrayList<>(rowIds.size()); 408 // find which are in cache, and which not 409 List<RowId> todo = new LinkedList<>(); 410 for (RowId rowId : rowIds) { 411 Row row = cacheGet(rowId); 412 if (row == null) { 413 if (cacheOnly) { 414 res.add(new RowId(rowId)); 415 } else { 416 todo.add(rowId); 417 } 418 } else if (isAbsent(row)) { 419 res.add(new RowId(rowId)); 420 } else { 421 res.add(row); 422 } 423 } 424 if (!todo.isEmpty()) { 425 @SuppressWarnings("resource") 426 final Context context = sorGetTimer.time(); 427 try { 428 // ask missing ones to underlying row mapper 429 List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly); 430 // add them to the cache 431 for (RowId rowId : fetched) { 432 cachePutAbsentIfRowId(rowId); 433 } 434 // merge results 435 res.addAll(fetched); 436 sorRows.inc(fetched.size()); 437 } finally { 438 context.stop(); 439 } 440 } 441 return res; 442 } 443 444 /* 445 * Save in the cache then pass all the writes to the mapper. 446 */ 447 @Override 448 public void write(RowBatch batch) { 449 // we avoid gathering invalidations for a write-only table: fulltext 450 for (Row row : batch.creates) { 451 cachePut(row); 452 if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) { 453 // we need to send modified invalidations for created 454 // fragments because other session's ABSENT fragments have 455 // to be invalidated 456 localInvalidations.addModified(new RowId(row)); 457 } 458 } 459 for (RowUpdate rowu : batch.updates) { 460 cachePut(rowu.row); 461 if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) { 462 localInvalidations.addModified(new RowId(rowu.row)); 463 } 464 } 465 for (RowId rowId : batch.deletes) { 466 if (rowId instanceof Row) { 467 throw new AssertionError(); 468 } 469 cachePutAbsent(rowId); 470 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 471 localInvalidations.addDeleted(rowId); 472 } 473 } 474 for (RowId rowId : batch.deletesDependent) { 475 if (rowId instanceof Row) { 476 throw new AssertionError(); 477 } 478 cachePutAbsent(rowId); 479 if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) { 480 localInvalidations.addDeleted(rowId); 481 } 482 } 483 484 // propagate to underlying mapper 485 rowMapper.write(batch); 486 } 487 488 /* 489 * ----- Read ----- 490 */ 491 492 @Override 493 public Row readSimpleRow(RowId rowId) { 494 Row row = cacheGet(rowId); 495 if (row == null) { 496 row = rowMapper.readSimpleRow(rowId); 497 cachePutAbsentIfNull(rowId, row); 498 return row; 499 } else if (isAbsent(row)) { 500 return null; 501 } else { 502 return row; 503 } 504 } 505 506 @Override 507 public Map<String, String> getBinaryFulltext(RowId rowId) { 508 return rowMapper.getBinaryFulltext(rowId); 509 } 510 511 @Override 512 public Serializable[] readCollectionRowArray(RowId rowId) { 513 Row row = cacheGet(rowId); 514 if (row == null) { 515 Serializable[] array = rowMapper.readCollectionRowArray(rowId); 516 assert array != null; 517 row = new Row(rowId.tableName, rowId.id, array); 518 cachePut(row); 519 return row.values; 520 } else if (isAbsent(row)) { 521 return null; 522 } else { 523 return row.values; 524 } 525 } 526 527 @Override 528 public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter, 529 Serializable criterion, boolean limitToOne) { 530 List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne); 531 for (Row row : rows) { 532 cachePut(row); 533 } 534 return rows; 535 } 536 537 @Override 538 public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) { 539 return rowMapper.readSelectionsIds(selType, values); 540 } 541 542 /* 543 * ----- Copy ----- 544 */ 545 546 @Override 547 public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow, 548 boolean excludeSpecialChildren, boolean excludeACL) { 549 CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow, excludeSpecialChildren, 550 excludeACL); 551 VCSInvalidations invalidations = result.invalidations; 552 if (invalidations.modified != null) { 553 for (RowId rowId : invalidations.modified) { 554 cacheRemove(rowId); 555 localInvalidations.addModified(new RowId(rowId)); 556 } 557 } 558 if (invalidations.deleted != null) { 559 for (RowId rowId : invalidations.deleted) { 560 cacheRemove(rowId); 561 localInvalidations.addDeleted(rowId); 562 } 563 } 564 return result; 565 } 566 567 @Override 568 public List<NodeInfo> getDescendantsInfo(Serializable rootId) { 569 return rowMapper.getDescendantsInfo(rootId); 570 } 571 572 @Override 573 public void remove(Serializable rootId, List<NodeInfo> nodeInfos) { 574 rowMapper.remove(rootId, nodeInfos); 575 for (NodeInfo info : nodeInfos) { 576 for (String fragmentName : model.getTypeFragments(new IdWithTypes(info))) { 577 RowId rowId = new RowId(fragmentName, info.id); 578 cacheRemove(rowId); 579 localInvalidations.addDeleted(rowId); 580 } 581 } 582 // we only put as absent the root fragment, to avoid polluting the cache 583 // with lots of absent info. the rest is removed entirely 584 cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId)); 585 } 586 587 @Override 588 public long getCacheSize() { 589 // The unified cache is reported by the cache-size gauge 590 return 0; 591 } 592 593}