001/*
002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012package org.nuxeo.ecm.core.storage.sql;
013
014import java.io.Serializable;
015import java.util.ArrayList;
016import java.util.Arrays;
017import java.util.Collection;
018import java.util.Collections;
019import java.util.LinkedList;
020import java.util.List;
021import java.util.Map;
022
023import javax.transaction.xa.XAException;
024import javax.transaction.xa.Xid;
025
026import org.apache.commons.collections.map.AbstractReferenceMap;
027import org.apache.commons.collections.map.ReferenceMap;
028import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
029import org.nuxeo.runtime.metrics.MetricsService;
030
031import com.codahale.metrics.Counter;
032import com.codahale.metrics.MetricRegistry;
033import com.codahale.metrics.SharedMetricRegistries;
034import com.codahale.metrics.Timer;
035
036/**
037 * A {@link RowMapper} that has an internal cache.
038 * <p>
039 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
040 */
041public class SoftRefCachingRowMapper implements RowMapper {
042
043    private static final String ABSENT = "__ABSENT__\0\0\0";
044
045    /**
046     * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be
047     * refetched if needed.
048     * <p>
049     * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT}
050     * to denote a fragment known to be absent from the database.
051     * <p>
052     * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC
053     * collects it.
054     */
055    // we use a new Row instance for the absent case to avoid keeping other
056    // references to it which would prevent its GCing
057    private final Map<RowId, Row> cache;
058
059    private Model model;
060
061    /**
062     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
063     */
064    private RowMapper rowMapper;
065
066    /**
067     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
068     * post-commit time.
069     */
070    private final Invalidations localInvalidations;
071
072    /**
073     * The queue of cache invalidations received from other session, to process at pre-transaction time.
074     */
075    // public for unit tests
076    public final InvalidationsQueue cacheQueue;
077
078    /**
079     * The propagator of invalidations to other mappers.
080     */
081    private InvalidationsPropagator cachePropagator;
082
083    /**
084     * Cache statistics
085     *
086     * @since 5.7
087     */
088    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
089
090    protected Counter cacheHitCount;
091
092    protected Timer cacheGetTimer;
093
094    // sor means system of record (database access)
095    protected Counter sorRows;
096
097    protected Timer sorGetTimer;
098
099    @SuppressWarnings("unchecked")
100    public SoftRefCachingRowMapper() {
101        cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT);
102        localInvalidations = new Invalidations();
103        cacheQueue = new InvalidationsQueue("mapper-" + this);
104    }
105
106    public void initialize(String repositoryName, Model model, RowMapper rowMapper,
107            InvalidationsPropagator cachePropagator, Map<String, String> properties) {
108        this.model = model;
109        this.rowMapper = rowMapper;
110        this.cachePropagator = cachePropagator;
111        cachePropagator.addQueue(cacheQueue);
112        setMetrics(repositoryName);
113    }
114
115    protected void setMetrics(String repositoryName) {
116        cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
117                "soft-ref", "hits"));
118        cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
119                "soft-ref", "get"));
120        sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
121                "sor", "rows"));
122        sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
123                "sor", "get"));
124    }
125
126    public void close() {
127        clearCache();
128        cachePropagator.removeQueue(cacheQueue);
129    }
130
131    @Override
132    public Serializable generateNewId() {
133        return rowMapper.generateNewId();
134    }
135
136    /*
137     * ----- Cache -----
138     */
139
140    protected static boolean isAbsent(Row row) {
141        return row.tableName == ABSENT; // == is ok
142    }
143
144    protected void cachePut(Row row) {
145        row = row.clone();
146        // for ACL collections, make sure the order is correct
147        // (without the cache, the query to get a list of collection does an
148        // ORDER BY pos, so users of the cache must get the same behavior)
149        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
150            row.values = sortACLRows((ACLRow[]) row.values);
151        }
152        cache.put(new RowId(row), row);
153    }
154
155    protected ACLRow[] sortACLRows(ACLRow[] acls) {
156        List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls));
157        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
158        ACLRow[] res = new ACLRow[acls.length];
159        return list.toArray(res);
160    }
161
162    protected void cachePutAbsent(RowId rowId) {
163        cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null));
164    }
165
166    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
167        if (row != null) {
168            cachePut(row);
169        } else {
170            cachePutAbsent(rowId);
171        }
172    }
173
174    protected void cachePutAbsentIfRowId(RowId rowId) {
175        if (rowId instanceof Row) {
176            cachePut((Row) rowId);
177        } else {
178            cachePutAbsent(rowId);
179        }
180    }
181
182    protected Row cacheGet(RowId rowId) {
183        final Timer.Context context = cacheGetTimer.time();
184        try {
185            Row row = cache.get(rowId);
186            if (row != null && !isAbsent(row)) {
187                row = row.clone();
188            }
189            if (row != null) {
190                cacheHitCount.inc();
191            }
192            return row;
193        } finally {
194            context.stop();
195        }
196    }
197
198    protected void cacheRemove(RowId rowId) {
199        cache.remove(rowId);
200    }
201
202    /*
203     * ----- Invalidations / Cache Management -----
204     */
205
206    @Override
207    public Invalidations receiveInvalidations() {
208        // invalidations from the underlying mapper (cluster)
209        // already propagated to our invalidations queue
210        rowMapper.receiveInvalidations();
211
212        Invalidations invalidations = cacheQueue.getInvalidations();
213
214        // invalidate our cache
215        if (invalidations.all) {
216            clearCache();
217        }
218        if (invalidations.modified != null) {
219            for (RowId rowId : invalidations.modified) {
220                cacheRemove(rowId);
221            }
222        }
223        if (invalidations.deleted != null) {
224            for (RowId rowId : invalidations.deleted) {
225                cachePutAbsent(rowId);
226            }
227        }
228
229        return invalidations.isEmpty() ? null : invalidations;
230    }
231
232    // propagate invalidations
233    @Override
234    public void sendInvalidations(Invalidations invalidations) {
235        // add local invalidations
236        if (!localInvalidations.isEmpty()) {
237            if (invalidations == null) {
238                invalidations = new Invalidations();
239            }
240            invalidations.add(localInvalidations);
241            localInvalidations.clear();
242        }
243
244        if (invalidations != null && !invalidations.isEmpty()) {
245            // send to underlying mapper
246            rowMapper.sendInvalidations(invalidations);
247
248            // queue to other local mappers' caches
249            cachePropagator.propagateInvalidations(invalidations, cacheQueue);
250        }
251    }
252
253    @Override
254    public void clearCache() {
255        cache.clear();
256        sorRows.dec(sorRows.getCount());
257        localInvalidations.clear();
258        rowMapper.clearCache();
259    }
260
261    @Override
262    public long getCacheSize() {
263        return cache.size();
264    }
265
266    @Override
267    public void rollback(Xid xid) throws XAException {
268        try {
269            rowMapper.rollback(xid);
270        } finally {
271            clearCache();
272        }
273    }
274
275    /*
276     * ----- Batch -----
277     */
278
279    /*
280     * Use those from the cache if available, read from the mapper for the rest.
281     */
282    @Override
283    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
284        List<RowId> res = new ArrayList<RowId>(rowIds.size());
285        // find which are in cache, and which not
286        List<RowId> todo = new LinkedList<RowId>();
287        for (RowId rowId : rowIds) {
288            Row row = cacheGet(rowId);
289            if (row == null) {
290                if (cacheOnly) {
291                    res.add(new RowId(rowId));
292                } else {
293                    todo.add(rowId);
294                }
295            } else if (isAbsent(row)) {
296                res.add(new RowId(rowId));
297            } else {
298                res.add(row);
299            }
300        }
301        if (!todo.isEmpty()) {
302            final Timer.Context context = sorGetTimer.time();
303            try {
304                // ask missing ones to underlying row mapper
305                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
306                // add them to the cache
307                for (RowId rowId : fetched) {
308                    cachePutAbsentIfRowId(rowId);
309                }
310                // merge results
311                res.addAll(fetched);
312                sorRows.inc(fetched.size());
313            } finally {
314                context.stop();
315            }
316        }
317        return res;
318    }
319
320    /*
321     * Save in the cache then pass all the writes to the mapper.
322     */
323    @Override
324    public void write(RowBatch batch) {
325        for (Row row : batch.creates) {
326            cachePut(row);
327            // we need to send modified invalidations for created
328            // fragments because other session's ABSENT fragments have
329            // to be invalidated
330            localInvalidations.addModified(new RowId(row));
331        }
332        for (RowUpdate rowu : batch.updates) {
333            cachePut(rowu.row);
334            localInvalidations.addModified(new RowId(rowu.row));
335        }
336        for (RowId rowId : batch.deletes) {
337            if (rowId instanceof Row) {
338                throw new AssertionError();
339            }
340            cachePutAbsent(rowId);
341            localInvalidations.addDeleted(rowId);
342        }
343        for (RowId rowId : batch.deletesDependent) {
344            if (rowId instanceof Row) {
345                throw new AssertionError();
346            }
347            cachePutAbsent(rowId);
348            localInvalidations.addDeleted(rowId);
349        }
350
351        // propagate to underlying mapper
352        rowMapper.write(batch);
353    }
354
355    /*
356     * ----- Read -----
357     */
358
359    @Override
360    public Row readSimpleRow(RowId rowId) {
361        Row row = cacheGet(rowId);
362        if (row == null) {
363            row = rowMapper.readSimpleRow(rowId);
364            cachePutAbsentIfNull(rowId, row);
365            return row;
366        } else if (isAbsent(row)) {
367            return null;
368        } else {
369            return row;
370        }
371    }
372
373    @Override
374    public Map<String, String> getBinaryFulltext(RowId rowId) {
375        return rowMapper.getBinaryFulltext(rowId);
376    }
377
378    @Override
379    public Serializable[] readCollectionRowArray(RowId rowId) {
380        Row row = cacheGet(rowId);
381        if (row == null) {
382            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
383            assert array != null;
384            row = new Row(rowId.tableName, rowId.id, array);
385            cachePut(row);
386            return row.values;
387        } else if (isAbsent(row)) {
388            return null;
389        } else {
390            return row.values;
391        }
392    }
393
394    @Override
395    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
396            Serializable criterion, boolean limitToOne) {
397        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
398        for (Row row : rows) {
399            cachePut(row);
400        }
401        return rows;
402    }
403
404    /*
405     * ----- Copy -----
406     */
407
408    @Override
409    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) {
410        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow);
411        Invalidations invalidations = result.invalidations;
412        if (invalidations.modified != null) {
413            for (RowId rowId : invalidations.modified) {
414                cacheRemove(rowId);
415                localInvalidations.addModified(new RowId(rowId));
416            }
417        }
418        if (invalidations.deleted != null) {
419            for (RowId rowId : invalidations.deleted) {
420                cacheRemove(rowId);
421                localInvalidations.addDeleted(rowId);
422            }
423        }
424        return result;
425    }
426
427    @Override
428    public List<NodeInfo> remove(NodeInfo rootInfo) {
429        List<NodeInfo> infos = rowMapper.remove(rootInfo);
430        for (NodeInfo info : infos) {
431            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) {
432                RowId rowId = new RowId(fragmentName, info.id);
433                cacheRemove(rowId);
434                localInvalidations.addDeleted(rowId);
435            }
436        }
437        // we only put as absent the root fragment, to avoid polluting the cache
438        // with lots of absent info. the rest is removed entirely
439        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id));
440        return infos;
441    }
442
443}