001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030 031import javax.transaction.xa.XAException; 032import javax.transaction.xa.Xid; 033 034import org.apache.commons.collections.map.AbstractReferenceMap; 035import org.apache.commons.collections.map.ReferenceMap; 036import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator; 037import org.nuxeo.runtime.metrics.MetricsService; 038 039import com.codahale.metrics.Counter; 040import com.codahale.metrics.MetricRegistry; 041import com.codahale.metrics.SharedMetricRegistries; 042import com.codahale.metrics.Timer; 043 044/** 045 * A {@link RowMapper} that has an internal cache. 046 * <p> 047 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}. 048 */ 049public class SoftRefCachingRowMapper implements RowMapper { 050 051 private static final String ABSENT = "__ABSENT__\0\0\0"; 052 053 /** 054 * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be 055 * refetched if needed. 056 * <p> 057 * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT} 058 * to denote a fragment known to be absent from the database. 059 * <p> 060 * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC 061 * collects it. 062 */ 063 // we use a new Row instance for the absent case to avoid keeping other 064 // references to it which would prevent its GCing 065 private final Map<RowId, Row> cache; 066 067 private Model model; 068 069 /** 070 * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated. 071 */ 072 private RowMapper rowMapper; 073 074 /** 075 * The local invalidations due to writes through this mapper that should be propagated to other sessions at 076 * post-commit time. 077 */ 078 private final Invalidations localInvalidations; 079 080 /** 081 * The queue of cache invalidations received from other session, to process at pre-transaction time. 082 */ 083 // public for unit tests 084 public final InvalidationsQueue cacheQueue; 085 086 /** 087 * The propagator of invalidations to other mappers. 088 */ 089 private InvalidationsPropagator cachePropagator; 090 091 /** 092 * Cache statistics 093 * 094 * @since 5.7 095 */ 096 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 097 098 protected Counter cacheHitCount; 099 100 protected Timer cacheGetTimer; 101 102 // sor means system of record (database access) 103 protected Counter sorRows; 104 105 protected Timer sorGetTimer; 106 107 @SuppressWarnings("unchecked") 108 public SoftRefCachingRowMapper() { 109 cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT); 110 localInvalidations = new Invalidations(); 111 cacheQueue = new InvalidationsQueue("mapper-" + this); 112 } 113 114 public void initialize(String repositoryName, Model model, RowMapper rowMapper, 115 InvalidationsPropagator cachePropagator, Map<String, String> properties) { 116 this.model = model; 117 this.rowMapper = rowMapper; 118 this.cachePropagator = cachePropagator; 119 cachePropagator.addQueue(cacheQueue); 120 setMetrics(repositoryName); 121 } 122 123 protected void setMetrics(String repositoryName) { 124 cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 125 "soft-ref", "hits")); 126 cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 127 "soft-ref", "get")); 128 sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref", 129 "sor", "rows")); 130 sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref", 131 "sor", "get")); 132 } 133 134 public void close() { 135 clearCache(); 136 cachePropagator.removeQueue(cacheQueue); 137 } 138 139 @Override 140 public Serializable generateNewId() { 141 return rowMapper.generateNewId(); 142 } 143 144 /* 145 * ----- Cache ----- 146 */ 147 148 protected static boolean isAbsent(Row row) { 149 return row.tableName == ABSENT; // == is ok 150 } 151 152 protected void cachePut(Row row) { 153 row = row.clone(); 154 // for ACL collections, make sure the order is correct 155 // (without the cache, the query to get a list of collection does an 156 // ORDER BY pos, so users of the cache must get the same behavior) 157 if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) { 158 row.values = sortACLRows((ACLRow[]) row.values); 159 } 160 cache.put(new RowId(row), row); 161 } 162 163 protected ACLRow[] sortACLRows(ACLRow[] acls) { 164 List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls)); 165 Collections.sort(list, ACLRowPositionComparator.INSTANCE); 166 ACLRow[] res = new ACLRow[acls.length]; 167 return list.toArray(res); 168 } 169 170 protected void cachePutAbsent(RowId rowId) { 171 cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null)); 172 } 173 174 protected void cachePutAbsentIfNull(RowId rowId, Row row) { 175 if (row != null) { 176 cachePut(row); 177 } else { 178 cachePutAbsent(rowId); 179 } 180 } 181 182 protected void cachePutAbsentIfRowId(RowId rowId) { 183 if (rowId instanceof Row) { 184 cachePut((Row) rowId); 185 } else { 186 cachePutAbsent(rowId); 187 } 188 } 189 190 protected Row cacheGet(RowId rowId) { 191 final Timer.Context context = cacheGetTimer.time(); 192 try { 193 Row row = cache.get(rowId); 194 if (row != null && !isAbsent(row)) { 195 row = row.clone(); 196 } 197 if (row != null) { 198 cacheHitCount.inc(); 199 } 200 return row; 201 } finally { 202 context.stop(); 203 } 204 } 205 206 protected void cacheRemove(RowId rowId) { 207 cache.remove(rowId); 208 } 209 210 /* 211 * ----- Invalidations / Cache Management ----- 212 */ 213 214 @Override 215 public Invalidations receiveInvalidations() { 216 // invalidations from the underlying mapper (cluster) 217 // already propagated to our invalidations queue 218 rowMapper.receiveInvalidations(); 219 220 Invalidations invalidations = cacheQueue.getInvalidations(); 221 222 // invalidate our cache 223 if (invalidations.all) { 224 clearCache(); 225 } 226 if (invalidations.modified != null) { 227 for (RowId rowId : invalidations.modified) { 228 cacheRemove(rowId); 229 } 230 } 231 if (invalidations.deleted != null) { 232 for (RowId rowId : invalidations.deleted) { 233 cachePutAbsent(rowId); 234 } 235 } 236 237 return invalidations.isEmpty() ? null : invalidations; 238 } 239 240 // propagate invalidations 241 @Override 242 public void sendInvalidations(Invalidations invalidations) { 243 // add local invalidations 244 if (!localInvalidations.isEmpty()) { 245 if (invalidations == null) { 246 invalidations = new Invalidations(); 247 } 248 invalidations.add(localInvalidations); 249 localInvalidations.clear(); 250 } 251 252 if (invalidations != null && !invalidations.isEmpty()) { 253 // send to underlying mapper 254 rowMapper.sendInvalidations(invalidations); 255 256 // queue to other local mappers' caches 257 cachePropagator.propagateInvalidations(invalidations, cacheQueue); 258 } 259 } 260 261 @Override 262 public void clearCache() { 263 cache.clear(); 264 sorRows.dec(sorRows.getCount()); 265 localInvalidations.clear(); 266 rowMapper.clearCache(); 267 } 268 269 @Override 270 public long getCacheSize() { 271 return cache.size(); 272 } 273 274 @Override 275 public void rollback(Xid xid) throws XAException { 276 try { 277 rowMapper.rollback(xid); 278 } finally { 279 clearCache(); 280 } 281 } 282 283 /* 284 * ----- Batch ----- 285 */ 286 287 /* 288 * Use those from the cache if available, read from the mapper for the rest. 289 */ 290 @Override 291 public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) { 292 List<RowId> res = new ArrayList<RowId>(rowIds.size()); 293 // find which are in cache, and which not 294 List<RowId> todo = new LinkedList<RowId>(); 295 for (RowId rowId : rowIds) { 296 Row row = cacheGet(rowId); 297 if (row == null) { 298 if (cacheOnly) { 299 res.add(new RowId(rowId)); 300 } else { 301 todo.add(rowId); 302 } 303 } else if (isAbsent(row)) { 304 res.add(new RowId(rowId)); 305 } else { 306 res.add(row); 307 } 308 } 309 if (!todo.isEmpty()) { 310 final Timer.Context context = sorGetTimer.time(); 311 try { 312 // ask missing ones to underlying row mapper 313 List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly); 314 // add them to the cache 315 for (RowId rowId : fetched) { 316 cachePutAbsentIfRowId(rowId); 317 } 318 // merge results 319 res.addAll(fetched); 320 sorRows.inc(fetched.size()); 321 } finally { 322 context.stop(); 323 } 324 } 325 return res; 326 } 327 328 /* 329 * Save in the cache then pass all the writes to the mapper. 330 */ 331 @Override 332 public void write(RowBatch batch) { 333 for (Row row : batch.creates) { 334 cachePut(row); 335 // we need to send modified invalidations for created 336 // fragments because other session's ABSENT fragments have 337 // to be invalidated 338 localInvalidations.addModified(new RowId(row)); 339 } 340 for (RowUpdate rowu : batch.updates) { 341 cachePut(rowu.row); 342 localInvalidations.addModified(new RowId(rowu.row)); 343 } 344 for (RowId rowId : batch.deletes) { 345 if (rowId instanceof Row) { 346 throw new AssertionError(); 347 } 348 cachePutAbsent(rowId); 349 localInvalidations.addDeleted(rowId); 350 } 351 for (RowId rowId : batch.deletesDependent) { 352 if (rowId instanceof Row) { 353 throw new AssertionError(); 354 } 355 cachePutAbsent(rowId); 356 localInvalidations.addDeleted(rowId); 357 } 358 359 // propagate to underlying mapper 360 rowMapper.write(batch); 361 } 362 363 /* 364 * ----- Read ----- 365 */ 366 367 @Override 368 public Row readSimpleRow(RowId rowId) { 369 Row row = cacheGet(rowId); 370 if (row == null) { 371 row = rowMapper.readSimpleRow(rowId); 372 cachePutAbsentIfNull(rowId, row); 373 return row; 374 } else if (isAbsent(row)) { 375 return null; 376 } else { 377 return row; 378 } 379 } 380 381 @Override 382 public Map<String, String> getBinaryFulltext(RowId rowId) { 383 return rowMapper.getBinaryFulltext(rowId); 384 } 385 386 @Override 387 public Serializable[] readCollectionRowArray(RowId rowId) { 388 Row row = cacheGet(rowId); 389 if (row == null) { 390 Serializable[] array = rowMapper.readCollectionRowArray(rowId); 391 assert array != null; 392 row = new Row(rowId.tableName, rowId.id, array); 393 cachePut(row); 394 return row.values; 395 } else if (isAbsent(row)) { 396 return null; 397 } else { 398 return row.values; 399 } 400 } 401 402 @Override 403 public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter, 404 Serializable criterion, boolean limitToOne) { 405 List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne); 406 for (Row row : rows) { 407 cachePut(row); 408 } 409 return rows; 410 } 411 412 @Override 413 public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) { 414 return rowMapper.readSelectionsIds(selType, values); 415 } 416 417 /* 418 * ----- Copy ----- 419 */ 420 421 @Override 422 public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) { 423 CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow); 424 Invalidations invalidations = result.invalidations; 425 if (invalidations.modified != null) { 426 for (RowId rowId : invalidations.modified) { 427 cacheRemove(rowId); 428 localInvalidations.addModified(new RowId(rowId)); 429 } 430 } 431 if (invalidations.deleted != null) { 432 for (RowId rowId : invalidations.deleted) { 433 cacheRemove(rowId); 434 localInvalidations.addDeleted(rowId); 435 } 436 } 437 return result; 438 } 439 440 @Override 441 public List<NodeInfo> getDescendantsInfo(Serializable rootId) { 442 return rowMapper.getDescendantsInfo(rootId); 443 } 444 445 @Override 446 public void remove(Serializable rootId, List<NodeInfo> nodeInfos) { 447 rowMapper.remove(rootId, nodeInfos); 448 for (NodeInfo info : nodeInfos) { 449 for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) { 450 RowId rowId = new RowId(fragmentName, info.id); 451 cacheRemove(rowId); 452 localInvalidations.addDeleted(rowId); 453 } 454 } 455 // we only put as absent the root fragment, to avoid polluting the cache 456 // with lots of absent info. the rest is removed entirely 457 cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId)); 458 } 459 460}