001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.SortedMap;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import javax.management.MBeanServer;
035import javax.transaction.SystemException;
036import javax.transaction.Transaction;
037import javax.transaction.TransactionManager;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.core.storage.sql.ACLRow.ACLRowPositionComparator;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.management.ServerLocator;
044import org.nuxeo.runtime.metrics.MetricsService;
045
046import io.dropwizard.metrics5.Counter;
047import io.dropwizard.metrics5.Gauge;
048import io.dropwizard.metrics5.MetricName;
049import io.dropwizard.metrics5.MetricRegistry;
050import io.dropwizard.metrics5.SharedMetricRegistries;
051import io.dropwizard.metrics5.Timer;
052import io.dropwizard.metrics5.Timer.Context;
053
054import net.sf.ehcache.Cache;
055import net.sf.ehcache.CacheManager;
056import net.sf.ehcache.Element;
057import net.sf.ehcache.management.ManagementService;
058import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
059
060/**
061 * A {@link RowMapper} that use an unified ehcache.
062 * <p>
063 * The cache only holds {@link Row}s that are known to be identical to what's in the underlying {@link RowMapper}.
064 */
065public class UnifiedCachingRowMapper implements RowMapper {
066
067    private static final Log log = LogFactory.getLog(UnifiedCachingRowMapper.class);
068
069    private static final String ABSENT = "__ABSENT__\0\0\0";
070
071    private static CacheManager cacheManager = null;
072
073    protected static boolean isXA;
074
075    private Cache cache;
076
077    private Model model;
078
079    /**
080     * The {@link RowMapper} to which operations that cannot be processed from the cache are delegated.
081     */
082    private RowMapper rowMapper;
083
084    /**
085     * The local invalidations due to writes through this mapper that should be propagated to other sessions at
086     * post-commit time.
087     */
088    private final VCSInvalidations localInvalidations;
089
090    /**
091     * The queue of invalidations received from other session or from the cluster invalidator, to process at
092     * pre-transaction time.
093     */
094    private final VCSInvalidationsQueue invalidationsQueue;
095
096    /**
097     * The propagator of invalidations to other mappers.
098     */
099    private VCSInvalidationsPropagator invalidationsPropagator;
100
101    private static final String CACHE_NAME = "unifiedVCSCache";
102
103    private static final String EHCACHE_FILE_PROP = "ehcacheFilePath";
104
105    private static AtomicInteger rowMapperCount = new AtomicInteger();
106
107    /**
108     * Cache statistics
109     *
110     * @since 5.7
111     */
112    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
113
114    protected Counter cacheHitCount;
115
116    protected Timer cacheGetTimer;
117
118    // sor means system of record (database access)
119    protected Counter sorRows;
120
121    protected Timer sorGetTimer;
122
123    public UnifiedCachingRowMapper() {
124        localInvalidations = new VCSInvalidations();
125        invalidationsQueue = new VCSInvalidationsQueue("mapper-" + this);
126    }
127
128    synchronized public void initialize(String repositoryName, Model model, RowMapper rowMapper,
129            VCSInvalidationsPropagator invalidationsPropagator, Map<String, String> properties) {
130        this.model = model;
131        this.rowMapper = rowMapper;
132        this.invalidationsPropagator = invalidationsPropagator;
133        invalidationsPropagator.addQueue(invalidationsQueue);
134        if (cacheManager == null) {
135            if (properties.containsKey(EHCACHE_FILE_PROP)) {
136                String value = properties.get(EHCACHE_FILE_PROP);
137                log.info("Creating ehcache manager for VCS, using ehcache file: " + value);
138                cacheManager = CacheManager.create(value);
139            } else {
140                log.info("Creating ehcache manager for VCS, No ehcache file provided");
141                cacheManager = CacheManager.create();
142            }
143            isXA = cacheManager.getConfiguration().getCacheConfigurations().get(CACHE_NAME).isXaTransactional();
144            // Exposes cache to JMX
145            MBeanServer mBeanServer = Framework.getService(ServerLocator.class).lookupServer();
146            ManagementService.registerMBeans(cacheManager, mBeanServer, true, true, true, true);
147        }
148        rowMapperCount.incrementAndGet();
149        cache = cacheManager.getCache(CACHE_NAME);
150        setMetrics(repositoryName);
151    }
152
153    protected void setMetrics(String repositoryName) {
154        cacheHitCount = registry.counter(
155                MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "hit")
156                          .tagged("repository", repositoryName));
157        cacheGetTimer = registry.timer(
158                MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "timer")
159                          .tagged("repository", repositoryName));
160        sorRows = registry.counter(
161                MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "sor", "rows")
162                          .tagged("repository", repositoryName));
163        sorGetTimer = registry.timer(
164                MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "sor", "timer")
165                          .tagged("repository", repositoryName));
166        MetricName gaugeName = MetricName.build("nuxeo", "repositories", "repository", "cache", "unified", "size");
167        @SuppressWarnings("rawtypes")
168        SortedMap<MetricName, Gauge> gauges = registry.getGauges();
169        if (!gauges.containsKey(gaugeName)) {
170            registry.register(gaugeName, new Gauge<Integer>() {
171                @Override
172                public Integer getValue() {
173                    if (cacheManager != null) {
174                        return cacheManager.getCache(CACHE_NAME).getSize();
175                    }
176                    return 0;
177                }
178            });
179        }
180    }
181
182    public void close() {
183        invalidationsPropagator.removeQueue(invalidationsQueue);
184        rowMapperCount.decrementAndGet();
185    }
186
187    @Override
188    public Serializable generateNewId() {
189        return rowMapper.generateNewId();
190    }
191
192    /*
193     * ----- ehcache -----
194     */
195
196    protected boolean hasTransaction() {
197        TransactionManagerLookup transactionManagerLookup = cache.getTransactionManagerLookup();
198        if (transactionManagerLookup == null) {
199            return false;
200        }
201        TransactionManager transactionManager = transactionManagerLookup.getTransactionManager();
202        if (transactionManager == null) {
203            return false;
204        }
205        Transaction transaction;
206        try {
207            transaction = transactionManager.getTransaction();
208        } catch (SystemException e) {
209            throw new RuntimeException(e);
210        }
211        return transaction != null;
212    }
213
214    protected boolean useEhCache() {
215        return !isXA || hasTransaction();
216    }
217
218    protected void ehCachePut(Element element) {
219        if (useEhCache()) {
220            cache.put(element);
221        }
222    }
223
224    protected Element ehCacheGet(Serializable key) {
225        if (useEhCache()) {
226            return cache.get(key);
227        }
228        return null;
229    }
230
231    protected int ehCacheGetSize() {
232        if (useEhCache()) {
233            return cache.getSize();
234        }
235        return 0;
236    }
237
238    protected boolean ehCacheRemove(Serializable key) {
239        if (useEhCache()) {
240            return cache.remove(key);
241        }
242        return false;
243    }
244
245    protected void ehCacheRemoveAll() {
246        if (useEhCache()) {
247            cache.removeAll();
248        }
249    }
250
251    /*
252     * ----- Cache -----
253     */
254
255    protected static boolean isAbsent(Row row) {
256        return row.tableName == ABSENT; // == is ok
257    }
258
259    protected void cachePut(Row row) {
260        row = row.clone();
261        // for ACL collections, make sure the order is correct
262        // (without the cache, the query to get a list of collection does an
263        // ORDER BY pos, so users of the cache must get the same behavior)
264        if (row.isCollection() && row.values.length > 0 && row.values[0] instanceof ACLRow) {
265            row.values = sortACLRows((ACLRow[]) row.values);
266        }
267        Element element = new Element(new RowId(row), row);
268        ehCachePut(element);
269    }
270
271    protected ACLRow[] sortACLRows(ACLRow[] acls) {
272        List<ACLRow> list = new ArrayList<>(Arrays.asList(acls));
273        Collections.sort(list, ACLRowPositionComparator.INSTANCE);
274        ACLRow[] res = new ACLRow[acls.length];
275        return list.toArray(res);
276    }
277
278    protected void cachePutAbsent(RowId rowId) {
279        Element element = new Element(new RowId(rowId), new Row(ABSENT, (Serializable) null));
280        ehCachePut(element);
281    }
282
283    protected void cachePutAbsentIfNull(RowId rowId, Row row) {
284        if (row != null) {
285            cachePut(row);
286        } else {
287            cachePutAbsent(rowId);
288        }
289    }
290
291    protected void cachePutAbsentIfRowId(RowId rowId) {
292        if (rowId instanceof Row) {
293            cachePut((Row) rowId);
294        } else {
295            cachePutAbsent(rowId);
296        }
297    }
298
299    @SuppressWarnings("resource") // Time.Context closed by stop()
300    protected Row cacheGet(RowId rowId) {
301        final Context context = cacheGetTimer.time();
302        try {
303            Element element = ehCacheGet(rowId);
304            Row row = null;
305            if (element != null) {
306                row = (Row) element.getObjectValue();
307            }
308            if (row != null && !isAbsent(row)) {
309                row = row.clone();
310            }
311            if (row != null) {
312                cacheHitCount.inc();
313            }
314            return row;
315        } finally {
316            context.stop();
317        }
318    }
319
320    protected void cacheRemove(RowId rowId) {
321        ehCacheRemove(rowId);
322    }
323
324    /*
325     * ----- Invalidations / Cache Management -----
326     */
327
328    @Override
329    public VCSInvalidations receiveInvalidations() {
330        // invalidations from the underlying mapper (cluster)
331        // already propagated to our invalidations queue
332        VCSInvalidations remoteInvals = rowMapper.receiveInvalidations();
333
334        VCSInvalidations ret = invalidationsQueue.getInvalidations();
335
336        if (remoteInvals != null) {
337            if (!ret.all) {
338                // only handle remote invalidations, the cache is shared and transactional
339                if (remoteInvals.modified != null) {
340                    for (RowId rowId : remoteInvals.modified) {
341                        cacheRemove(rowId);
342                    }
343                }
344                if (remoteInvals.deleted != null) {
345                    for (RowId rowId : remoteInvals.deleted) {
346                        cachePutAbsent(rowId);
347                    }
348                }
349            }
350        }
351
352        // invalidate our cache
353        if (ret.all) {
354            clearCache();
355        }
356
357        return ret.isEmpty() ? null : ret;
358    }
359
360    // propagate invalidations
361    @Override
362    public void sendInvalidations(VCSInvalidations invalidations) {
363        // add local invalidations
364        if (!localInvalidations.isEmpty()) {
365            if (invalidations == null) {
366                invalidations = new VCSInvalidations();
367            }
368            invalidations.add(localInvalidations);
369            localInvalidations.clear();
370        }
371
372        if (invalidations != null && !invalidations.isEmpty()) {
373            // send to underlying mapper
374            rowMapper.sendInvalidations(invalidations);
375
376            // queue to other mappers' caches
377            invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue);
378        }
379    }
380
381    @Override
382    public void clearCache() {
383        ehCacheRemoveAll();
384        localInvalidations.clear();
385        rowMapper.clearCache();
386    }
387
388    @Override
389    public void rollback() {
390        try {
391            rowMapper.rollback();
392        } finally {
393            ehCacheRemoveAll();
394            localInvalidations.clear();
395        }
396    }
397
398    /*
399     * ----- Batch -----
400     */
401
402    /*
403     * Use those from the cache if available, read from the mapper for the rest.
404     */
405    @Override
406    public List<? extends RowId> read(Collection<RowId> rowIds, boolean cacheOnly) {
407        List<RowId> res = new ArrayList<>(rowIds.size());
408        // find which are in cache, and which not
409        List<RowId> todo = new LinkedList<>();
410        for (RowId rowId : rowIds) {
411            Row row = cacheGet(rowId);
412            if (row == null) {
413                if (cacheOnly) {
414                    res.add(new RowId(rowId));
415                } else {
416                    todo.add(rowId);
417                }
418            } else if (isAbsent(row)) {
419                res.add(new RowId(rowId));
420            } else {
421                res.add(row);
422            }
423        }
424        if (!todo.isEmpty()) {
425            @SuppressWarnings("resource")
426            final Context context = sorGetTimer.time();
427            try {
428                // ask missing ones to underlying row mapper
429                List<? extends RowId> fetched = rowMapper.read(todo, cacheOnly);
430                // add them to the cache
431                for (RowId rowId : fetched) {
432                    cachePutAbsentIfRowId(rowId);
433                }
434                // merge results
435                res.addAll(fetched);
436                sorRows.inc(fetched.size());
437            } finally {
438                context.stop();
439            }
440        }
441        return res;
442    }
443
444    /*
445     * Save in the cache then pass all the writes to the mapper.
446     */
447    @Override
448    public void write(RowBatch batch) {
449        // we avoid gathering invalidations for a write-only table: fulltext
450        for (Row row : batch.creates) {
451            cachePut(row);
452            if (!Model.FULLTEXT_TABLE_NAME.equals(row.tableName)) {
453                // we need to send modified invalidations for created
454                // fragments because other session's ABSENT fragments have
455                // to be invalidated
456                localInvalidations.addModified(new RowId(row));
457            }
458        }
459        for (RowUpdate rowu : batch.updates) {
460            cachePut(rowu.row);
461            if (!Model.FULLTEXT_TABLE_NAME.equals(rowu.row.tableName)) {
462                localInvalidations.addModified(new RowId(rowu.row));
463            }
464        }
465        for (RowId rowId : batch.deletes) {
466            if (rowId instanceof Row) {
467                throw new AssertionError();
468            }
469            cachePutAbsent(rowId);
470            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
471                localInvalidations.addDeleted(rowId);
472            }
473        }
474        for (RowId rowId : batch.deletesDependent) {
475            if (rowId instanceof Row) {
476                throw new AssertionError();
477            }
478            cachePutAbsent(rowId);
479            if (!Model.FULLTEXT_TABLE_NAME.equals(rowId.tableName)) {
480                localInvalidations.addDeleted(rowId);
481            }
482        }
483
484        // propagate to underlying mapper
485        rowMapper.write(batch);
486    }
487
488    /*
489     * ----- Read -----
490     */
491
492    @Override
493    public Row readSimpleRow(RowId rowId) {
494        Row row = cacheGet(rowId);
495        if (row == null) {
496            row = rowMapper.readSimpleRow(rowId);
497            cachePutAbsentIfNull(rowId, row);
498            return row;
499        } else if (isAbsent(row)) {
500            return null;
501        } else {
502            return row;
503        }
504    }
505
506    @Override
507    public Map<String, String> getBinaryFulltext(RowId rowId) {
508        return rowMapper.getBinaryFulltext(rowId);
509    }
510
511    @Override
512    public Serializable[] readCollectionRowArray(RowId rowId) {
513        Row row = cacheGet(rowId);
514        if (row == null) {
515            Serializable[] array = rowMapper.readCollectionRowArray(rowId);
516            assert array != null;
517            row = new Row(rowId.tableName, rowId.id, array);
518            cachePut(row);
519            return row.values;
520        } else if (isAbsent(row)) {
521            return null;
522        } else {
523            return row.values;
524        }
525    }
526
527    @Override
528    public List<Row> readSelectionRows(SelectionType selType, Serializable selId, Serializable filter,
529            Serializable criterion, boolean limitToOne) {
530        List<Row> rows = rowMapper.readSelectionRows(selType, selId, filter, criterion, limitToOne);
531        for (Row row : rows) {
532            cachePut(row);
533        }
534        return rows;
535    }
536
537    @Override
538    public Set<Serializable> readSelectionsIds(SelectionType selType, List<Serializable> values) {
539        return rowMapper.readSelectionsIds(selType, values);
540    }
541
542    /*
543     * ----- Copy -----
544     */
545
546    @Override
547    public CopyResult copy(IdWithTypes source, Serializable destParentId, String destName, Row overwriteRow,
548            boolean excludeSpecialChildren, boolean excludeACL) {
549        CopyResult result = rowMapper.copy(source, destParentId, destName, overwriteRow, excludeSpecialChildren,
550                excludeACL);
551        VCSInvalidations invalidations = result.invalidations;
552        if (invalidations.modified != null) {
553            for (RowId rowId : invalidations.modified) {
554                cacheRemove(rowId);
555                localInvalidations.addModified(new RowId(rowId));
556            }
557        }
558        if (invalidations.deleted != null) {
559            for (RowId rowId : invalidations.deleted) {
560                cacheRemove(rowId);
561                localInvalidations.addDeleted(rowId);
562            }
563        }
564        return result;
565    }
566
567    @Override
568    public List<NodeInfo> getDescendantsInfo(Serializable rootId) {
569        return rowMapper.getDescendantsInfo(rootId);
570    }
571
572    @Override
573    public void remove(Serializable rootId, List<NodeInfo> nodeInfos) {
574        rowMapper.remove(rootId, nodeInfos);
575        for (NodeInfo info : nodeInfos) {
576            for (String fragmentName : model.getTypeFragments(new IdWithTypes(info))) {
577                RowId rowId = new RowId(fragmentName, info.id);
578                cacheRemove(rowId);
579                localInvalidations.addDeleted(rowId);
580            }
581        }
582        // we only put as absent the root fragment, to avoid polluting the cache
583        // with lots of absent info. the rest is removed entirely
584        cachePutAbsent(new RowId(Model.HIER_TABLE_NAME, rootId));
585    }
586
587    @Override
588    public long getCacheSize() {
589        // The unified cache is reported by the cache-size gauge
590        return 0;
591    }
592
593}