001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029 030import javax.transaction.xa.XAException; 031import javax.transaction.xa.Xid; 032 033import org.apache.commons.collections.map.AbstractReferenceMap; 034import org.apache.commons.collections.map.ReferenceMap; 035import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator; 036import org.nuxeo.runtime.metrics.MetricsService; 037 038import com.codahale.metrics.Counter; 039import com.codahale.metrics.MetricRegistry; 040import com.codahale.metrics.SharedMetricRegistries; 041import com.codahale.metrics.Timer; 042 043/** 044 * A {@link RowMapper} that has an internal cache. 045 * <p> 046 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}. 047 */ 048public class SoftRefCachingRowMapper implements RowMapper { 049 050 private static final String ABSENT = "__ABSENT__\0\0\0"; 051 052 /** 053 * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be 054 * refetched if needed. 055 * <p> 056 * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT} 057 * to denote a fragment known to be absent from the database. 058 * <p> 059 * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC 060 * collects it. 061 */ 062 // we use a new Row instance for the absent case to avoid keeping other 063 // references to it which would prevent its GCing 064 private final Map<RowId, Row> cache; 065 066 private Model model; 067 068 /** 069 * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated. 070 */ 071 private RowMapper rowMapper; 072 073 /** 074 * The local invalidations due to writes through this mapper that should be propagated to other sessions at 075 * post-commit time. 076 */ 077 private final Invalidations localInvalidations; 078 079 /** 080 * The queue of cache invalidations received from other session, to process at pre-transaction time. 081 */ 082 // public for unit tests 083 public final InvalidationsQueue cacheQueue; 084 085 /** 086 * The propagator of invalidations to other mappers. 087 */ 088 private InvalidationsPropagator cachePropagator; 089 090 /** 091 * Cache statistics 092 * 093 * @since 5.7 094 */ 095 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 096 097 protected Counter cacheHitCount; 098 099 protected Timer cacheGetTimer; 100 101 // sor means system of record (database access) 102 protected Counter sorRows; 103 104 protected Timer sorGetTimer; 105 106 @SuppressWarnings("unchecked") 107 public SoftRefCachingRowMapper() { 108 cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT); 109 localInvalidations = new Invalidations(); 110 cacheQueue = new InvalidationsQueue("mapper-" + this); 111 } 112 113 public void initialize(String repositoryName, Model model, RowMapper rowMapper, 114 InvalidationsPropagator cachePropagator, Map<String, String> properties) { 115 this.model = model; 116 this.rowMapper = rowMapper; 117 this.cachePropagator = cachePropagator; 118 cachePropagator.addQueue(cacheQueue); 119 setMetrics(repositoryName); 120 } 121 122 protected void setMetrics(String repositoryName) { 123 cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 124 "soft-ref", "hits")); 125 cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", 126 "soft-ref", "get")); 127 sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref", 128 "sor", "rows")); 129 sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref", 130 "sor", "get")); 131 } 132 133 public void close() { 134 clearCache(); 135 cachePropagator.removeQueue(cacheQueue); 136 } 137 138 @Override 139 public Serializable generateNewId() { 140 return rowMapper.generateNewId(); 141 } 142 143 /* 144 * ----- Cache ----- 145 */ 146 147 protected static boolean isAbsent(Row row) { 148 return row.tableName == ABSENT; // == is ok 149 } 150 151 protected void cachePut(Row row) { 152 row = row.clone(); 153 // for ACL collections, make sure the order is correct 154 // (without the cache, the query to get a list of collection does an 155 // ORDER BY pos, so users of the cache must get the same behavior) 156 if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) { 157 row.values = sortACLRows((ACLRow[]) row.values); 158 } 159 cache.put(new RowId(row), row); 160 } 161 162 protected ACLRow[] sortACLRows(ACLRow[] acls) { 163 List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls)); 164 Collections.sort(list, ACLRowPositionComparator.INSTANCE); 165 ACLRow[] res = new ACLRow[acls.length]; 166 return list.toArray(res); 167 } 168 169 protected void cachePutAbsent(RowId rowId) { 170 cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null)); 171 } 172 173 protected void cachePutAbsentIfNull(RowId rowId, Row row) { 174 if (row != null) { 175 cachePut(row); 176 } else { 177 cachePutAbsent(rowId); 178 } 179 } 180 181 protected void cachePutAbsentIfRowId(RowId rowId) { 182 if (rowId instanceof Row) { 183 cachePut((Row) rowId); 184 } else { 185 cachePutAbsent(rowId); 186 } 187 } 188 189 protected Row cacheGet(RowId rowId) { 190 final Timer.Context context = cacheGetTimer.time(); 191 try { 192 Row row = cache.get(rowId); 193 if (row != null && !isAbsent(row)) { 194 row = row.clone(); 195 } 196 if (row != null) { 197 cacheHitCount.inc(); 198 } 199 return row; 200 } finally { 201 context.stop(); 202 } 203 } 204 205 protected void cacheRemove(RowId rowId) { 206 cache.remove(rowId); 207 } 208 209 /* 210 * ----- Invalidations / Cache Management ----- 211 */ 212 213 @Override 214 public Invalidations receiveInvalidations() { 215 // invalidations from the underlying mapper (cluster) 216 // already propagated to our invalidations queue 217 rowMapper.receiveInvalidations(); 218 219 Invalidations invalidations = cacheQueue.getInvalidations(); 220 221 // invalidate our cache 222 if (invalidations.all) { 223 clearCache(); 224 } 225 if (invalidations.modified != null) { 226 for (RowId rowId : invalidations.modified) { 227 cacheRemove(rowId); 228 } 229 } 230 if (invalidations.deleted != null) { 231 for (RowId rowId : invalidations.deleted) { 232 cachePutAbsent(rowId); 233 } 234 } 235 236 return invalidations.isEmpty() ? null : invalidations; 237 } 238 239 // propagate invalidations 240 @Override 241 public void sendInvalidations(Invalidations invalidations) { 242 // add local invalidations 243 if (!localInvalidations.isEmpty()) { 244 if (invalidations == null) { 245 invalidations = new Invalidations(); 246 } 247 invalidations.add(localInvalidations); 248 localInvalidations.clear(); 249 } 250 251 if (invalidations != null && !invalidations.isEmpty()) { 252 // send to underlying mapper 253 rowMapper.sendInvalidations(invalidations); 254 255 // queue to other local mappers' caches 256 cachePropagator.propagateInvalidations(invalidations, cacheQueue); 257 } 258 } 259 260 @Override 261 public void clearCache() { 262 cache.clear(); 263 sorRows.dec(sorRows.getCount()); 264 localInvalidations.clear(); 265 rowMapper.clearCache(); 266 } 267 268 @Override 269 public long getCacheSize() { 270 return cache.size(); 271 } 272 273 @Override 274 public void rollback(Xid xid) throws XAException { 275 try { 276 rowMapper.rollback(xid); 277 } finally { 278 clearCache(); 279 } 280 } 281 282 /* 283 * ----- Batch ----- 284 */ 285 286 /* 287 * Use those from the cache if available, read from the mapper for the rest. 288 */ 289 @Override 290 public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) { 291 List<RowId> res = new ArrayList<RowId>(rowIds.size()); 292 // find which are in cache, and which not 293 List<RowId> todo = new LinkedList<RowId>(); 294 for (RowId rowId : rowIds) { 295 Row row = cacheGet(rowId); 296 if (row == null) { 297 if (cacheOnly) { 298 res.add(new RowId(rowId)); 299 } else { 300 todo.add(rowId); 301 } 302 } else if (isAbsent(row)) { 303 res.add(new RowId(rowId)); 304 } else { 305 res.add(row); 306 } 307 } 308 if (!todo.isEmpty()) { 309 final Timer.Context context = sorGetTimer.time(); 310 try { 311 // ask missing ones to underlying row mapper 312 List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly); 313 // add them to the cache 314 for (RowId rowId : fetched) { 315 cachePutAbsentIfRowId(rowId); 316 } 317 // merge results 318 res.addAll(fetched); 319 sorRows.inc(fetched.size()); 320 } finally { 321 context.stop(); 322 } 323 } 324 return res; 325 } 326 327 /* 328 * Save in the cache then pass all the writes to the mapper. 329 */ 330 @Override 331 public void write(RowBatch batch) { 332 for (Row row : batch.creates) { 333 cachePut(row); 334 // we need to send modified invalidations for created 335 // fragments because other session's ABSENT fragments have 336 // to be invalidated 337 localInvalidations.addModified(new RowId(row)); 338 } 339 for (RowUpdate rowu : batch.updates) { 340 cachePut(rowu.row); 341 localInvalidations.addModified(new RowId(rowu.row)); 342 } 343 for (RowId rowId : batch.deletes) { 344 if (rowId instanceof Row) { 345 throw new AssertionError(); 346 } 347 cachePutAbsent(rowId); 348 localInvalidations.addDeleted(rowId); 349 } 350 for (RowId rowId : batch.deletesDependent) { 351 if (rowId instanceof Row) { 352 throw new AssertionError(); 353 } 354 cachePutAbsent(rowId); 355 localInvalidations.addDeleted(rowId); 356 } 357 358 // propagate to underlying mapper 359 rowMapper.write(batch); 360 } 361 362 /* 363 * ----- Read ----- 364 */ 365 366 @Override 367 public Row readSimpleRow(RowId rowId) { 368 Row row = cacheGet(rowId); 369 if (row == null) { 370 row = rowMapper.readSimpleRow(rowId); 371 cachePutAbsentIfNull(rowId, row); 372 return row; 373 } else if (isAbsent(row)) { 374 return null; 375 } else { 376 return row; 377 } 378 } 379 380 @Override 381 public Map<String, String> getBinaryFulltext(RowId rowId) { 382 return rowMapper.getBinaryFulltext(rowId); 383 } 384 385 @Override 386 public Serializable[] readCollectionRowArray(RowId rowId) { 387 Row row = cacheGet(rowId); 388 if (row == null) { 389 Serializable[] array = rowMapper.readCollectionRowArray(rowId); 390 assert array != null; 391 row = new Row(rowId.tableName, rowId.id, array); 392 cachePut(row); 393 return row.values; 394 } else if (isAbsent(row)) { 395 return null; 396 } else { 397 return row.values; 398 } 399 } 400 401 @Override 402 public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter, 403 Serializable criterion, boolean limitToOne) { 404 List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne); 405 for (Row row : rows) { 406 cachePut(row); 407 } 408 return rows; 409 } 410 411 /* 412 * ----- Copy ----- 413 */ 414 415 @Override 416 public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) { 417 CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow); 418 Invalidations invalidations = result.invalidations; 419 if (invalidations.modified != null) { 420 for (RowId rowId : invalidations.modified) { 421 cacheRemove(rowId); 422 localInvalidations.addModified(new RowId(rowId)); 423 } 424 } 425 if (invalidations.deleted != null) { 426 for (RowId rowId : invalidations.deleted) { 427 cacheRemove(rowId); 428 localInvalidations.addDeleted(rowId); 429 } 430 } 431 return result; 432 } 433 434 @Override 435 public List<NodeInfo> remove(NodeInfo rootInfo) { 436 List<NodeInfo> infos = rowMapper.remove(rootInfo); 437 for (NodeInfo info : infos) { 438 for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) { 439 RowId rowId = new RowId(fragmentName, info.id); 440 cacheRemove(rowId); 441 localInvalidations.addDeleted(rowId); 442 } 443 } 444 // we only put as absent the root fragment, to avoid polluting the cache 445 // with lots of absent info. the rest is removed entirely 446 cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id)); 447 return infos; 448 } 449 450}