001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030 031import org.apache.commons.collections.map.AbstractReferenceMap; 032import org.apache.commons.collections.map.ReferenceMap; 033import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator; 034import org.nuxeo.runtime.metrics.MetricsService; 035 036import io.dropwizard.metrics5.Counter; 037import io.dropwizard.metrics5.MetricName; 038import io.dropwizard.metrics5.MetricRegistry; 039import io.dropwizard.metrics5.SharedMetricRegistries; 040import io.dropwizard.metrics5.Timer; 041 042/** 043 * A {@link RowMapper} that has an internal cache. 044 * <p> 045 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}. 046 */ 047public class SoftRefCachingRowMapper implements RowMapper { 048 049 private static final String ABSENT = "__ABSENT__\0\0\0"; 050 051 /** 052 * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be 053 * refetched if needed. 054 * <p> 055 * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT} 056 * to denote a fragment known to be absent from the database. 057 * <p> 058 * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC 059 * collects it. 060 */ 061 // we use a new Row instance for the absent case to avoid keeping other 062 // references to it which would prevent its GCing 063 private final Map<RowId, Row> cache; 064 065 private Model model; 066 067 /** 068 * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated. 069 */ 070 private RowMapper rowMapper; 071 072 /** 073 * The local invalidations due to writes through this mapper that should be propagated to other sessions at 074 * post-commit time. 075 */ 076 private final VCSInvalidations localInvalidations; 077 078 /** 079 * The queue of cache invalidations received from other session, to process at pre-transaction time. 080 */ 081 // public for unit tests 082 public final VCSInvalidationsQueue cacheQueue; 083 084 /** 085 * The propagator of invalidations to other mappers. 086 */ 087 private VCSInvalidationsPropagator cachePropagator; 088 089 /** 090 * Cache statistics 091 * 092 * @since 5.7 093 */ 094 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 095 096 protected Counter cacheHitCount; 097 098 protected Timer cacheGetTimer; 099 100 // sor means system of record (database access) 101 protected Counter sorRows; 102 103 protected Timer sorGetTimer; 104 105 @SuppressWarnings("unchecked") 106 public SoftRefCachingRowMapper() { 107 cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT); 108 localInvalidations = new VCSInvalidations(); 109 cacheQueue = new VCSInvalidationsQueue("mapper-" + this); 110 } 111 112 public void initialize(String repositoryName, Model model, RowMapper rowMapper, 113 VCSInvalidationsPropagator cachePropagator, Map<String, String> properties) { 114 this.model = model; 115 this.rowMapper = rowMapper; 116 this.cachePropagator = cachePropagator; 117 cachePropagator.addQueue(cacheQueue); 118 setMetrics(repositoryName); 119 } 120 121 protected void setMetrics(String repositoryName) { 122 cacheHitCount = registry.counter( 123 MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "hit") 124 .tagged("repository", repositoryName)); 125 cacheGetTimer = registry.timer( 126 MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "timer") 127 .tagged("repository", repositoryName)); 128 sorRows = registry.counter( 129 MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "sor", "rows") 130 .tagged("repository", repositoryName)); 131 sorGetTimer = registry.timer( 132 MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "sor", "timer") 133 .tagged("repository", repositoryName)); 134 } 135 136 public void close() { 137 clearCache(); 138 cachePropagator.removeQueue(cacheQueue); 139 } 140 141 @Override 142 public Serializable generateNewId() { 143 return rowMapper.generateNewId(); 144 } 145 146 /* 147 * ----- Cache ----- 148 */ 149 150 protected static boolean isAbsent(Row row) { 151 return row.tableName == ABSENT; // == is ok 152 } 153 154 protected void cachePut(Row row) { 155 row = row.clone(); 156 // for ACL collections, make sure the order is correct 157 // (without the cache, the query to get a list of collection does an 158 // ORDER BY pos, so users of the cache must get the same behavior) 159 if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) { 160 row.values = sortACLRows((ACLRow[]) row.values); 161 } 162 cache.put(new RowId(row), row); 163 } 164 165 protected ACLRow[] sortACLRows(ACLRow[] acls) { 166 List<ACLRow> list = new ArrayList<>(Arrays.asList(acls)); 167 Collections.sort(list, ACLRowPositionComparator.INSTANCE); 168 ACLRow[] res = new ACLRow[acls.length]; 169 return list.toArray(res); 170 } 171 172 protected void cachePutAbsent(RowId rowId) { 173 cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null)); 174 } 175 176 protected void cachePutAbsentIfNull(RowId rowId, Row row) { 177 if (row != null) { 178 cachePut(row); 179 } else { 180 cachePutAbsent(rowId); 181 } 182 } 183 184 protected void cachePutAbsentIfRowId(RowId rowId) { 185 if (rowId instanceof Row) { 186 cachePut((Row) rowId); 187 } else { 188 cachePutAbsent(rowId); 189 } 190 } 191 192 @SuppressWarnings("resource") // Time.Context closed by stop() 193 protected Row cacheGet(RowId rowId) { 194 final Timer.Context context = cacheGetTimer.time(); 195 try { 196 Row row = cache.get(rowId); 197 if (row != null && !isAbsent(row)) { 198 row = row.clone(); 199 } 200 if (row != null) { 201 cacheHitCount.inc(); 202 } 203 return row; 204 } finally { 205 context.stop(); 206 } 207 } 208 209 protected void cacheRemove(RowId rowId) { 210 cache.remove(rowId); 211 } 212 213 /* 214 * ----- Invalidations / Cache Management ----- 215 */ 216 217 @Override 218 public VCSInvalidations receiveInvalidations() { 219 // invalidations from the underlying mapper (cluster) 220 // already propagated to our invalidations queue 221 rowMapper.receiveInvalidations(); 222 223 VCSInvalidations invalidations = cacheQueue.getInvalidations(); 224 225 // invalidate our cache 226 if (invalidations.all) { 227 clearCache(); 228 } 229 if (invalidations.modified != null) { 230 for (RowId rowId : invalidations.modified) { 231 cacheRemove(rowId); 232 } 233 } 234 if (invalidations.deleted != null) { 235 for (RowId rowId : invalidations.deleted) { 236 cachePutAbsent(rowId); 237 } 238 } 239 240 return invalidations.isEmpty() ? null : invalidations; 241 } 242 243 // propagate invalidations 244 @Override 245 public void sendInvalidations(VCSInvalidations invalidations) { 246 // add local invalidations 247 if (!localInvalidations.isEmpty()) { 248 if (invalidations == null) { 249 invalidations = new VCSInvalidations(); 250 } 251 invalidations.add(localInvalidations); 252 localInvalidations.clear(); 253 } 254 255 if (invalidations != null && !invalidations.isEmpty()) { 256 // send to underlying mapper 257 rowMapper.sendInvalidations(invalidations); 258 259 // queue to other local mappers' caches 260 cachePropagator.propagateInvalidations(invalidations, cacheQueue); 261 } 262 } 263 264 @Override 265 public void clearCache() { 266 cache.clear(); 267 sorRows.dec(sorRows.getCount()); 268 localInvalidations.clear(); 269 rowMapper.clearCache(); 270 } 271 272 @Override 273 public long getCacheSize() { 274 return cache.size(); 275 } 276 277 @Override 278 public void rollback() { 279 try { 280 rowMapper.rollback(); 281 } finally { 282 clearCache(); 283 } 284 } 285 286 /* 287 * ----- Batch ----- 288 */ 289 290 /* 291 * Use those from the cache if available, read from the mapper for the rest. 292 */ 293 @Override 294 public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) { 295 List<RowId> res = new ArrayList<>(rowIds.size()); 296 // find which are in cache, and which not 297 List<RowId> todo = new LinkedList<>(); 298 for (RowId rowId : rowIds) { 299 Row row = cacheGet(rowId); 300 if (row == null) { 301 if (cacheOnly) { 302 res.add(new RowId(rowId)); 303 } else { 304 todo.add(rowId); 305 } 306 } else if (isAbsent(row)) { 307 res.add(new RowId(rowId)); 308 } else { 309 res.add(row); 310 } 311 } 312 if (!todo.isEmpty()) { 313 @SuppressWarnings("resource") 314 final Timer.Context context = sorGetTimer.time(); 315 try { 316 // ask missing ones to underlying row mapper 317 List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly); 318 // add them to the cache 319 for (RowId rowId : fetched) { 320 cachePutAbsentIfRowId(rowId); 321 } 322 // merge results 323 res.addAll(fetched); 324 sorRows.inc(fetched.size()); 325 } finally { 326 context.stop(); 327 } 328 } 329 return res; 330 } 331 332 /* 333 * Save in the cache then pass all the writes to the mapper. 334 */ 335 @Override 336 public void write(RowBatch batch) { 337 for (Row row : batch.creates) { 338 cachePut(row); 339 // we need to send modified invalidations for created 340 // fragments because other session's ABSENT fragments have 341 // to be invalidated 342 localInvalidations.addModified(new RowId(row)); 343 } 344 for (RowUpdate rowu : batch.updates) { 345 cachePut(rowu.row); 346 localInvalidations.addModified(new RowId(rowu.row)); 347 } 348 for (RowId rowId : batch.deletes) { 349 if (rowId instanceof Row) { 350 throw new AssertionError(); 351 } 352 cachePutAbsent(rowId); 353 localInvalidations.addDeleted(rowId); 354 } 355 for (RowId rowId : batch.deletesDependent) { 356 if (rowId instanceof Row) { 357 throw new AssertionError(); 358 } 359 cachePutAbsent(rowId); 360 localInvalidations.addDeleted(rowId); 361 } 362 363 // propagate to underlying mapper 364 rowMapper.write(batch); 365 } 366 367 /* 368 * ----- Read ----- 369 */ 370 371 @Override 372 public Row readSimpleRow(RowId rowId) { 373 Row row = cacheGet(rowId); 374 if (row == null) { 375 row = rowMapper.readSimpleRow(rowId); 376 cachePutAbsentIfNull(rowId, row); 377 return row; 378 } else if (isAbsent(row)) { 379 return null; 380 } else { 381 return row; 382 } 383 } 384 385 @Override 386 public Map<String, String> getBinaryFulltext(RowId rowId) { 387 return rowMapper.getBinaryFulltext(rowId); 388 } 389 390 @Override 391 public Serializable[] readCollectionRowArray(RowId rowId) { 392 Row row = cacheGet(rowId); 393 if (row == null) { 394 Serializable[] array = rowMapper.readCollectionRowArray(rowId); 395 assert array != null; 396 row = new Row(rowId.tableName, rowId.id, array); 397 cachePut(row); 398 return row.values; 399 } else if (isAbsent(row)) { 400 return null; 401 } else { 402 return row.values; 403 } 404 } 405 406 @Override 407 public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter, 408 Serializable criterion, boolean limitToOne) { 409 List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne); 410 for (Row row : rows) { 411 cachePut(row); 412 } 413 return rows; 414 } 415 416 @Override 417 public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) { 418 return rowMapper.readSelectionsIds(selType, values); 419 } 420 421 /* 422 * ----- Copy ----- 423 */ 424 425 @Override 426 public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow, 427 boolean excludeSpecialChildren, boolean excludeACL) { 428 CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow, excludeSpecialChildren, 429 excludeACL); 430 VCSInvalidations invalidations = result.invalidations; 431 if (invalidations.modified != null) { 432 for (RowId rowId : invalidations.modified) { 433 cacheRemove(rowId); 434 localInvalidations.addModified(new RowId(rowId)); 435 } 436 } 437 if (invalidations.deleted != null) { 438 for (RowId rowId : invalidations.deleted) { 439 cacheRemove(rowId); 440 localInvalidations.addDeleted(rowId); 441 } 442 } 443 return result; 444 } 445 446 @Override 447 public List<NodeInfo> getDescendantsInfo(Serializable rootId) { 448 return rowMapper.getDescendantsInfo(rootId); 449 } 450 451 @Override 452 public void remove(Serializable rootId, List<NodeInfo> nodeInfos) { 453 rowMapper.remove(rootId, nodeInfos); 454 for (NodeInfo info : nodeInfos) { 455 for (String fragmentName : model.getTypeFragments(new IdWithTypes(info))) { 456 RowId rowId = new RowId(fragmentName, info.id); 457 cacheRemove(rowId); 458 localInvalidations.addDeleted(rowId); 459 } 460 } 461 // we only put as absent the root fragment, to avoid polluting the cache 462 // with lots of absent info. the rest is removed entirely 463 cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId)); 464 } 465 466}