001/*
002 * (C) Copyright 2006-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019
020package org.nuxeo.ecm.core.storage.sql;
021
022import java.io.Serializable;
023import java.security.SecureRandom;
024import java.util.Calendar;
025import java.util.Collection;
026import java.util.Random;
027import java.util.concurrent.CopyOnWriteArrayList;
028
029import javax.naming.Reference;
030import javax.resource.ResourceException;
031import javax.resource.cci.ConnectionSpec;
032import javax.resource.cci.RecordFactory;
033import javax.resource.cci.ResourceAdapterMetaData;
034
035import org.apache.commons.lang3.StringUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.api.NuxeoException;
039import org.nuxeo.ecm.core.api.repository.FulltextConfiguration;
040import org.nuxeo.ecm.core.model.LockManager;
041import org.nuxeo.ecm.core.storage.lock.LockManagerService;
042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend;
044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.metrics.MetricsService;
047
048import com.codahale.metrics.Counter;
049import com.codahale.metrics.Gauge;
050import com.codahale.metrics.MetricRegistry;
051import com.codahale.metrics.SharedMetricRegistries;
052
053/**
054 * {@link Repository} implementation, to be extended by backend-specific initialization code.
055 *
056 * @see RepositoryBackend
057 */
058public class RepositoryImpl implements Repository {
059
060    private static final long serialVersionUID = 1L;
061
062    private static final Log log = LogFactory.getLog(RepositoryImpl.class);
063
064    private static final Random RANDOM = new SecureRandom();
065
066    protected final RepositoryDescriptor repositoryDescriptor;
067
068    private RepositoryBackend backend;
069
070    private final Collection<SessionImpl> sessions;
071
072    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
073
074    protected final Counter repositoryUp;
075
076    protected final Counter sessionCount;
077
078    private LockManager lockManager;
079
080    /**
081     * @since 7.4 : used to know if the LockManager was provided by this repository or externally
082     */
083    protected boolean selfRegisteredLockManager = false;
084
085    /** Propagator of invalidations to all mappers' caches. */
086    protected final InvalidationsPropagator invalidationsPropagator;
087
088    private Model model;
089
090    /**
091     * Transient id for this repository assigned by the server on first connection. This is not persisted.
092     */
093    public String repositoryId;
094
095    public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) {
096        this.repositoryDescriptor = repositoryDescriptor;
097        sessions = new CopyOnWriteArrayList<>();
098        invalidationsPropagator = new InvalidationsPropagator();
099
100        repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
101                "instance-up"));
102        repositoryUp.inc();
103        sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
104                "sessions"));
105        createMetricsGauges();
106
107        initRepository();
108    }
109
110    protected void createMetricsGauges() {
111        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size");
112        registry.remove(gaugeName);
113        registry.register(gaugeName, new Gauge<Long>() {
114            @Override
115            public Long getValue() {
116                return getCacheSize();
117            }
118        });
119        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines");
120        registry.remove(gaugeName);
121        registry.register(gaugeName, new Gauge<Long>() {
122            @Override
123            public Long getValue() {
124                return getCachePristineSize();
125            }
126        });
127        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections");
128        registry.remove(gaugeName);
129        registry.register(gaugeName, new Gauge<Long>() {
130            @Override
131            public Long getValue() {
132                return getCacheSelectionSize();
133            }
134        });
135        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers");
136        registry.remove(gaugeName);
137        registry.register(gaugeName, new Gauge<Long>() {
138            @Override
139            public Long getValue() {
140                return getCacheMapperSize();
141            }
142        });
143    }
144
145    protected RepositoryBackend createBackend() {
146        Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass;
147        if (backendClass == null) {
148            backendClass = JDBCBackend.class;
149        }
150        try {
151            RepositoryBackend backend = backendClass.newInstance();
152            return backend;
153        } catch (ReflectiveOperationException e) {
154            throw new NuxeoException(e);
155        }
156    }
157
158    protected Mapper createCachingMapper(Model model, Mapper mapper) {
159        try {
160            Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
161            if (cachingMapperClass == null) {
162                return mapper;
163            }
164            CachingMapper cachingMapper = cachingMapperClass.newInstance();
165            cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator,
166                    repositoryDescriptor.cachingMapperProperties);
167            return cachingMapper;
168        } catch (ReflectiveOperationException e) {
169            throw new NuxeoException(e);
170        }
171    }
172
173    protected Class<? extends CachingMapper> getCachingMapperClass() {
174        if (!repositoryDescriptor.getCachingMapperEnabled()) {
175            return null;
176        }
177        Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass;
178        if (cachingMapperClass == null) {
179            // default cache
180            cachingMapperClass = SoftRefCachingMapper.class;
181        }
182        return cachingMapperClass;
183    }
184
185    public RepositoryDescriptor getRepositoryDescriptor() {
186        return repositoryDescriptor;
187    }
188
189    public LockManager getLockManager() {
190        return lockManager;
191    }
192
193    public Model getModel() {
194        return model;
195    }
196
197    public InvalidationsPropagator getInvalidationsPropagator() {
198        return invalidationsPropagator;
199    }
200
201    public boolean isChangeTokenEnabled() {
202        return repositoryDescriptor.isChangeTokenEnabled();
203    }
204
205    /*
206     * ----- javax.resource.cci.ConnectionFactory -----
207     */
208
209    /**
210     * Gets a new connection.
211     *
212     * @param connectionSpec the parameters to use to connect (unused)
213     * @return the session
214     */
215    @Override
216    public SessionImpl getConnection(ConnectionSpec connectionSpec) {
217        return getConnection();
218    }
219
220    /**
221     * Gets a new connection.
222     *
223     * @return the session
224     */
225    @Override
226    public synchronized SessionImpl getConnection() {
227        if (Framework.getRuntime().isShuttingDown()) {
228            throw new IllegalStateException("Cannot open connection, runtime is shutting down");
229        }
230        SessionPathResolver pathResolver = new SessionPathResolver();
231        Mapper mapper = newMapper(pathResolver, true);
232        SessionImpl session = newSession(model, mapper);
233        pathResolver.setSession(session);
234        sessions.add(session);
235        sessionCount.inc();
236        return session;
237    }
238
239    /**
240     * Creates a new mapper.
241     *
242     * @param pathResolver the path resolver (for regular mappers)
243     * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager /
244     *            cluster invalidator)
245     * @return the new mapper.
246     * @since 7.4
247     */
248    public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) {
249        return backend.newMapper(pathResolver, useInvalidations);
250    }
251
252    protected void initRepository() {
253        log.debug("Initializing");
254        backend = createBackend();
255        model = backend.initialize(this);
256        initLockManager();
257
258        // create the cluster invalidator
259        if (repositoryDescriptor.getClusteringEnabled()) {
260            initClusterInvalidator();
261        }
262
263        // log once which mapper cache is being used
264        Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
265        if (cachingMapperClass == null) {
266            log.warn("VCS Mapper cache is disabled.");
267        } else {
268            log.info("VCS Mapper cache using: " + cachingMapperClass.getName());
269        }
270
271        initRootNode();
272    }
273
274    protected void initRootNode() {
275        try {
276            // access a session once so that SessionImpl.computeRootNode can create the root node
277            getConnection().close();
278        } catch (ResourceException e) {
279            throw new RuntimeException(e);
280        }
281    }
282
283    protected String getLockManagerName() {
284        // TODO configure in repo descriptor
285        return getName();
286    }
287
288    protected void initLockManager() {
289        String lockManagerName = getLockManagerName();
290        LockManagerService lockManagerService = Framework.getService(LockManagerService.class);
291        lockManager = lockManagerService.getLockManager(lockManagerName);
292        if (lockManager == null) {
293            // no descriptor
294            // default to a VCSLockManager
295            lockManager = new VCSLockManager(this);
296            lockManagerService.registerLockManager(lockManagerName, lockManager);
297            selfRegisteredLockManager = true;
298        } else {
299            selfRegisteredLockManager = false;
300        }
301        log.info("Repository " + getName() + " using lock manager " + lockManager);
302    }
303
304    protected void initClusterInvalidator() {
305        String nodeId = repositoryDescriptor.getClusterNodeId();
306        if (StringUtils.isBlank(nodeId)) {
307            // need a smallish int because of SQL Server legacy node ids
308            nodeId = String.valueOf(RANDOM.nextInt(32768));
309            log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). "
310                    + "Using random cluster node id instead: " + nodeId);
311        } else {
312            nodeId = nodeId.trim();
313        }
314        ClusterInvalidator clusterInvalidator = createClusterInvalidator();
315        clusterInvalidator.initialize(nodeId, this);
316        backend.setClusterInvalidator(clusterInvalidator);
317    }
318
319    protected ClusterInvalidator createClusterInvalidator() {
320        Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass;
321        if (klass == null) {
322            klass = JDBCClusterInvalidator.class;
323        }
324        try {
325            return klass.newInstance();
326        } catch (ReflectiveOperationException e) {
327            throw new NuxeoException(e);
328        }
329    }
330
331    protected SessionImpl newSession(Model model, Mapper mapper) {
332        mapper = createCachingMapper(model, mapper);
333        return new SessionImpl(this, model, mapper);
334    }
335
336    public static class SessionPathResolver implements PathResolver {
337
338        private Session session;
339
340        protected void setSession(Session session) {
341            this.session = session;
342        }
343
344        @Override
345        public Serializable getIdForPath(String path) {
346            Node node = session.getNodeByPath(path, null);
347            return node == null ? null : node.getId();
348        }
349    }
350
351    /*
352     * -----
353     */
354
355    @Override
356    public ResourceAdapterMetaData getMetaData() {
357        throw new UnsupportedOperationException();
358    }
359
360    @Override
361    public RecordFactory getRecordFactory() {
362        throw new UnsupportedOperationException();
363    }
364
365    /*
366     * ----- javax.resource.Referenceable -----
367     */
368
369    private Reference reference;
370
371    @Override
372    public void setReference(Reference reference) {
373        this.reference = reference;
374    }
375
376    @Override
377    public Reference getReference() {
378        return reference;
379    }
380
381    /*
382     * ----- Repository -----
383     */
384
385    @Override
386    public synchronized void close() {
387        closeAllSessions();
388        model = null;
389        backend.shutdown();
390
391        registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size"));
392        registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size"));
393        registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size"));
394
395        if (selfRegisteredLockManager) {
396            LockManagerService lms = Framework.getService(LockManagerService.class);
397            if (lms != null) {
398                lms.unregisterLockManager(getLockManagerName());
399            }
400        }
401    }
402
403    protected synchronized void closeAllSessions() {
404        for (SessionImpl session : sessions) {
405            if (!session.isLive()) {
406                continue;
407            }
408            session.closeSession();
409        }
410        sessions.clear();
411        sessionCount.dec(sessionCount.getCount());
412        if (lockManager != null) {
413            lockManager.closeLockManager();
414        }
415    }
416
417    /*
418     * ----- RepositoryManagement -----
419     */
420
421    @Override
422    public String getName() {
423        return repositoryDescriptor.name;
424    }
425
426    @Override
427    public int getActiveSessionsCount() {
428        return sessions.size();
429    }
430
431    @Override
432    public int clearCaches() {
433        int n = 0;
434        for (SessionImpl session : sessions) {
435            n += session.clearCaches();
436        }
437        if (lockManager != null) {
438            lockManager.clearLockManagerCaches();
439        }
440        return n;
441    }
442
443    @Override
444    public long getCacheSize() {
445        long size = 0;
446        for (SessionImpl session : sessions) {
447            size += session.getCacheSize();
448        }
449        return size;
450    }
451
452    public long getCacheMapperSize() {
453        long size = 0;
454        for (SessionImpl session : sessions) {
455            size += session.getCacheMapperSize();
456        }
457        return size;
458    }
459
460    @Override
461    public long getCachePristineSize() {
462        long size = 0;
463        for (SessionImpl session : sessions) {
464            size += session.getCachePristineSize();
465        }
466        return size;
467    }
468
469    @Override
470    public long getCacheSelectionSize() {
471        long size = 0;
472        for (SessionImpl session : sessions) {
473            size += session.getCacheSelectionSize();
474        }
475        return size;
476    }
477
478    @Override
479    public void processClusterInvalidationsNext() {
480        // TODO pass through or something
481    }
482
483    @Override
484    public void markReferencedBinaries() {
485        try {
486            SessionImpl conn = getConnection();
487            try {
488                conn.markReferencedBinaries();
489            } finally {
490                conn.close();
491            }
492        } catch (ResourceException e) {
493            throw new RuntimeException(e);
494        }
495    }
496
497    @Override
498    public int cleanupDeletedDocuments(int max, Calendar beforeTime) {
499        if (!repositoryDescriptor.getSoftDeleteEnabled()) {
500            return 0;
501        }
502        try {
503            SessionImpl conn = getConnection();
504            try {
505                return conn.cleanupDeletedDocuments(max, beforeTime);
506            } finally {
507                conn.close();
508            }
509        } catch (ResourceException e) {
510            throw new RuntimeException(e);
511        }
512    }
513
514    @Override
515    public FulltextConfiguration getFulltextConfiguration() {
516        return model.getFulltextConfiguration();
517    }
518
519    /*
520     * ----- -----
521     */
522
523    // callback by session at close time
524    protected void closeSession(SessionImpl session) {
525        sessions.remove(session);
526        sessionCount.dec();
527    }
528
529}