001/*
002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 *     Benoit Delbosc
012 */
013package org.nuxeo.ecm.core.storage.sql;
014
015import java.io.Serializable;
016import java.util.ArrayList;
017import java.util.Arrays;
018import java.util.Collection;
019import java.util.Collections;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Map;
023import java.util.SortedMap;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import javax.management.MBeanServer;
027import javax.transaction.SystemException;
028import javax.transaction.Transaction;
029import javax.transaction.TransactionManager;
030import javax.transaction.xa.XAException;
031import javax.transaction.xa.Xid;
032
033import net.sf.ehcache.Cache;
034import net.sf.ehcache.CacheManager;
035import net.sf.ehcache.Element;
036import net.sf.ehcache.management.ManagementService;
037import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.management.ServerLocator;
044import org.nuxeo.runtime.metrics.MetricsService;
045
046import com.codahale.metrics.Counter;
047import com.codahale.metrics.Gauge;
048import com.codahale.metrics.MetricRegistry;
049import com.codahale.metrics.SharedMetricRegistries;
050import com.codahale.metrics.Timer;
051import com.codahale.metrics.Timer.Context;
052
053/**
054 * A {@link RowMapper} that use an unified ehcache.
055 * <p>
056 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
057 */
058public class UnifiedCachingRowMapper implements RowMapper {
059
060    private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class);
061
062    private static final String ABSENT = "__ABSENT__\0\0\0";
063
064    private static CacheManager cacheManager = null;
065
066    protected static boolean isXA;
067
068    private Cache cache;
069
070    private Model model;
071
072    /**
073     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
074     */
075    private RowMapper rowMapper;
076
077    /**
078     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
079     * post-commit time.
080     */
081    private final Invalidations localInvalidations;
082
083    /**
084     * The queue of invalidations received from other session or from the cluster invalidator, to process at
085     * pre-transaction time.
086     */
087    private final InvalidationsQueue invalidationsQueue;
088
089    /**
090     * The propagator of invalidations to other mappers.
091     */
092    private InvalidationsPropagator invalidationsPropagator;
093
094    private static final String CACHE_NAME = "unifiedVCSCache";
095
096    private static final String EHCACHE_FILE_PROP = "ehcacheFilePath";
097
098    private static AtomicInteger rowMapperCount = new AtomicInteger();
099
100    /**
101     * Cache statistics
102     *
103     * @since 5.7
104     */
105    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
106
107    protected Counter cacheHitCount;
108
109    protected Timer cacheGetTimer;
110
111    // sor means system of record (database access)
112    protected Counter sorRows;
113
114    protected Timer sorGetTimer;
115
116    public UnifiedCachingRowMapper() {
117        localInvalidations = new Invalidations();
118        invalidationsQueue = new InvalidationsQueue("mapper-" + this);
119    }
120
121    synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper,
122            InvalidationsPropagator invalidationsPropagator, Map<String, String> properties) {
123        this.model = model;
124        this.rowMapper = rowMapper;
125        this.invalidationsPropagator = invalidationsPropagator;
126        invalidationsPropagator.addQueue(invalidationsQueue);
127        if (cacheManager == null) {
128            if (properties.containsKey(EHCACHE_FILE_PROP)) {
129                String value = properties.get(EHCACHE_FILE_PROP);
130                log.info("Creating ehcache manager for VCS, using ehcache file: " + value);
131                cacheManager = CacheManager.create(value);
132            } else {
133                log.info("Creating ehcache manager for VCS, No ehcache file provided");
134                cacheManager = CacheManager.create();
135            }
136            isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional();
137            // Exposes cache to JMX
138            MBeanServer mBeanServer = Framework.getLocalService(ServerLocator.class).lookupServer();
139            ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true);
140        }
141        rowMapperCount.incrementAndGet();
142        cache = cacheManager.getCache(CACHE_NAME);
143        setMetrics(repositoryName);
144    }
145
146    protected void setMetrics(String repositoryName) {
147        cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
148                "unified", "hits"));
149        cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
150                "unified", "get"));
151        sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
152                "sor", "rows"));
153        sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
154                "sor", "get"));
155        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
156                "cache-size");
157        SortedMap<String, Gauge> gauges = registry.getGauges();
158        if (!gauges.containsKey(gaugeName)) {
159            registry.register(gaugeName, new Gauge<Integer>() {
160                @Override
161                public Integer getValue() {
162                    if (cacheManager != null) {
163                        return cacheManager.getCache(CACHE_NAME).getSize();
164                    }
165                    return 0;
166                }
167            });
168        }
169    }
170
171    public void close() {
172        invalidationsPropagator.removeQueue(invalidationsQueue);
173        rowMapperCount.decrementAndGet();
174    }
175
176    @Override
177    public Serializable generateNewId() {
178        return rowMapper.generateNewId();
179    }
180
181    /*
182     * ----- ehcache -----
183     */
184
185    protected boolean hasTransaction() {
186        TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup();
187        if (transactionManagerLookup == null) {
188            return false;
189        }
190        TransactionManager transactionManager = transactionManagerLookup.getTransactionManager();
191        if (transactionManager == null) {
192            return false;
193        }
194        Transaction transaction;
195        try {
196            transaction = transactionManager.getTransaction();
197        } catch (SystemException e) {
198            throw new RuntimeException(e);
199        }
200        return transaction != null;
201    }
202
203    protected boolean useEhCache() {
204        return !isXA || hasTransaction();
205    }
206
207    protected void ehCachePut(Element element) {
208        if (useEhCache()) {
209            cache.put(element);
210        }
211    }
212
213    protected Element ehCacheGet(Serializable key) {
214        if (useEhCache()) {
215            return cache.get(key);
216        }
217        return null;
218    }
219
220    protected int ehCacheGetSize() {
221        if (useEhCache()) {
222            return cache.getSize();
223        }
224        return 0;
225    }
226
227    protected boolean ehCacheRemove(Serializable key) {
228        if (useEhCache()) {
229            return cache.remove(key);
230        }
231        return false;
232    }
233
234    protected void ehCacheRemoveAll() {
235        if (useEhCache()) {
236            cache.removeAll();
237        }
238    }
239
240    /*
241     * ----- Cache -----
242     */
243
244    protected static boolean isAbsent(Row row) {
245        return row.tableName == ABSENT; // == is ok
246    }
247
248    protected void cachePut(Row row) {
249        row = row.clone();
250        // for ACL collections, make sure the order is correct
251        // (without the cache, the query to get a list of collection does an
252        // ORDER BY pos, so users of the cache must get the same behavior)
253        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
254            row.values = sortACLRows((ACLRow[]) row.values);
255        }
256        Element element = new Element(new RowId(row), row);
257        ehCachePut(element);
258    }
259
260    protected ACLRow[] sortACLRows(ACLRow[] acls) {
261        List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls));
262        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
263        ACLRow[] res = new ACLRow[acls.length];
264        return list.toArray(res);
265    }
266
267    protected void cachePutAbsent(RowId rowId) {
268        Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null));
269        ehCachePut(element);
270    }
271
272    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
273        if (row != null) {
274            cachePut(row);
275        } else {
276            cachePutAbsent(rowId);
277        }
278    }
279
280    protected void cachePutAbsentIfRowId(RowId rowId) {
281        if (rowId instanceof Row) {
282            cachePut((Row) rowId);
283        } else {
284            cachePutAbsent(rowId);
285        }
286    }
287
288    protected Row cacheGet(RowId rowId) {
289        final Context context = cacheGetTimer.time();
290        try {
291            Element element = ehCacheGet(rowId);
292            Row row = null;
293            if (element != null) {
294                row = (Row) element.getObjectValue();
295            }
296            if (row != null && !isAbsent(row)) {
297                row = row.clone();
298            }
299            if (row != null) {
300                cacheHitCount.inc();
301            }
302            return row;
303        } finally {
304            context.stop();
305        }
306    }
307
308    protected void cacheRemove(RowId rowId) {
309        ehCacheRemove(rowId);
310    }
311
312    /*
313     * ----- Invalidations / Cache Management -----
314     */
315
316    @Override
317    public Invalidations receiveInvalidations() {
318        // invalidations from the underlying mapper (cluster)
319        // already propagated to our invalidations queue
320        Invalidations remoteInvals = rowMapper.receiveInvalidations();
321
322        Invalidations ret = invalidationsQueue.getInvalidations();
323
324        if (remoteInvals != null) {
325            if (!ret.all) {
326                // only handle remote invalidations, the cache is shared and transactional
327                if (remoteInvals.modified != null) {
328                    for (RowId rowId : remoteInvals.modified) {
329                        cacheRemove(rowId);
330                    }
331                }
332                if (remoteInvals.deleted != null) {
333                    for (RowId rowId : remoteInvals.deleted) {
334                        cachePutAbsent(rowId);
335                    }
336                }
337            }
338        }
339
340        // invalidate our cache
341        if (ret.all) {
342            clearCache();
343        }
344
345        return ret.isEmpty() ? null : ret;
346    }
347
348    // propagate invalidations
349    @Override
350    public void sendInvalidations(Invalidations invalidations) {
351        // add local invalidations
352        if (!localInvalidations.isEmpty()) {
353            if (invalidations == null) {
354                invalidations = new Invalidations();
355            }
356            invalidations.add(localInvalidations);
357            localInvalidations.clear();
358        }
359
360        if (invalidations != null && !invalidations.isEmpty()) {
361            // send to underlying mapper
362            rowMapper.sendInvalidations(invalidations);
363
364            // queue to other mappers' caches
365            invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue);
366        }
367    }
368
369    @Override
370    public void clearCache() {
371        ehCacheRemoveAll();
372        localInvalidations.clear();
373        rowMapper.clearCache();
374    }
375
376    @Override
377    public void rollback(Xid xid) throws XAException {
378        try {
379            rowMapper.rollback(xid);
380        } finally {
381            ehCacheRemoveAll();
382            localInvalidations.clear();
383        }
384    }
385
386    /*
387     * ----- Batch -----
388     */
389
390    /*
391     * Use those from the cache if available, read from the mapper for the rest.
392     */
393    @Override
394    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
395        List<RowId> res = new ArrayList<RowId>(rowIds.size());
396        // find which are in cache, and which not
397        List<RowId> todo = new LinkedList<RowId>();
398        for (RowId rowId : rowIds) {
399            Row row = cacheGet(rowId);
400            if (row == null) {
401                if (cacheOnly) {
402                    res.add(new RowId(rowId));
403                } else {
404                    todo.add(rowId);
405                }
406            } else if (isAbsent(row)) {
407                res.add(new RowId(rowId));
408            } else {
409                res.add(row);
410            }
411        }
412        if (!todo.isEmpty()) {
413            final Context context = sorGetTimer.time();
414            try {
415                // ask missing ones to underlying row mapper
416                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
417                // add them to the cache
418                for (RowId rowId : fetched) {
419                    cachePutAbsentIfRowId(rowId);
420                }
421                // merge results
422                res.addAll(fetched);
423                sorRows.inc(fetched.size());
424            } finally {
425                context.stop();
426            }
427        }
428        return res;
429    }
430
431    /*
432     * Save in the cache then pass all the writes to the mapper.
433     */
434    @Override
435    public void write(RowBatch batch) {
436        // we avoid gathering invalidations for a write-only table: fulltext
437        for (Row row : batch.creates) {
438            cachePut(row);
439            if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) {
440                // we need to send modified invalidations for created
441                // fragments because other session's ABSENT fragments have
442                // to be invalidated
443                localInvalidations.addModified(new RowId(row));
444            }
445        }
446        for (RowUpdate rowu : batch.updates) {
447            cachePut(rowu.row);
448            if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) {
449                localInvalidations.addModified(new RowId(rowu.row));
450            }
451        }
452        for (RowId rowId : batch.deletes) {
453            if (rowId instanceof Row) {
454                throw new AssertionError();
455            }
456            cachePutAbsent(rowId);
457            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
458                localInvalidations.addDeleted(rowId);
459            }
460        }
461        for (RowId rowId : batch.deletesDependent) {
462            if (rowId instanceof Row) {
463                throw new AssertionError();
464            }
465            cachePutAbsent(rowId);
466            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
467                localInvalidations.addDeleted(rowId);
468            }
469        }
470
471        // propagate to underlying mapper
472        rowMapper.write(batch);
473    }
474
475    /*
476     * ----- Read -----
477     */
478
479    @Override
480    public Row readSimpleRow(RowId rowId) {
481        Row row = cacheGet(rowId);
482        if (row == null) {
483            row = rowMapper.readSimpleRow(rowId);
484            cachePutAbsentIfNull(rowId, row);
485            return row;
486        } else if (isAbsent(row)) {
487            return null;
488        } else {
489            return row;
490        }
491    }
492
493    @Override
494    public Map<String, String> getBinaryFulltext(RowId rowId) {
495        return rowMapper.getBinaryFulltext(rowId);
496    }
497
498    @Override
499    public Serializable[] readCollectionRowArray(RowId rowId) {
500        Row row = cacheGet(rowId);
501        if (row == null) {
502            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
503            assert array != null;
504            row = new Row(rowId.tableName, rowId.id, array);
505            cachePut(row);
506            return row.values;
507        } else if (isAbsent(row)) {
508            return null;
509        } else {
510            return row.values;
511        }
512    }
513
514    @Override
515    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
516            Serializable criterion, boolean limitToOne) {
517        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
518        for (Row row : rows) {
519            cachePut(row);
520        }
521        return rows;
522    }
523
524    /*
525     * ----- Copy -----
526     */
527
528    @Override
529    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) {
530        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow);
531        Invalidations invalidations = result.invalidations;
532        if (invalidations.modified != null) {
533            for (RowId rowId : invalidations.modified) {
534                cacheRemove(rowId);
535                localInvalidations.addModified(new RowId(rowId));
536            }
537        }
538        if (invalidations.deleted != null) {
539            for (RowId rowId : invalidations.deleted) {
540                cacheRemove(rowId);
541                localInvalidations.addDeleted(rowId);
542            }
543        }
544        return result;
545    }
546
547    @Override
548    public List<NodeInfo> remove(NodeInfo rootInfo) {
549        List<NodeInfo> infos = rowMapper.remove(rootInfo);
550        for (NodeInfo info : infos) {
551            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) {
552                RowId rowId = new RowId(fragmentName, info.id);
553                cacheRemove(rowId);
554                localInvalidations.addDeleted(rowId);
555            }
556        }
557        // we only put as absent the root fragment, to avoid polluting the cache
558        // with lots of absent info. the rest is removed entirely
559        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootInfo.id));
560        return infos;
561    }
562
563    @Override
564    public long getCacheSize() {
565        // The unified cache is reported by the cache-size gauge
566        return 0;
567    }
568
569}