001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.SortedMap;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import javax.management.MBeanServer;
035import javax.transaction.SystemException;
036import javax.transaction.Transaction;
037import javax.transaction.TransactionManager;
038import javax.transaction.xa.XAException;
039import javax.transaction.xa.Xid;
040
041import net.sf.ehcache.Cache;
042import net.sf.ehcache.CacheManager;
043import net.sf.ehcache.Element;
044import net.sf.ehcache.management.ManagementService;
045import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
046
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
050import org.nuxeo.runtime.api.Framework;
051import org.nuxeo.runtime.management.ServerLocator;
052import org.nuxeo.runtime.metrics.MetricsService;
053
054import com.codahale.metrics.Counter;
055import com.codahale.metrics.Gauge;
056import com.codahale.metrics.MetricRegistry;
057import com.codahale.metrics.SharedMetricRegistries;
058import com.codahale.metrics.Timer;
059import com.codahale.metrics.Timer.Context;
060
061/**
062 * A {@link RowMapper} that use an unified ehcache.
063 * <p>
064 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
065 */
066public class UnifiedCachingRowMapper implements RowMapper {
067
068    private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class);
069
070    private static final String ABSENT = "__ABSENT__\0\0\0";
071
072    private static CacheManager cacheManager = null;
073
074    protected static boolean isXA;
075
076    private Cache cache;
077
078    private Model model;
079
080    /**
081     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
082     */
083    private RowMapper rowMapper;
084
085    /**
086     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
087     * post-commit time.
088     */
089    private final Invalidations localInvalidations;
090
091    /**
092     * The queue of invalidations received from other session or from the cluster invalidator, to process at
093     * pre-transaction time.
094     */
095    private final InvalidationsQueue invalidationsQueue;
096
097    /**
098     * The propagator of invalidations to other mappers.
099     */
100    private InvalidationsPropagator invalidationsPropagator;
101
102    private static final String CACHE_NAME = "unifiedVCSCache";
103
104    private static final String EHCACHE_FILE_PROP = "ehcacheFilePath";
105
106    private static AtomicInteger rowMapperCount = new AtomicInteger();
107
108    /**
109     * Cache statistics
110     *
111     * @since 5.7
112     */
113    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
114
115    protected Counter cacheHitCount;
116
117    protected Timer cacheGetTimer;
118
119    // sor means system of record (database access)
120    protected Counter sorRows;
121
122    protected Timer sorGetTimer;
123
124    public UnifiedCachingRowMapper() {
125        localInvalidations = new Invalidations();
126        invalidationsQueue = new InvalidationsQueue("mapper-" + this);
127    }
128
129    synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper,
130            InvalidationsPropagator invalidationsPropagator, Map<String, String> properties) {
131        this.model = model;
132        this.rowMapper = rowMapper;
133        this.invalidationsPropagator = invalidationsPropagator;
134        invalidationsPropagator.addQueue(invalidationsQueue);
135        if (cacheManager == null) {
136            if (properties.containsKey(EHCACHE_FILE_PROP)) {
137                String value = properties.get(EHCACHE_FILE_PROP);
138                log.info("Creating ehcache manager for VCS, using ehcache file: " + value);
139                cacheManager = CacheManager.create(value);
140            } else {
141                log.info("Creating ehcache manager for VCS, No ehcache file provided");
142                cacheManager = CacheManager.create();
143            }
144            isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional();
145            // Exposes cache to JMX
146            MBeanServer mBeanServer = Framework.getLocalService(ServerLocator.class).lookupServer();
147            ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true);
148        }
149        rowMapperCount.incrementAndGet();
150        cache = cacheManager.getCache(CACHE_NAME);
151        setMetrics(repositoryName);
152    }
153
154    protected void setMetrics(String repositoryName) {
155        cacheHitCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
156                "unified", "hits"));
157        cacheGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches",
158                "unified", "get"));
159        sorRows = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
160                "sor", "rows"));
161        sorGetTimer = registry.timer(MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
162                "sor", "get"));
163        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryName, "caches", "unified",
164                "cache-size");
165        SortedMap<String, Gauge> gauges = registry.getGauges();
166        if (!gauges.containsKey(gaugeName)) {
167            registry.register(gaugeName, new Gauge<Integer>() {
168                @Override
169                public Integer getValue() {
170                    if (cacheManager != null) {
171                        return cacheManager.getCache(CACHE_NAME).getSize();
172                    }
173                    return 0;
174                }
175            });
176        }
177    }
178
179    public void close() {
180        invalidationsPropagator.removeQueue(invalidationsQueue);
181        rowMapperCount.decrementAndGet();
182    }
183
184    @Override
185    public Serializable generateNewId() {
186        return rowMapper.generateNewId();
187    }
188
189    /*
190     * ----- ehcache -----
191     */
192
193    protected boolean hasTransaction() {
194        TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup();
195        if (transactionManagerLookup == null) {
196            return false;
197        }
198        TransactionManager transactionManager = transactionManagerLookup.getTransactionManager();
199        if (transactionManager == null) {
200            return false;
201        }
202        Transaction transaction;
203        try {
204            transaction = transactionManager.getTransaction();
205        } catch (SystemException e) {
206            throw new RuntimeException(e);
207        }
208        return transaction != null;
209    }
210
211    protected boolean useEhCache() {
212        return !isXA || hasTransaction();
213    }
214
215    protected void ehCachePut(Element element) {
216        if (useEhCache()) {
217            cache.put(element);
218        }
219    }
220
221    protected Element ehCacheGet(Serializable key) {
222        if (useEhCache()) {
223            return cache.get(key);
224        }
225        return null;
226    }
227
228    protected int ehCacheGetSize() {
229        if (useEhCache()) {
230            return cache.getSize();
231        }
232        return 0;
233    }
234
235    protected boolean ehCacheRemove(Serializable key) {
236        if (useEhCache()) {
237            return cache.remove(key);
238        }
239        return false;
240    }
241
242    protected void ehCacheRemoveAll() {
243        if (useEhCache()) {
244            cache.removeAll();
245        }
246    }
247
248    /*
249     * ----- Cache -----
250     */
251
252    protected static boolean isAbsent(Row row) {
253        return row.tableName == ABSENT; // == is ok
254    }
255
256    protected void cachePut(Row row) {
257        row = row.clone();
258        // for ACL collections, make sure the order is correct
259        // (without the cache, the query to get a list of collection does an
260        // ORDER BY pos, so users of the cache must get the same behavior)
261        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
262            row.values = sortACLRows((ACLRow[]) row.values);
263        }
264        Element element = new Element(new RowId(row), row);
265        ehCachePut(element);
266    }
267
268    protected ACLRow[] sortACLRows(ACLRow[] acls) {
269        List<ACLRow> list = new ArrayList<ACLRow>(Arrays.asList(acls));
270        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
271        ACLRow[] res = new ACLRow[acls.length];
272        return list.toArray(res);
273    }
274
275    protected void cachePutAbsent(RowId rowId) {
276        Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null));
277        ehCachePut(element);
278    }
279
280    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
281        if (row != null) {
282            cachePut(row);
283        } else {
284            cachePutAbsent(rowId);
285        }
286    }
287
288    protected void cachePutAbsentIfRowId(RowId rowId) {
289        if (rowId instanceof Row) {
290            cachePut((Row) rowId);
291        } else {
292            cachePutAbsent(rowId);
293        }
294    }
295
296    protected Row cacheGet(RowId rowId) {
297        final Context context = cacheGetTimer.time();
298        try {
299            Element element = ehCacheGet(rowId);
300            Row row = null;
301            if (element != null) {
302                row = (Row) element.getObjectValue();
303            }
304            if (row != null && !isAbsent(row)) {
305                row = row.clone();
306            }
307            if (row != null) {
308                cacheHitCount.inc();
309            }
310            return row;
311        } finally {
312            context.stop();
313        }
314    }
315
316    protected void cacheRemove(RowId rowId) {
317        ehCacheRemove(rowId);
318    }
319
320    /*
321     * ----- Invalidations / Cache Management -----
322     */
323
324    @Override
325    public Invalidations receiveInvalidations() {
326        // invalidations from the underlying mapper (cluster)
327        // already propagated to our invalidations queue
328        Invalidations remoteInvals = rowMapper.receiveInvalidations();
329
330        Invalidations ret = invalidationsQueue.getInvalidations();
331
332        if (remoteInvals != null) {
333            if (!ret.all) {
334                // only handle remote invalidations, the cache is shared and transactional
335                if (remoteInvals.modified != null) {
336                    for (RowId rowId : remoteInvals.modified) {
337                        cacheRemove(rowId);
338                    }
339                }
340                if (remoteInvals.deleted != null) {
341                    for (RowId rowId : remoteInvals.deleted) {
342                        cachePutAbsent(rowId);
343                    }
344                }
345            }
346        }
347
348        // invalidate our cache
349        if (ret.all) {
350            clearCache();
351        }
352
353        return ret.isEmpty() ? null : ret;
354    }
355
356    // propagate invalidations
357    @Override
358    public void sendInvalidations(Invalidations invalidations) {
359        // add local invalidations
360        if (!localInvalidations.isEmpty()) {
361            if (invalidations == null) {
362                invalidations = new Invalidations();
363            }
364            invalidations.add(localInvalidations);
365            localInvalidations.clear();
366        }
367
368        if (invalidations != null && !invalidations.isEmpty()) {
369            // send to underlying mapper
370            rowMapper.sendInvalidations(invalidations);
371
372            // queue to other mappers' caches
373            invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue);
374        }
375    }
376
377    @Override
378    public void clearCache() {
379        ehCacheRemoveAll();
380        localInvalidations.clear();
381        rowMapper.clearCache();
382    }
383
384    @Override
385    public void rollback(Xid xid) throws XAException {
386        try {
387            rowMapper.rollback(xid);
388        } finally {
389            ehCacheRemoveAll();
390            localInvalidations.clear();
391        }
392    }
393
394    /*
395     * ----- Batch -----
396     */
397
398    /*
399     * Use those from the cache if available, read from the mapper for the rest.
400     */
401    @Override
402    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
403        List<RowId> res = new ArrayList<RowId>(rowIds.size());
404        // find which are in cache, and which not
405        List<RowId> todo = new LinkedList<RowId>();
406        for (RowId rowId : rowIds) {
407            Row row = cacheGet(rowId);
408            if (row == null) {
409                if (cacheOnly) {
410                    res.add(new RowId(rowId));
411                } else {
412                    todo.add(rowId);
413                }
414            } else if (isAbsent(row)) {
415                res.add(new RowId(rowId));
416            } else {
417                res.add(row);
418            }
419        }
420        if (!todo.isEmpty()) {
421            final Context context = sorGetTimer.time();
422            try {
423                // ask missing ones to underlying row mapper
424                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
425                // add them to the cache
426                for (RowId rowId : fetched) {
427                    cachePutAbsentIfRowId(rowId);
428                }
429                // merge results
430                res.addAll(fetched);
431                sorRows.inc(fetched.size());
432            } finally {
433                context.stop();
434            }
435        }
436        return res;
437    }
438
439    /*
440     * Save in the cache then pass all the writes to the mapper.
441     */
442    @Override
443    public void write(RowBatch batch) {
444        // we avoid gathering invalidations for a write-only table: fulltext
445        for (Row row : batch.creates) {
446            cachePut(row);
447            if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) {
448                // we need to send modified invalidations for created
449                // fragments because other session's ABSENT fragments have
450                // to be invalidated
451                localInvalidations.addModified(new RowId(row));
452            }
453        }
454        for (RowUpdate rowu : batch.updates) {
455            cachePut(rowu.row);
456            if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) {
457                localInvalidations.addModified(new RowId(rowu.row));
458            }
459        }
460        for (RowId rowId : batch.deletes) {
461            if (rowId instanceof Row) {
462                throw new AssertionError();
463            }
464            cachePutAbsent(rowId);
465            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
466                localInvalidations.addDeleted(rowId);
467            }
468        }
469        for (RowId rowId : batch.deletesDependent) {
470            if (rowId instanceof Row) {
471                throw new AssertionError();
472            }
473            cachePutAbsent(rowId);
474            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
475                localInvalidations.addDeleted(rowId);
476            }
477        }
478
479        // propagate to underlying mapper
480        rowMapper.write(batch);
481    }
482
483    /*
484     * ----- Read -----
485     */
486
487    @Override
488    public Row readSimpleRow(RowId rowId) {
489        Row row = cacheGet(rowId);
490        if (row == null) {
491            row = rowMapper.readSimpleRow(rowId);
492            cachePutAbsentIfNull(rowId, row);
493            return row;
494        } else if (isAbsent(row)) {
495            return null;
496        } else {
497            return row;
498        }
499    }
500
501    @Override
502    public Map<String, String> getBinaryFulltext(RowId rowId) {
503        return rowMapper.getBinaryFulltext(rowId);
504    }
505
506    @Override
507    public Serializable[] readCollectionRowArray(RowId rowId) {
508        Row row = cacheGet(rowId);
509        if (row == null) {
510            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
511            assert array != null;
512            row = new Row(rowId.tableName, rowId.id, array);
513            cachePut(row);
514            return row.values;
515        } else if (isAbsent(row)) {
516            return null;
517        } else {
518            return row.values;
519        }
520    }
521
522    @Override
523    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
524            Serializable criterion, boolean limitToOne) {
525        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
526        for (Row row : rows) {
527            cachePut(row);
528        }
529        return rows;
530    }
531
532    @Override
533    public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) {
534        return rowMapper.readSelectionsIds(selType, values);
535    }
536
537    /*
538     * ----- Copy -----
539     */
540
541    @Override
542    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow) {
543        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow);
544        Invalidations invalidations = result.invalidations;
545        if (invalidations.modified != null) {
546            for (RowId rowId : invalidations.modified) {
547                cacheRemove(rowId);
548                localInvalidations.addModified(new RowId(rowId));
549            }
550        }
551        if (invalidations.deleted != null) {
552            for (RowId rowId : invalidations.deleted) {
553                cacheRemove(rowId);
554                localInvalidations.addDeleted(rowId);
555            }
556        }
557        return result;
558    }
559
560    @Override
561    public List<NodeInfo> getDescendantsInfo(Serializable rootId) {
562        return rowMapper.getDescendantsInfo(rootId);
563    }
564
565    @Override
566    public void remove(Serializable rootId, List<NodeInfo> nodeInfos) {
567        rowMapper.remove(rootId, nodeInfos);
568        for (NodeInfo info : nodeInfos) {
569            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info.id, info.primaryType, null))) {
570                RowId rowId = new RowId(fragmentName, info.id);
571                cacheRemove(rowId);
572                localInvalidations.addDeleted(rowId);
573            }
574        }
575        // we only put as absent the root fragment, to avoid polluting the cache
576        // with lots of absent info. the rest is removed entirely
577        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId));
578    }
579
580    @Override
581    public long getCacheSize() {
582        // The unified cache is reported by the cache-size gauge
583        return 0;
584    }
585
586}