/*
 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;

import org.apache.commons.collections.map.AbstractReferenceMap;
import org.apache.commons.collections.map.ReferenceMap;
import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;

/**
 * A {@link RowMapper} that has an internal cache.
 * <p>
 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
 * <p>
 * Reads are served from the cache when possible and delegated to the underlying mapper otherwise; writes update the
 * cache first and are then passed through to the underlying mapper. Every write also records a local invalidation
 * that is handed to the underlying mapper and to the {@link InvalidationsPropagator} by
 * {@link #sendInvalidations(Invalidations)}.
 * <p>
 * {@link #initialize} must be called before any other method: it sets the delegate mapper, the model, the propagator
 * and the metrics that the other methods dereference without null checks.
 */
public class SoftRefCachingRowMapper implements RowMapper {

    /**
     * Table name used by the marker rows that denote "known to be absent". It is compared by identity in
     * {@link #isAbsent(Row)}.
     */
    private static final String ABSENT = "__ABSENT__\0\0\0";

    /**
     * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be
     * refetched if needed.
     * <p>
     * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT}
     * to denote a fragment known to be absent from the database.
     * <p>
     * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC
     * collects it.
     */
    // we use a new Row instance for the absent case to avoid keeping other
    // references to it which would prevent its GCing
    private final Map<RowId, Row> cache;

    private Model model;

    /**
     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
     */
    private RowMapper rowMapper;

    /**
     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
     * post-commit time.
     */
    private final Invalidations localInvalidations;

    /**
     * The queue of cache invalidations received from other session, to process at pre-transaction time.
     */
    // public for unit tests
    public final InvalidationsQueue cacheQueue;

    /**
     * The propagator of invalidations to other mappers.
     */
    private InvalidationsPropagator cachePropagator;

    /**
     * Cache statistics
     *
     * @since 5.7
     */
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    /** Incremented by {@link #cacheGet(RowId)} each time the cache holds an entry (ABSENT markers included). */
    protected Counter cacheHitCount;

    /** Times every lookup done by {@link #cacheGet(RowId)}. */
    protected Timer cacheGetTimer;

    // sor means system of record (database access)
    /** Number of entries returned by the underlying mapper in {@link #read}; reset by {@link #clearCache()}. */
    protected Counter sorRows;

    /** Times the delegated batch reads done by {@link #read}. */
    protected Timer sorGetTimer;

    /**
     * Creates a mapper with an empty cache (hard-referenced keys, soft-referenced values), no pending local
     * invalidations and its own invalidations queue.
     * <p>
     * The mapper is not usable until {@link #initialize} has been called.
     */
    @SuppressWarnings("unchecked")
    public SoftRefCachingRowMapper() {
        // ReferenceMap is not generic, hence the unchecked assignment
        cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT);
        localInvalidations = new Invalidations();
        cacheQueue = new InvalidationsQueue("mapper-" + this);
    }

    /**
     * Wires this mapper to its collaborators, registers its queue with the propagator and sets up its metrics.
     *
     * @param repositoryName the repository name, used as a component of the metric names
     * @param model the model, used by {@link #remove(NodeInfo)} to list the fragments of removed nodes
     * @param rowMapper the underlying mapper to delegate to
     * @param cachePropagator the propagator on which {@link #cacheQueue} is registered
     * @param properties not used by this implementation
     */
    public void initialize(String repositoryName, Model model, RowMapper rowMapper,
            InvalidationsPropagator cachePropagator, Map<String, String> properties) {
        this.model = model;
        this.rowMapper = rowMapper;
        this.cachePropagator = cachePropagator;
        cachePropagator.addQueue(cacheQueue);
        setMetrics(repositoryName);
    }

    /**
     * Looks up (or creates) in the shared registry the counters and timers used by this mapper.
     *
     * @param repositoryName the repository name, used as a component of the metric names
     */
    protected void setMetrics(String repositoryName) {
        cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
                "soft-ref", "hits"));
        cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
                "soft-ref", "get"));
        sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
                "sor", "rows"));
        sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
                "sor", "get"));
    }

    /**
     * Clears the cache (see {@link #clearCache()}) and unregisters this mapper's queue from the propagator.
     */
    public void close() {
        clearCache();
        cachePropagator.removeQueue(cacheQueue);
    }

    @Override
    public Serializable generateNewId() {
        return rowMapper.generateNewId();
    }

    /*
     * ----- Cache -----
     */

    /**
     * Checks whether a cached row is the marker for a fragment known to be absent.
     *
     * @param row the row, must not be {@code null}
     * @return {@code true} if the row's table name is the {@link #ABSENT} constant itself
     */
    protected static boolean isAbsent(Row row) {
        return row.tableName == ABSENT; // == is ok
    }

    /**
     * Stores a private copy of the given row in the cache, keyed by a plain {@link RowId} built from it.
     *
     * @param row the row to cache; the caller's instance is cloned, not stored
     */
    protected void cachePut(Row row) {
        row = row.clone();
        // for ACL collections, make sure the order is correct
        // (without the cache, the query to get a list of collection does an
        // ORDER BY pos, so users of the cache must get the same behavior)
        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
            row.values = sortACLRows((ACLRow[]) row.values);
        }
        cache.put(new RowId(row), row);
    }

    /**
     * Returns a sorted copy of the given ACL rows, ordered by {@link ACLRowPositionComparator}.
     *
     * @param acls the rows to sort; this array is not modified
     * @return a new {@code ACLRow[]} of the same length holding the same elements in sorted order
     */
    protected ACLRow[] sortACLRows(ACLRow[] acls) {
        // copy first so the caller's array keeps its order; Arrays.sort on objects is stable
        ACLRow[] sorted = Arrays.copyOf(acls, acls.length, ACLRow[].class);
        Arrays.sort(sorted, ACLRowPositionComparator.INSTANCE);
        return sorted;
    }

    /**
     * Records in the cache that the fragment with the given id is absent.
     *
     * @param rowId the id of the absent fragment
     */
    protected void cachePutAbsent(RowId rowId) {
        // a fresh marker row per entry, see the comment on the cache field
        cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null));
    }

    /**
     * Caches the given row, or an absent marker for {@code rowId} when the row is {@code null}.
     *
     * @param rowId the id that was looked up
     * @param row the row that was found, or {@code null}
     */
    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
        if (row != null) {
            cachePut(row);
        } else {
            cachePutAbsent(rowId);
        }
    }

    /**
     * Caches the argument as a row when it is a {@link Row}, or as an absent marker when it is a plain
     * {@link RowId}.
     *
     * @param rowId a {@link Row}, or a bare {@link RowId}
     */
    protected void cachePutAbsentIfRowId(RowId rowId) {
        if (rowId instanceof Row) {
            cachePut((Row) rowId);
        } else {
            cachePutAbsent(rowId);
        }
    }

    /**
     * Looks up a row in the cache. The lookup is timed, and the hit counter is incremented whenever an entry is
     * found, including when that entry is an absent marker.
     *
     * @param rowId the id to look up
     * @return {@code null} if nothing is cached, the cached absent marker (test with {@link #isAbsent(Row)}), or a
     *         clone of the cached row
     */
    protected Row cacheGet(RowId rowId) {
        final Timer.Context context = cacheGetTimer.time();
        try {
            Row row = cache.get(rowId);
            if (row != null && !isAbsent(row)) {
                // hand out a copy so that callers cannot alter the cached instance
                row = row.clone();
            }
            if (row != null) {
                cacheHitCount.inc();
            }
            return row;
        } finally {
            context.stop();
        }
    }

    /**
     * Drops any cache entry (row or absent marker) for the given id.
     *
     * @param rowId the id to forget
     */
    protected void cacheRemove(RowId rowId) {
        cache.remove(rowId);
    }

    /*
     * ----- Invalidations / Cache Management -----
     */

    /**
     * Applies to the cache the invalidations queued for this mapper: a global invalidation clears the cache,
     * modified ids are removed from it, deleted ids are marked absent.
     *
     * @return the invalidations that were applied, or {@code null} if there were none
     */
    @Override
    public Invalidations receiveInvalidations() {
        // invalidations from the underlying mapper (cluster)
        // already propagated to our invalidations queue
        rowMapper.receiveInvalidations();

        Invalidations invalidations = cacheQueue.getInvalidations();

        // invalidate our cache
        if (invalidations.all) {
            clearCache();
        }
        if (invalidations.modified != null) {
            for (RowId rowId : invalidations.modified) {
                cacheRemove(rowId);
            }
        }
        if (invalidations.deleted != null) {
            for (RowId rowId : invalidations.deleted) {
                cachePutAbsent(rowId);
            }
        }

        return invalidations.isEmpty() ? null : invalidations;
    }

    /**
     * Merges the pending local invalidations into the given ones (the local set is then emptied) and, if the result
     * is not empty, sends it to the underlying mapper and to the propagator.
     *
     * @param invalidations the invalidations to send, may be {@code null}; when not {@code null} the local
     *            invalidations are added to this very instance
     */
    // propagate invalidations
    @Override
    public void sendInvalidations(Invalidations invalidations) {
        // add local invalidations
        if (!localInvalidations.isEmpty()) {
            if (invalidations == null) {
                invalidations = new Invalidations();
            }
            invalidations.add(localInvalidations);
            localInvalidations.clear();
        }

        if (invalidations != null && !invalidations.isEmpty()) {
            // send to underlying mapper
            rowMapper.sendInvalidations(invalidations);

            // queue to other local mappers' caches
            cachePropagator.propagateInvalidations(invalidations, cacheQueue);
        }
    }

    /**
     * Empties the cache, brings the {@link #sorRows} counter back down by its current count, discards the pending
     * local invalidations and clears the cache of the underlying mapper.
     */
    @Override
    public void clearCache() {
        cache.clear();
        sorRows.dec(sorRows.getCount());
        localInvalidations.clear();
        rowMapper.clearCache();
    }

    @Override
    public long getCacheSize() {
        return cache.size();
    }

    /**
     * Rolls back the underlying mapper, then clears the cache whether or not the rollback succeeded.
     */
    @Override
    public void rollback(Xid xid) throws XAException {
        try {
            rowMapper.rollback(xid);
        } finally {
            clearCache();
        }
    }

    /*
     * ----- Batch -----
     */

    /**
     * Use those from the cache if available, read from the mapper for the rest.
     * <p>
     * The returned list holds a {@link Row} for each fragment found and a plain {@link RowId} for each fragment that
     * is absent (or, when {@code cacheOnly} is set, not in the cache). Entries answered from the cache come first, in
     * iteration order of {@code rowIds}; entries fetched from the underlying mapper are appended after them, so the
     * result order may differ from the request order.
     *
     * @param rowIds the ids to read
     * @param cacheOnly if {@code true}, the underlying mapper is not consulted
     * @return the rows and row ids, as described above
     */
    @Override
    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
        List<RowId> res = new ArrayList<RowId>(rowIds.size());
        // find which are in cache, and which not
        List<RowId> todo = new ArrayList<RowId>();
        for (RowId rowId : rowIds) {
            Row row = cacheGet(rowId);
            if (row == null) {
                if (cacheOnly) {
                    res.add(new RowId(rowId));
                } else {
                    todo.add(rowId);
                }
            } else if (isAbsent(row)) {
                res.add(new RowId(rowId));
            } else {
                res.add(row);
            }
        }
        if (!todo.isEmpty()) {
            final Timer.Context context = sorGetTimer.time();
            try {
                // ask missing ones to underlying row mapper
                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
                // add them to the cache
                for (RowId rowId : fetched) {
                    cachePutAbsentIfRowId(rowId);
                }
                // merge results
                res.addAll(fetched);
                sorRows.inc(fetched.size());
            } finally {
                context.stop();
            }
        }
        return res;
    }

    /**
     * Save in the cache then pass all the writes to the mapper.
     * <p>
     * Created and updated rows are cached and recorded as modified; deleted ids are cached as absent and recorded as
     * deleted.
     *
     * @param batch the batch to write
     * @throws AssertionError if a delete entry of the batch is a {@link Row} rather than a plain {@link RowId}
     */
    @Override
    public void write(RowBatch batch) {
        for (Row row : batch.creates) {
            cachePut(row);
            // we need to send modified invalidations for created
            // fragments because other session's ABSENT fragments have
            // to be invalidated
            localInvalidations.addModified(new RowId(row));
        }
        for (RowUpdate rowu : batch.updates) {
            cachePut(rowu.row);
            localInvalidations.addModified(new RowId(rowu.row));
        }
        for (RowId rowId : batch.deletes) {
            if (rowId instanceof Row) {
                throw new AssertionError();
            }
            cachePutAbsent(rowId);
            localInvalidations.addDeleted(rowId);
        }
        for (RowId rowId : batch.deletesDependent) {
            if (rowId instanceof Row) {
                throw new AssertionError();
            }
            cachePutAbsent(rowId);
            localInvalidations.addDeleted(rowId);
        }

        // propagate to underlying mapper
        rowMapper.write(batch);
    }

    /*
     * ----- Read -----
     */

    /**
     * Reads a simple row, from the cache if it holds an answer, otherwise from the underlying mapper (whose answer,
     * present or absent, is then cached).
     *
     * @param rowId the id of the row to read
     * @return the row, or {@code null} if it is absent
     */
    @Override
    public Row readSimpleRow(RowId rowId) {
        Row row = cacheGet(rowId);
        if (row == null) {
            row = rowMapper.readSimpleRow(rowId);
            cachePutAbsentIfNull(rowId, row);
            return row;
        } else if (isAbsent(row)) {
            return null;
        } else {
            return row;
        }
    }

    /**
     * Delegates directly to the underlying mapper; the result is not cached.
     */
    @Override
    public Map<String, String> getBinaryFulltext(RowId rowId) {
        return rowMapper.getBinaryFulltext(rowId);
    }

    /**
     * Reads the values of a collection fragment, from the cache if it holds an answer, otherwise from the underlying
     * mapper (whose answer is then cached).
     *
     * @param rowId the id of the collection fragment
     * @return the values, or {@code null} if the cache holds an absent marker for this id
     */
    @Override
    public Serializable[] readCollectionRowArray(RowId rowId) {
        Row row = cacheGet(rowId);
        if (row == null) {
            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
            assert array != null;
            row = new Row(rowId.tableName, rowId.id, array);
            cachePut(row);
            return row.values;
        } else if (isAbsent(row)) {
            return null;
        } else {
            return row.values;
        }
    }

    /**
     * Always delegates the selection to the underlying mapper, and caches each of the returned rows.
     */
    @Override
    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
            Serializable criterion, boolean limitToOne) {
        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
        for (Row row : rows) {
            cachePut(row);
        }
        return rows;
    }

    /*
     * ----- Copy -----
     */

    /**
     * Delegates the copy to the underlying mapper, then removes from the cache every row id that the result reports
     * as modified or deleted and records it in the local invalidations.
     */
    @Override
    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) {
        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow);
        Invalidations invalidations = result.invalidations;
        if (invalidations.modified != null) {
            for (RowId rowId : invalidations.modified) {
                cacheRemove(rowId);
                localInvalidations.addModified(new RowId(rowId));
            }
        }
        if (invalidations.deleted != null) {
            for (RowId rowId : invalidations.deleted) {
                cacheRemove(rowId);
                localInvalidations.addDeleted(rowId);
            }
        }
        return result;
    }

    /**
     * Delegates the removal to the underlying mapper, then, for every removed node, removes each of its type's
     * fragments from the cache and records it as deleted in the local invalidations.
     *
     * @param rootInfo the info about the root of the removed tree
     * @return the infos returned by the underlying mapper
     */
    @Override
    public List<NodeInfo> remove(NodeInfo rootInfo) {
        List<NodeInfo> infos = rowMapper.remove(rootInfo);
        for (NodeInfo info : infos) {
            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) {
                RowId rowId = new RowId(fragmentName, info.id);
                cacheRemove(rowId);
                localInvalidations.addDeleted(rowId);
            }
        }
        // we only put as absent the root fragment, to avoid polluting the cache
        // with lots of absent info. the rest is removed entirely
        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id));
        return infos;
    }

}