001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029
030import javax.transaction.xa.XAException;
031import javax.transaction.xa.Xid;
032
033import org.apache.commons.collections.map.AbstractReferenceMap;
034import org.apache.commons.collections.map.ReferenceMap;
035import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
036import org.nuxeo.runtime.metrics.MetricsService;
037
038import com.codahale.metrics.Counter;
039import com.codahale.metrics.MetricRegistry;
040import com.codahale.metrics.SharedMetricRegistries;
041import com.codahale.metrics.Timer;
042
043/**
044 * A {@link RowMapper} that has an internal cache.
045 * <p>
046 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
047 */
048public class SoftRefCachingRowMapper implements RowMapper {
049
050    private static final String ABSENT = "__ABSENT__\0\0\0";
051
052    /**
053     * The cached rows. All held data is identical to what is present in the underlying {@link RowMapper} and could be
054     * refetched if needed.
055     * <p>
056     * The values are either {@link Row} for fragments present in the database, or a row with tableName {@link #ABSENT}
057     * to denote a fragment known to be absent from the database.
058     * <p>
059     * This cache is memory-sensitive (all values are soft-referenced), a fragment can always be refetched if the GC
060     * collects it.
061     */
062    // we use a new Row instance for the absent case to avoid keeping other
063    // references to it which would prevent its GCing
064    private final Map<RowId, Row> cache;
065
066    private Model model;
067
068    /**
069     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
070     */
071    private RowMapper rowMapper;
072
073    /**
074     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
075     * post-commit time.
076     */
077    private final Invalidations localInvalidations;
078
079    /**
080     * The queue of cache invalidations received from other session, to process at pre-transaction time.
081     */
082    // public for unit tests
083    public final InvalidationsQueue cacheQueue;
084
085    /**
086     * The propagator of invalidations to other mappers.
087     */
088    private InvalidationsPropagator cachePropagator;
089
090    /**
091     * Cache statistics
092     *
093     * @since 5.7
094     */
095    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
096
097    protected Counter cacheHitCount;
098
099    protected Timer cacheGetTimer;
100
101    // sor means system of record (database access)
102    protected Counter sorRows;
103
104    protected Timer sorGetTimer;
105
106    @SuppressWarnings("unchecked")
107    public SoftRefCachingRowMapper() {
108        cache = new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.SOFT);
109        localInvalidations = new Invalidations();
110        cacheQueue = new InvalidationsQueue("mapper-" + this);
111    }
112
113    public void initialize(String repositoryName, Model model, RowMapper rowMapper,
114            InvalidationsPropagator cachePropagator, Map<String, String> properties) {
115        this.model = model;
116        this.rowMapper = rowMapper;
117        this.cachePropagator = cachePropagator;
118        cachePropagator.addQueue(cacheQueue);
119        setMetrics(repositoryName);
120    }
121
122    protected void setMetrics(String repositoryName) {
123        cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
124                "soft-ref", "hits"));
125        cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
126                "soft-ref", "get"));
127        sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
128                "sor", "rows"));
129        sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "soft-ref",
130                "sor", "get"));
131    }
132
133    public void close() {
134        clearCache();
135        cachePropagator.removeQueue(cacheQueue);
136    }
137
138    @Override
139    public Serializable generateNewId() {
140        return rowMapper.generateNewId();
141    }
142
143    /*
144     * ----- Cache -----
145     */
146
147    protected static boolean isAbsent(Row row) {
148        return row.tableName == ABSENT; // == is ok
149    }
150
151    protected void cachePut(Row row) {
152        row = row.clone();
153        // for ACL collections, make sure the order is correct
154        // (without the cache, the query to get a list of collection does an
155        // ORDER BY pos, so users of the cache must get the same behavior)
156        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
157            row.values = sortACLRows((ACLRow[]) row.values);
158        }
159        cache.put(new RowId(row), row);
160    }
161
162    protected ACLRow[] sortACLRows(ACLRow[] acls) {
163        List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls));
164        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
165        ACLRow[] res = new ACLRow[acls.length];
166        return list.toArray(res);
167    }
168
169    protected void cachePutAbsent(RowId rowId) {
170        cache.put(new RowId(rowId), new Row(ABSENT, (Serializable) null));
171    }
172
173    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
174        if (row != null) {
175            cachePut(row);
176        } else {
177            cachePutAbsent(rowId);
178        }
179    }
180
181    protected void cachePutAbsentIfRowId(RowId rowId) {
182        if (rowId instanceof Row) {
183            cachePut((Row) rowId);
184        } else {
185            cachePutAbsent(rowId);
186        }
187    }
188
189    protected Row cacheGet(RowId rowId) {
190        final Timer.Context context = cacheGetTimer.time();
191        try {
192            Row row = cache.get(rowId);
193            if (row != null && !isAbsent(row)) {
194                row = row.clone();
195            }
196            if (row != null) {
197                cacheHitCount.inc();
198            }
199            return row;
200        } finally {
201            context.stop();
202        }
203    }
204
205    protected void cacheRemove(RowId rowId) {
206        cache.remove(rowId);
207    }
208
209    /*
210     * ----- Invalidations / Cache Management -----
211     */
212
213    @Override
214    public Invalidations receiveInvalidations() {
215        // invalidations from the underlying mapper (cluster)
216        // already propagated to our invalidations queue
217        rowMapper.receiveInvalidations();
218
219        Invalidations invalidations = cacheQueue.getInvalidations();
220
221        // invalidate our cache
222        if (invalidations.all) {
223            clearCache();
224        }
225        if (invalidations.modified != null) {
226            for (RowId rowId : invalidations.modified) {
227                cacheRemove(rowId);
228            }
229        }
230        if (invalidations.deleted != null) {
231            for (RowId rowId : invalidations.deleted) {
232                cachePutAbsent(rowId);
233            }
234        }
235
236        return invalidations.isEmpty() ? null : invalidations;
237    }
238
239    // propagate invalidations
240    @Override
241    public void sendInvalidations(Invalidations invalidations) {
242        // add local invalidations
243        if (!localInvalidations.isEmpty()) {
244            if (invalidations == null) {
245                invalidations = new Invalidations();
246            }
247            invalidations.add(localInvalidations);
248            localInvalidations.clear();
249        }
250
251        if (invalidations != null && !invalidations.isEmpty()) {
252            // send to underlying mapper
253            rowMapper.sendInvalidations(invalidations);
254
255            // queue to other local mappers' caches
256            cachePropagator.propagateInvalidations(invalidations, cacheQueue);
257        }
258    }
259
260    @Override
261    public void clearCache() {
262        cache.clear();
263        sorRows.dec(sorRows.getCount());
264        localInvalidations.clear();
265        rowMapper.clearCache();
266    }
267
268    @Override
269    public long getCacheSize() {
270        return cache.size();
271    }
272
273    @Override
274    public void rollback(Xid xid) throws XAException {
275        try {
276            rowMapper.rollback(xid);
277        } finally {
278            clearCache();
279        }
280    }
281
282    /*
283     * ----- Batch -----
284     */
285
286    /*
287     * Use those from the cache if available, read from the mapper for the rest.
288     */
289    @Override
290    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
291        List<RowId> res = new ArrayList<RowId>(rowIds.size());
292        // find which are in cache, and which not
293        List<RowId> todo = new LinkedList<RowId>();
294        for (RowId rowId : rowIds) {
295            Row row = cacheGet(rowId);
296            if (row == null) {
297                if (cacheOnly) {
298                    res.add(new RowId(rowId));
299                } else {
300                    todo.add(rowId);
301                }
302            } else if (isAbsent(row)) {
303                res.add(new RowId(rowId));
304            } else {
305                res.add(row);
306            }
307        }
308        if (!todo.isEmpty()) {
309            final Timer.Context context = sorGetTimer.time();
310            try {
311                // ask missing ones to underlying row mapper
312                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
313                // add them to the cache
314                for (RowId rowId : fetched) {
315                    cachePutAbsentIfRowId(rowId);
316                }
317                // merge results
318                res.addAll(fetched);
319                sorRows.inc(fetched.size());
320            } finally {
321                context.stop();
322            }
323        }
324        return res;
325    }
326
327    /*
328     * Save in the cache then pass all the writes to the mapper.
329     */
330    @Override
331    public void write(RowBatch batch) {
332        for (Row row : batch.creates) {
333            cachePut(row);
334            // we need to send modified invalidations for created
335            // fragments because other session's ABSENT fragments have
336            // to be invalidated
337            localInvalidations.addModified(new RowId(row));
338        }
339        for (RowUpdate rowu : batch.updates) {
340            cachePut(rowu.row);
341            localInvalidations.addModified(new RowId(rowu.row));
342        }
343        for (RowId rowId : batch.deletes) {
344            if (rowId instanceof Row) {
345                throw new AssertionError();
346            }
347            cachePutAbsent(rowId);
348            localInvalidations.addDeleted(rowId);
349        }
350        for (RowId rowId : batch.deletesDependent) {
351            if (rowId instanceof Row) {
352                throw new AssertionError();
353            }
354            cachePutAbsent(rowId);
355            localInvalidations.addDeleted(rowId);
356        }
357
358        // propagate to underlying mapper
359        rowMapper.write(batch);
360    }
361
362    /*
363     * ----- Read -----
364     */
365
366    @Override
367    public Row readSimpleRow(RowId rowId) {
368        Row row = cacheGet(rowId);
369        if (row == null) {
370            row = rowMapper.readSimpleRow(rowId);
371            cachePutAbsentIfNull(rowId, row);
372            return row;
373        } else if (isAbsent(row)) {
374            return null;
375        } else {
376            return row;
377        }
378    }
379
380    @Override
381    public Map<String, String> getBinaryFulltext(RowId rowId) {
382        return rowMapper.getBinaryFulltext(rowId);
383    }
384
385    @Override
386    public Serializable[] readCollectionRowArray(RowId rowId) {
387        Row row = cacheGet(rowId);
388        if (row == null) {
389            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
390            assert array != null;
391            row = new Row(rowId.tableName, rowId.id, array);
392            cachePut(row);
393            return row.values;
394        } else if (isAbsent(row)) {
395            return null;
396        } else {
397            return row.values;
398        }
399    }
400
401    @Override
402    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
403            Serializable criterion, boolean limitToOne) {
404        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
405        for (Row row : rows) {
406            cachePut(row);
407        }
408        return rows;
409    }
410
411    /*
412     * ----- Copy -----
413     */
414
415    @Override
416    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) {
417        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow);
418        Invalidations invalidations = result.invalidations;
419        if (invalidations.modified != null) {
420            for (RowId rowId : invalidations.modified) {
421                cacheRemove(rowId);
422                localInvalidations.addModified(new RowId(rowId));
423            }
424        }
425        if (invalidations.deleted != null) {
426            for (RowId rowId : invalidations.deleted) {
427                cacheRemove(rowId);
428                localInvalidations.addDeleted(rowId);
429            }
430        }
431        return result;
432    }
433
434    @Override
435    public List<NodeInfo> remove(NodeInfo rootInfo) {
436        List<NodeInfo> infos = rowMapper.remove(rootInfo);
437        for (NodeInfo info : infos) {
438            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) {
439                RowId rowId = new RowId(fragmentName, info.id);
440                cacheRemove(rowId);
441                localInvalidations.addDeleted(rowId);
442            }
443        }
444        // we only put as absent the root fragment, to avoid polluting the cache
445        // with lots of absent info. the rest is removed entirely
446        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id));
447        return infos;
448    }
449
450}