001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.SortedMap;
031import java.util.concurrent.atomic.AtomicInteger;
032
033import javax.management.MBeanServer;
034import javax.transaction.SystemException;
035import javax.transaction.Transaction;
036import javax.transaction.TransactionManager;
037import javax.transaction.xa.XAException;
038import javax.transaction.xa.Xid;
039
040import net.sf.ehcache.Cache;
041import net.sf.ehcache.CacheManager;
042import net.sf.ehcache.Element;
043import net.sf.ehcache.management.ManagementService;
044import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
045
046import org.apache.commons.logging.Log;
047import org.apache.commons.logging.LogFactory;
048import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
049import org.nuxeo.runtime.api.Framework;
050import org.nuxeo.runtime.management.ServerLocator;
051import org.nuxeo.runtime.metrics.MetricsService;
052
053import com.codahale.metrics.Counter;
054import com.codahale.metrics.Gauge;
055import com.codahale.metrics.MetricRegistry;
056import com.codahale.metrics.SharedMetricRegistries;
057import com.codahale.metrics.Timer;
058import com.codahale.metrics.Timer.Context;
059
060/**
061 * A {@link RowMapper} that use an unified ehcache.
062 * <p>
063 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
064 */
065public class UnifiedCachingRowMapper implements RowMapper {
066
067    private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class);
068
069    private static final String ABSENT = "__ABSENT__\0\0\0";
070
071    private static CacheManager cacheManager = null;
072
073    protected static boolean isXA;
074
075    private Cache cache;
076
077    private Model model;
078
079    /**
080     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
081     */
082    private RowMapper rowMapper;
083
084    /**
085     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
086     * post-commit time.
087     */
088    private final Invalidations localInvalidations;
089
090    /**
091     * The queue of invalidations received from other session or from the cluster invalidator, to process at
092     * pre-transaction time.
093     */
094    private final InvalidationsQueue invalidationsQueue;
095
096    /**
097     * The propagator of invalidations to other mappers.
098     */
099    private InvalidationsPropagator invalidationsPropagator;
100
101    private static final String CACHE_NAME = "unifiedVCSCache";
102
103    private static final String EHCACHE_FILE_PROP = "ehcacheFilePath";
104
105    private static AtomicInteger rowMapperCount = new AtomicInteger();
106
107    /**
108     * Cache statistics
109     *
110     * @since 5.7
111     */
112    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
113
114    protected Counter cacheHitCount;
115
116    protected Timer cacheGetTimer;
117
118    // sor means system of record (database access)
119    protected Counter sorRows;
120
121    protected Timer sorGetTimer;
122
123    public UnifiedCachingRowMapper() {
124        localInvalidations = new Invalidations();
125        invalidationsQueue = new InvalidationsQueue("mapper-" + this);
126    }
127
128    synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper,
129            InvalidationsPropagator invalidationsPropagator, Map<String, String> properties) {
130        this.model = model;
131        this.rowMapper = rowMapper;
132        this.invalidationsPropagator = invalidationsPropagator;
133        invalidationsPropagator.addQueue(invalidationsQueue);
134        if (cacheManager == null) {
135            if (properties.containsKey(EHCACHE_FILE_PROP)) {
136                String value = properties.get(EHCACHE_FILE_PROP);
137                log.info("Creating ehcache manager for VCS, using ehcache file: " + value);
138                cacheManager = CacheManager.create(value);
139            } else {
140                log.info("Creating ehcache manager for VCS, No ehcache file provided");
141                cacheManager = CacheManager.create();
142            }
143            isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional();
144            // Exposes cache to JMX
145            MBeanServer mBeanServer = Framework.getLocalService(ServerLocator.class).lookupServer();
146            ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true);
147        }
148        rowMapperCount.incrementAndGet();
149        cache = cacheManager.getCache(CACHE_NAME);
150        setMetrics(repositoryName);
151    }
152
153    protected void setMetrics(String repositoryName) {
154        cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
155                "unified", "hits"));
156        cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
157                "unified", "get"));
158        sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
159                "sor", "rows"));
160        sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
161                "sor", "get"));
162        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
163                "cache-size");
164        SortedMap<String, Gauge> gauges = registry.getGauges();
165        if (!gauges.containsKey(gaugeName)) {
166            registry.register(gaugeName, new Gauge<Integer>() {
167                @Override
168                public Integer getValue() {
169                    if (cacheManager != null) {
170                        return cacheManager.getCache(CACHE_NAME).getSize();
171                    }
172                    return 0;
173                }
174            });
175        }
176    }
177
178    public void close() {
179        invalidationsPropagator.removeQueue(invalidationsQueue);
180        rowMapperCount.decrementAndGet();
181    }
182
183    @Override
184    public Serializable generateNewId() {
185        return rowMapper.generateNewId();
186    }
187
188    /*
189     * ----- ehcache -----
190     */
191
192    protected boolean hasTransaction() {
193        TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup();
194        if (transactionManagerLookup == null) {
195            return false;
196        }
197        TransactionManager transactionManager = transactionManagerLookup.getTransactionManager();
198        if (transactionManager == null) {
199            return false;
200        }
201        Transaction transaction;
202        try {
203            transaction = transactionManager.getTransaction();
204        } catch (SystemException e) {
205            throw new RuntimeException(e);
206        }
207        return transaction != null;
208    }
209
210    protected boolean useEhCache() {
211        return !isXA || hasTransaction();
212    }
213
214    protected void ehCachePut(Element element) {
215        if (useEhCache()) {
216            cache.put(element);
217        }
218    }
219
220    protected Element ehCacheGet(Serializable key) {
221        if (useEhCache()) {
222            return cache.get(key);
223        }
224        return null;
225    }
226
227    protected int ehCacheGetSize() {
228        if (useEhCache()) {
229            return cache.getSize();
230        }
231        return 0;
232    }
233
234    protected boolean ehCacheRemove(Serializable key) {
235        if (useEhCache()) {
236            return cache.remove(key);
237        }
238        return false;
239    }
240
241    protected void ehCacheRemoveAll() {
242        if (useEhCache()) {
243            cache.removeAll();
244        }
245    }
246
247    /*
248     * ----- Cache -----
249     */
250
251    protected static boolean isAbsent(Row row) {
252        return row.tableName == ABSENT; // == is ok
253    }
254
255    protected void cachePut(Row row) {
256        row = row.clone();
257        // for ACL collections, make sure the order is correct
258        // (without the cache, the query to get a list of collection does an
259        // ORDER BY pos, so users of the cache must get the same behavior)
260        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
261            row.values = sortACLRows((ACLRow[]) row.values);
262        }
263        Element element = new Element(new RowId(row), row);
264        ehCachePut(element);
265    }
266
267    protected ACLRow[] sortACLRows(ACLRow[] acls) {
268        List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls));
269        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
270        ACLRow[] res = new ACLRow[acls.length];
271        return list.toArray(res);
272    }
273
274    protected void cachePutAbsent(RowId rowId) {
275        Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null));
276        ehCachePut(element);
277    }
278
279    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
280        if (row != null) {
281            cachePut(row);
282        } else {
283            cachePutAbsent(rowId);
284        }
285    }
286
287    protected void cachePutAbsentIfRowId(RowId rowId) {
288        if (rowId instanceof Row) {
289            cachePut((Row) rowId);
290        } else {
291            cachePutAbsent(rowId);
292        }
293    }
294
295    protected Row cacheGet(RowId rowId) {
296        final Context context = cacheGetTimer.time();
297        try {
298            Element element = ehCacheGet(rowId);
299            Row row = null;
300            if (element != null) {
301                row = (Row) element.getObjectValue();
302            }
303            if (row != null && !isAbsent(row)) {
304                row = row.clone();
305            }
306            if (row != null) {
307                cacheHitCount.inc();
308            }
309            return row;
310        } finally {
311            context.stop();
312        }
313    }
314
315    protected void cacheRemove(RowId rowId) {
316        ehCacheRemove(rowId);
317    }
318
319    /*
320     * ----- Invalidations / Cache Management -----
321     */
322
323    @Override
324    public Invalidations receiveInvalidations() {
325        // invalidations from the underlying mapper (cluster)
326        // already propagated to our invalidations queue
327        Invalidations remoteInvals = rowMapper.receiveInvalidations();
328
329        Invalidations ret = invalidationsQueue.getInvalidations();
330
331        if (remoteInvals != null) {
332            if (!ret.all) {
333                // only handle remote invalidations, the cache is shared and transactional
334                if (remoteInvals.modified != null) {
335                    for (RowId rowId : remoteInvals.modified) {
336                        cacheRemove(rowId);
337                    }
338                }
339                if (remoteInvals.deleted != null) {
340                    for (RowId rowId : remoteInvals.deleted) {
341                        cachePutAbsent(rowId);
342                    }
343                }
344            }
345        }
346
347        // invalidate our cache
348        if (ret.all) {
349            clearCache();
350        }
351
352        return ret.isEmpty() ? null : ret;
353    }
354
355    // propagate invalidations
356    @Override
357    public void sendInvalidations(Invalidations invalidations) {
358        // add local invalidations
359        if (!localInvalidations.isEmpty()) {
360            if (invalidations == null) {
361                invalidations = new Invalidations();
362            }
363            invalidations.add(localInvalidations);
364            localInvalidations.clear();
365        }
366
367        if (invalidations != null && !invalidations.isEmpty()) {
368            // send to underlying mapper
369            rowMapper.sendInvalidations(invalidations);
370
371            // queue to other mappers' caches
372            invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue);
373        }
374    }
375
376    @Override
377    public void clearCache() {
378        ehCacheRemoveAll();
379        localInvalidations.clear();
380        rowMapper.clearCache();
381    }
382
383    @Override
384    public void rollback(Xid xid) throws XAException {
385        try {
386            rowMapper.rollback(xid);
387        } finally {
388            ehCacheRemoveAll();
389            localInvalidations.clear();
390        }
391    }
392
393    /*
394     * ----- Batch -----
395     */
396
397    /*
398     * Use those from the cache if available, read from the mapper for the rest.
399     */
400    @Override
401    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
402        List<RowId> res = new ArrayList<RowId>(rowIds.size());
403        // find which are in cache, and which not
404        List<RowId> todo = new LinkedList<RowId>();
405        for (RowId rowId : rowIds) {
406            Row row = cacheGet(rowId);
407            if (row == null) {
408                if (cacheOnly) {
409                    res.add(new RowId(rowId));
410                } else {
411                    todo.add(rowId);
412                }
413            } else if (isAbsent(row)) {
414                res.add(new RowId(rowId));
415            } else {
416                res.add(row);
417            }
418        }
419        if (!todo.isEmpty()) {
420            final Context context = sorGetTimer.time();
421            try {
422                // ask missing ones to underlying row mapper
423                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
424                // add them to the cache
425                for (RowId rowId : fetched) {
426                    cachePutAbsentIfRowId(rowId);
427                }
428                // merge results
429                res.addAll(fetched);
430                sorRows.inc(fetched.size());
431            } finally {
432                context.stop();
433            }
434        }
435        return res;
436    }
437
438    /*
439     * Save in the cache then pass all the writes to the mapper.
440     */
441    @Override
442    public void write(RowBatch batch) {
443        // we avoid gathering invalidations for a write-only table: fulltext
444        for (Row row : batch.creates) {
445            cachePut(row);
446            if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) {
447                // we need to send modified invalidations for created
448                // fragments because other session's ABSENT fragments have
449                // to be invalidated
450                localInvalidations.addModified(new RowId(row));
451            }
452        }
453        for (RowUpdate rowu : batch.updates) {
454            cachePut(rowu.row);
455            if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) {
456                localInvalidations.addModified(new RowId(rowu.row));
457            }
458        }
459        for (RowId rowId : batch.deletes) {
460            if (rowId instanceof Row) {
461                throw new AssertionError();
462            }
463            cachePutAbsent(rowId);
464            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
465                localInvalidations.addDeleted(rowId);
466            }
467        }
468        for (RowId rowId : batch.deletesDependent) {
469            if (rowId instanceof Row) {
470                throw new AssertionError();
471            }
472            cachePutAbsent(rowId);
473            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
474                localInvalidations.addDeleted(rowId);
475            }
476        }
477
478        // propagate to underlying mapper
479        rowMapper.write(batch);
480    }
481
482    /*
483     * ----- Read -----
484     */
485
486    @Override
487    public Row readSimpleRow(RowId rowId) {
488        Row row = cacheGet(rowId);
489        if (row == null) {
490            row = rowMapper.readSimpleRow(rowId);
491            cachePutAbsentIfNull(rowId, row);
492            return row;
493        } else if (isAbsent(row)) {
494            return null;
495        } else {
496            return row;
497        }
498    }
499
500    @Override
501    public Map<String, String> getBinaryFulltext(RowId rowId) {
502        return rowMapper.getBinaryFulltext(rowId);
503    }
504
505    @Override
506    public Serializable[] readCollectionRowArray(RowId rowId) {
507        Row row = cacheGet(rowId);
508        if (row == null) {
509            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
510            assert array != null;
511            row = new Row(rowId.tableName, rowId.id, array);
512            cachePut(row);
513            return row.values;
514        } else if (isAbsent(row)) {
515            return null;
516        } else {
517            return row.values;
518        }
519    }
520
521    @Override
522    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
523            Serializable criterion, boolean limitToOne) {
524        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
525        for (Row row : rows) {
526            cachePut(row);
527        }
528        return rows;
529    }
530
531    /*
532     * ----- Copy -----
533     */
534
535    @Override
536    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) {
537        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow);
538        Invalidations invalidations = result.invalidations;
539        if (invalidations.modified != null) {
540            for (RowId rowId : invalidations.modified) {
541                cacheRemove(rowId);
542                localInvalidations.addModified(new RowId(rowId));
543            }
544        }
545        if (invalidations.deleted != null) {
546            for (RowId rowId : invalidations.deleted) {
547                cacheRemove(rowId);
548                localInvalidations.addDeleted(rowId);
549            }
550        }
551        return result;
552    }
553
554    @Override
555    public List<NodeInfo> remove(NodeInfo rootInfo) {
556        List<NodeInfo> infos = rowMapper.remove(rootInfo);
557        for (NodeInfo info : infos) {
558            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) {
559                RowId rowId = new RowId(fragmentName, info.id);
560                cacheRemove(rowId);
561                localInvalidations.addDeleted(rowId);
562            }
563        }
564        // we only put as absent the root fragment, to avoid polluting the cache
565        // with lots of absent info. the rest is removed entirely
566        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id));
567        return infos;
568    }
569
570    @Override
571    public long getCacheSize() {
572        // The unified cache is reported by the cache-size gauge
573        return 0;
574    }
575
576}