001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030
031import org.apache.commons.collections.map.AbstractReferenceMap;
032import org.apache.commons.collections.map.ReferenceMap;
033import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
034import org.nuxeo.runtime.metrics.MetricsService;
035
036import io.dropwizard.metrics5.Counter;
037import io.dropwizard.metrics5.MetricName;
038import io.dropwizard.metrics5.MetricRegistry;
039import io.dropwizard.metrics5.SharedMetricRegistries;
040import io.dropwizard.metrics5.Timer;
041
042/**
043 * A {@link RowMapper} that has an internal cache.
044 * <p>
045 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
046 */
047public class SoftRefCachingRowMapper implements RowMapper {
048
049    private static final String ABSENT = "__ABSENT__\0\0\0";
050
051    /**
052     * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be
053     * refetched if needed.
054     * <p>
055     * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT}
056     * to denote a fragment known to be absent from the database.
057     * <p>
058     * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC
059     * collects it.
060     */
061    // we use a new Row instance for the absent case to avoid keeping other
062    // references to it which would prevent its GCing
063    private final Map<RowId, Row> cache;
064
065    private Model model;
066
067    /**
068     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
069     */
070    private RowMapper rowMapper;
071
072    /**
073     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
074     * post-commit time.
075     */
076    private final VCSInvalidations localInvalidations;
077
078    /**
079     * The queue of cache invalidations received from other session, to process at pre-transaction time.
080     */
081    // public for unit tests
082    public final VCSInvalidationsQueue cacheQueue;
083
084    /**
085     * The propagator of invalidations to other mappers.
086     */
087    private VCSInvalidationsPropagator cachePropagator;
088
089    /**
090     * Cache statistics
091     *
092     * @since 5.7
093     */
094    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
095
096    protected Counter cacheHitCount;
097
098    protected Timer cacheGetTimer;
099
100    // sor means system of record (database access)
101    protected Counter sorRows;
102
103    protected Timer sorGetTimer;
104
105    @SuppressWarnings("unchecked")
106    public SoftRefCachingRowMapper() {
107        cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT);
108        localInvalidations = new VCSInvalidations();
109        cacheQueue = new VCSInvalidationsQueue("mapper-" + this);
110    }
111
112    public void initialize(String repositoryName, Model model, RowMapper rowMapper,
113            VCSInvalidationsPropagator cachePropagator, Map<String, String> properties) {
114        this.model = model;
115        this.rowMapper = rowMapper;
116        this.cachePropagator = cachePropagator;
117        cachePropagator.addQueue(cacheQueue);
118        setMetrics(repositoryName);
119    }
120
121    protected void setMetrics(String repositoryName) {
122        cacheHitCount = registry.counter(
123                MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "hit")
124                          .tagged("repository", repositoryName));
125        cacheGetTimer = registry.timer(
126                MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "timer")
127                          .tagged("repository", repositoryName));
128        sorRows = registry.counter(
129                MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "sor", "rows")
130                          .tagged("repository", repositoryName));
131        sorGetTimer = registry.timer(
132                MetricName.build("nuxeo", "repositories", "repository", "cache", "soft-ref", "sor", "timer")
133                          .tagged("repository", repositoryName));
134    }
135
136    public void close() {
137        clearCache();
138        cachePropagator.removeQueue(cacheQueue);
139    }
140
141    @Override
142    public Serializable generateNewId() {
143        return rowMapper.generateNewId();
144    }
145
146    /*
147     * ----- Cache -----
148     */
149
150    protected static boolean isAbsent(Row row) {
151        return row.tableName == ABSENT; // == is ok
152    }
153
154    protected void cachePut(Row row) {
155        row = row.clone();
156        // for ACL collections, make sure the order is correct
157        // (without the cache, the query to get a list of collection does an
158        // ORDER BY pos, so users of the cache must get the same behavior)
159        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
160            row.values = sortACLRows((ACLRow[]) row.values);
161        }
162        cache.put(new RowId(row), row);
163    }
164
165    protected ACLRow[] sortACLRows(ACLRow[] acls) {
166        List<ACLRow> list = new ArrayList<>(Arrays.asList(acls));
167        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
168        ACLRow[] res = new ACLRow[acls.length];
169        return list.toArray(res);
170    }
171
172    protected void cachePutAbsent(RowId rowId) {
173        cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null));
174    }
175
176    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
177        if (row != null) {
178            cachePut(row);
179        } else {
180            cachePutAbsent(rowId);
181        }
182    }
183
184    protected void cachePutAbsentIfRowId(RowId rowId) {
185        if (rowId instanceof Row) {
186            cachePut((Row) rowId);
187        } else {
188            cachePutAbsent(rowId);
189        }
190    }
191
192    @SuppressWarnings("resource") // Time.Context closed by stop()
193    protected Row cacheGet(RowId rowId) {
194        final Timer.Context context = cacheGetTimer.time();
195        try {
196            Row row = cache.get(rowId);
197            if (row != null && !isAbsent(row)) {
198                row = row.clone();
199            }
200            if (row != null) {
201                cacheHitCount.inc();
202            }
203            return row;
204        } finally {
205            context.stop();
206        }
207    }
208
209    protected void cacheRemove(RowId rowId) {
210        cache.remove(rowId);
211    }
212
213    /*
214     * ----- Invalidations / Cache Management -----
215     */
216
217    @Override
218    public VCSInvalidations receiveInvalidations() {
219        // invalidations from the underlying mapper (cluster)
220        // already propagated to our invalidations queue
221        rowMapper.receiveInvalidations();
222
223        VCSInvalidations invalidations = cacheQueue.getInvalidations();
224
225        // invalidate our cache
226        if (invalidations.all) {
227            clearCache();
228        }
229        if (invalidations.modified != null) {
230            for (RowId rowId : invalidations.modified) {
231                cacheRemove(rowId);
232            }
233        }
234        if (invalidations.deleted != null) {
235            for (RowId rowId : invalidations.deleted) {
236                cachePutAbsent(rowId);
237            }
238        }
239
240        return invalidations.isEmpty() ? null : invalidations;
241    }
242
243    // propagate invalidations
244    @Override
245    public void sendInvalidations(VCSInvalidations invalidations) {
246        // add local invalidations
247        if (!localInvalidations.isEmpty()) {
248            if (invalidations == null) {
249                invalidations = new VCSInvalidations();
250            }
251            invalidations.add(localInvalidations);
252            localInvalidations.clear();
253        }
254
255        if (invalidations != null && !invalidations.isEmpty()) {
256            // send to underlying mapper
257            rowMapper.sendInvalidations(invalidations);
258
259            // queue to other local mappers' caches
260            cachePropagator.propagateInvalidations(invalidations, cacheQueue);
261        }
262    }
263
264    @Override
265    public void clearCache() {
266        cache.clear();
267        sorRows.dec(sorRows.getCount());
268        localInvalidations.clear();
269        rowMapper.clearCache();
270    }
271
272    @Override
273    public long getCacheSize() {
274        return cache.size();
275    }
276
277    @Override
278    public void rollback() {
279        try {
280            rowMapper.rollback();
281        } finally {
282            clearCache();
283        }
284    }
285
286    /*
287     * ----- Batch -----
288     */
289
290    /*
291     * Use those from the cache if available, read from the mapper for the rest.
292     */
293    @Override
294    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
295        List<RowId> res = new ArrayList<>(rowIds.size());
296        // find which are in cache, and which not
297        List<RowId> todo = new LinkedList<>();
298        for (RowId rowId : rowIds) {
299            Row row = cacheGet(rowId);
300            if (row == null) {
301                if (cacheOnly) {
302                    res.add(new RowId(rowId));
303                } else {
304                    todo.add(rowId);
305                }
306            } else if (isAbsent(row)) {
307                res.add(new RowId(rowId));
308            } else {
309                res.add(row);
310            }
311        }
312        if (!todo.isEmpty()) {
313            @SuppressWarnings("resource")
314            final Timer.Context context = sorGetTimer.time();
315            try {
316                // ask missing ones to underlying row mapper
317                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
318                // add them to the cache
319                for (RowId rowId : fetched) {
320                    cachePutAbsentIfRowId(rowId);
321                }
322                // merge results
323                res.addAll(fetched);
324                sorRows.inc(fetched.size());
325            } finally {
326                context.stop();
327            }
328        }
329        return res;
330    }
331
332    /*
333     * Save in the cache then pass all the writes to the mapper.
334     */
335    @Override
336    public void write(RowBatch batch) {
337        for (Row row : batch.creates) {
338            cachePut(row);
339            // we need to send modified invalidations for created
340            // fragments because other session's ABSENT fragments have
341            // to be invalidated
342            localInvalidations.addModified(new RowId(row));
343        }
344        for (RowUpdate rowu : batch.updates) {
345            cachePut(rowu.row);
346            localInvalidations.addModified(new RowId(rowu.row));
347        }
348        for (RowId rowId : batch.deletes) {
349            if (rowId instanceof Row) {
350                throw new AssertionError();
351            }
352            cachePutAbsent(rowId);
353            localInvalidations.addDeleted(rowId);
354        }
355        for (RowId rowId : batch.deletesDependent) {
356            if (rowId instanceof Row) {
357                throw new AssertionError();
358            }
359            cachePutAbsent(rowId);
360            localInvalidations.addDeleted(rowId);
361        }
362
363        // propagate to underlying mapper
364        rowMapper.write(batch);
365    }
366
367    /*
368     * ----- Read -----
369     */
370
371    @Override
372    public Row readSimpleRow(RowId rowId) {
373        Row row = cacheGet(rowId);
374        if (row == null) {
375            row = rowMapper.readSimpleRow(rowId);
376            cachePutAbsentIfNull(rowId, row);
377            return row;
378        } else if (isAbsent(row)) {
379            return null;
380        } else {
381            return row;
382        }
383    }
384
385    @Override
386    public Map<String, String> getBinaryFulltext(RowId rowId) {
387        return rowMapper.getBinaryFulltext(rowId);
388    }
389
390    @Override
391    public Serializable[] readCollectionRowArray(RowId rowId) {
392        Row row = cacheGet(rowId);
393        if (row == null) {
394            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
395            assert array != null;
396            row = new Row(rowId.tableName, rowId.id, array);
397            cachePut(row);
398            return row.values;
399        } else if (isAbsent(row)) {
400            return null;
401        } else {
402            return row.values;
403        }
404    }
405
406    @Override
407    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
408            Serializable criterion, boolean limitToOne) {
409        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
410        for (Row row : rows) {
411            cachePut(row);
412        }
413        return rows;
414    }
415
416    @Override
417    public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) {
418        return rowMapper.readSelectionsIds(selType, values);
419    }
420
421    /*
422     * ----- Copy -----
423     */
424
425    @Override
426    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow,
427            boolean excludeSpecialChildren, boolean excludeACL) {
428        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow, excludeSpecialChildren,
429                excludeACL);
430        VCSInvalidations invalidations = result.invalidations;
431        if (invalidations.modified != null) {
432            for (RowId rowId : invalidations.modified) {
433                cacheRemove(rowId);
434                localInvalidations.addModified(new RowId(rowId));
435            }
436        }
437        if (invalidations.deleted != null) {
438            for (RowId rowId : invalidations.deleted) {
439                cacheRemove(rowId);
440                localInvalidations.addDeleted(rowId);
441            }
442        }
443        return result;
444    }
445
446    @Override
447    public List<NodeInfo> getDescendantsInfo(Serializable rootId) {
448        return rowMapper.getDescendantsInfo(rootId);
449    }
450
451    @Override
452    public void remove(Serializable rootId, List<NodeInfo> nodeInfos) {
453        rowMapper.remove(rootId, nodeInfos);
454        for (NodeInfo info : nodeInfos) {
455            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info))) {
456                RowId rowId = new RowId(fragmentName, info.id);
457                cacheRemove(rowId);
458                localInvalidations.addDeleted(rowId);
459            }
460        }
461        // we only put as absent the root fragment, to avoid polluting the cache
462        // with lots of absent info. the rest is removed entirely
463        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId));
464    }
465
466}