001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030
031import javax.transaction.xa.XAException;
032import javax.transaction.xa.Xid;
033
034import org.apache.commons.collections.map.AbstractReferenceMap;
035import org.apache.commons.collections.map.ReferenceMap;
036import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
037import org.nuxeo.runtime.metrics.MetricsService;
038
039import com.codahale.metrics.Counter;
040import com.codahale.metrics.MetricRegistry;
041import com.codahale.metrics.SharedMetricRegistries;
042import com.codahale.metrics.Timer;
043
044/**
045 * A {@link RowMapper} that has an internal cache.
046 * <p>
047 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
048 */
049public class SoftRefCachingRowMapper implements RowMapper {
050
051    private static final String ABSENT = "__ABSENT__\0\0\0";
052
053    /**
054     * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be
055     * refetched if needed.
056     * <p>
057     * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT}
058     * to denote a fragment known to be absent from the database.
059     * <p>
060     * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC
061     * collects it.
062     */
063    // we use a new Row instance for the absent case to avoid keeping other
064    // references to it which would prevent its GCing
065    private final Map<RowId, Row> cache;
066
067    private Model model;
068
069    /**
070     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
071     */
072    private RowMapper rowMapper;
073
074    /**
075     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
076     * post-commit time.
077     */
078    private final Invalidations localInvalidations;
079
080    /**
081     * The queue of cache invalidations received from other session, to process at pre-transaction time.
082     */
083    // public for unit tests
084    public final InvalidationsQueue cacheQueue;
085
086    /**
087     * The propagator of invalidations to other mappers.
088     */
089    private InvalidationsPropagator cachePropagator;
090
091    /**
092     * Cache statistics
093     *
094     * @since 5.7
095     */
096    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
097
098    protected Counter cacheHitCount;
099
100    protected Timer cacheGetTimer;
101
102    // sor means system of record (database access)
103    protected Counter sorRows;
104
105    protected Timer sorGetTimer;
106
107    @SuppressWarnings("unchecked")
108    public SoftRefCachingRowMapper() {
109        cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT);
110        localInvalidations = new Invalidations();
111        cacheQueue = new InvalidationsQueue("mapper-" + this);
112    }
113
114    public void initialize(String repositoryName, Model model, RowMapper rowMapper,
115            InvalidationsPropagator cachePropagator, Map<String, String> properties) {
116        this.model = model;
117        this.rowMapper = rowMapper;
118        this.cachePropagator = cachePropagator;
119        cachePropagator.addQueue(cacheQueue);
120        setMetrics(repositoryName);
121    }
122
123    protected void setMetrics(String repositoryName) {
124        cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
125                "soft-ref", "hits"));
126        cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
127                "soft-ref", "get"));
128        sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
129                "sor", "rows"));
130        sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
131                "sor", "get"));
132    }
133
134    public void close() {
135        clearCache();
136        cachePropagator.removeQueue(cacheQueue);
137    }
138
139    @Override
140    public Serializable generateNewId() {
141        return rowMapper.generateNewId();
142    }
143
144    /*
145     * ----- Cache -----
146     */
147
148    protected static boolean isAbsent(Row row) {
149        return row.tableName == ABSENT; // == is ok
150    }
151
152    protected void cachePut(Row row) {
153        row = row.clone();
154        // for ACL collections, make sure the order is correct
155        // (without the cache, the query to get a list of collection does an
156        // ORDER BY pos, so users of the cache must get the same behavior)
157        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
158            row.values = sortACLRows((ACLRow[]) row.values);
159        }
160        cache.put(new RowId(row), row);
161    }
162
163    protected ACLRow[] sortACLRows(ACLRow[] acls) {
164        List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls));
165        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
166        ACLRow[] res = new ACLRow[acls.length];
167        return list.toArray(res);
168    }
169
170    protected void cachePutAbsent(RowId rowId) {
171        cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null));
172    }
173
174    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
175        if (row != null) {
176            cachePut(row);
177        } else {
178            cachePutAbsent(rowId);
179        }
180    }
181
182    protected void cachePutAbsentIfRowId(RowId rowId) {
183        if (rowId instanceof Row) {
184            cachePut((Row) rowId);
185        } else {
186            cachePutAbsent(rowId);
187        }
188    }
189
190    protected Row cacheGet(RowId rowId) {
191        final Timer.Context context = cacheGetTimer.time();
192        try {
193            Row row = cache.get(rowId);
194            if (row != null && !isAbsent(row)) {
195                row = row.clone();
196            }
197            if (row != null) {
198                cacheHitCount.inc();
199            }
200            return row;
201        } finally {
202            context.stop();
203        }
204    }
205
206    protected void cacheRemove(RowId rowId) {
207        cache.remove(rowId);
208    }
209
210    /*
211     * ----- Invalidations / Cache Management -----
212     */
213
214    @Override
215    public Invalidations receiveInvalidations() {
216        // invalidations from the underlying mapper (cluster)
217        // already propagated to our invalidations queue
218        rowMapper.receiveInvalidations();
219
220        Invalidations invalidations = cacheQueue.getInvalidations();
221
222        // invalidate our cache
223        if (invalidations.all) {
224            clearCache();
225        }
226        if (invalidations.modified != null) {
227            for (RowId rowId : invalidations.modified) {
228                cacheRemove(rowId);
229            }
230        }
231        if (invalidations.deleted != null) {
232            for (RowId rowId : invalidations.deleted) {
233                cachePutAbsent(rowId);
234            }
235        }
236
237        return invalidations.isEmpty() ? null : invalidations;
238    }
239
240    // propagate invalidations
241    @Override
242    public void sendInvalidations(Invalidations invalidations) {
243        // add local invalidations
244        if (!localInvalidations.isEmpty()) {
245            if (invalidations == null) {
246                invalidations = new Invalidations();
247            }
248            invalidations.add(localInvalidations);
249            localInvalidations.clear();
250        }
251
252        if (invalidations != null && !invalidations.isEmpty()) {
253            // send to underlying mapper
254            rowMapper.sendInvalidations(invalidations);
255
256            // queue to other local mappers' caches
257            cachePropagator.propagateInvalidations(invalidations, cacheQueue);
258        }
259    }
260
261    @Override
262    public void clearCache() {
263        cache.clear();
264        sorRows.dec(sorRows.getCount());
265        localInvalidations.clear();
266        rowMapper.clearCache();
267    }
268
269    @Override
270    public long getCacheSize() {
271        return cache.size();
272    }
273
274    @Override
275    public void rollback(Xid xid) throws XAException {
276        try {
277            rowMapper.rollback(xid);
278        } finally {
279            clearCache();
280        }
281    }
282
283    /*
284     * ----- Batch -----
285     */
286
287    /*
288     * Use those from the cache if available, read from the mapper for the rest.
289     */
290    @Override
291    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
292        List<RowId> res = new ArrayList<RowId>(rowIds.size());
293        // find which are in cache, and which not
294        List<RowId> todo = new LinkedList<RowId>();
295        for (RowId rowId : rowIds) {
296            Row row = cacheGet(rowId);
297            if (row == null) {
298                if (cacheOnly) {
299                    res.add(new RowId(rowId));
300                } else {
301                    todo.add(rowId);
302                }
303            } else if (isAbsent(row)) {
304                res.add(new RowId(rowId));
305            } else {
306                res.add(row);
307            }
308        }
309        if (!todo.isEmpty()) {
310            final Timer.Context context = sorGetTimer.time();
311            try {
312                // ask missing ones to underlying row mapper
313                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
314                // add them to the cache
315                for (RowId rowId : fetched) {
316                    cachePutAbsentIfRowId(rowId);
317                }
318                // merge results
319                res.addAll(fetched);
320                sorRows.inc(fetched.size());
321            } finally {
322                context.stop();
323            }
324        }
325        return res;
326    }
327
328    /*
329     * Save in the cache then pass all the writes to the mapper.
330     */
331    @Override
332    public void write(RowBatch batch) {
333        for (Row row : batch.creates) {
334            cachePut(row);
335            // we need to send modified invalidations for created
336            // fragments because other session's ABSENT fragments have
337            // to be invalidated
338            localInvalidations.addModified(new RowId(row));
339        }
340        for (RowUpdate rowu : batch.updates) {
341            cachePut(rowu.row);
342            localInvalidations.addModified(new RowId(rowu.row));
343        }
344        for (RowId rowId : batch.deletes) {
345            if (rowId instanceof Row) {
346                throw new AssertionError();
347            }
348            cachePutAbsent(rowId);
349            localInvalidations.addDeleted(rowId);
350        }
351        for (RowId rowId : batch.deletesDependent) {
352            if (rowId instanceof Row) {
353                throw new AssertionError();
354            }
355            cachePutAbsent(rowId);
356            localInvalidations.addDeleted(rowId);
357        }
358
359        // propagate to underlying mapper
360        rowMapper.write(batch);
361    }
362
363    /*
364     * ----- Read -----
365     */
366
367    @Override
368    public Row readSimpleRow(RowId rowId) {
369        Row row = cacheGet(rowId);
370        if (row == null) {
371            row = rowMapper.readSimpleRow(rowId);
372            cachePutAbsentIfNull(rowId, row);
373            return row;
374        } else if (isAbsent(row)) {
375            return null;
376        } else {
377            return row;
378        }
379    }
380
381    @Override
382    public Map<String, String> getBinaryFulltext(RowId rowId) {
383        return rowMapper.getBinaryFulltext(rowId);
384    }
385
386    @Override
387    public Serializable[] readCollectionRowArray(RowId rowId) {
388        Row row = cacheGet(rowId);
389        if (row == null) {
390            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
391            assert array != null;
392            row = new Row(rowId.tableName, rowId.id, array);
393            cachePut(row);
394            return row.values;
395        } else if (isAbsent(row)) {
396            return null;
397        } else {
398            return row.values;
399        }
400    }
401
402    @Override
403    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
404            Serializable criterion, boolean limitToOne) {
405        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
406        for (Row row : rows) {
407            cachePut(row);
408        }
409        return rows;
410    }
411
412    @Override
413    public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) {
414        return rowMapper.readSelectionsIds(selType, values);
415    }
416
417    /*
418     * ----- Copy -----
419     */
420
421    @Override
422    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) {
423        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow);
424        Invalidations invalidations = result.invalidations;
425        if (invalidations.modified != null) {
426            for (RowId rowId : invalidations.modified) {
427                cacheRemove(rowId);
428                localInvalidations.addModified(new RowId(rowId));
429            }
430        }
431        if (invalidations.deleted != null) {
432            for (RowId rowId : invalidations.deleted) {
433                cacheRemove(rowId);
434                localInvalidations.addDeleted(rowId);
435            }
436        }
437        return result;
438    }
439
440    @Override
441    public List<NodeInfo> getDescendantsInfo(Serializable rootId) {
442        return rowMapper.getDescendantsInfo(rootId);
443    }
444
445    @Override
446    public void remove(Serializable rootId, List<NodeInfo> nodeInfos) {
447        rowMapper.remove(rootId, nodeInfos);
448        for (NodeInfo info : nodeInfos) {
449            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) {
450                RowId rowId = new RowId(fragmentName, info.id);
451                cacheRemove(rowId);
452                localInvalidations.addDeleted(rowId);
453            }
454        }
455        // we only put as absent the root fragment, to avoid polluting the cache
456        // with lots of absent info. the rest is removed entirely
457        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId));
458    }
459
460}