001/*
002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012
013package org.nuxeo.ecm.core.storage.sql;
014
015import java.io.Serializable;
016import java.util.Calendar;
017import java.util.Collection;
018import java.util.Random;
019import java.util.concurrent.CopyOnWriteArrayList;
020
021import javax.naming.Reference;
022import javax.resource.ResourceException;
023import javax.resource.cci.ConnectionSpec;
024import javax.resource.cci.RecordFactory;
025import javax.resource.cci.ResourceAdapterMetaData;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.ecm.core.api.NuxeoException;
031import org.nuxeo.ecm.core.model.LockManager;
032import org.nuxeo.ecm.core.storage.DefaultFulltextParser;
033import org.nuxeo.ecm.core.storage.FulltextParser;
034import org.nuxeo.ecm.core.storage.lock.LockManagerService;
035import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
036import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend;
037import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator;
038import org.nuxeo.runtime.api.Framework;
039import org.nuxeo.runtime.metrics.MetricsService;
040
041import com.codahale.metrics.Counter;
042import com.codahale.metrics.Gauge;
043import com.codahale.metrics.MetricRegistry;
044import com.codahale.metrics.SharedMetricRegistries;
045
046/**
047 * {@link Repository} implementation, to be extended by backend-specific initialization code.
048 *
049 * @see RepositoryBackend
050 */
051public class RepositoryImpl implements Repository {
052
    /** Serialization version identifier. */
    private static final long serialVersionUID = 1L;

    private static final Log log = LogFactory.getLog(RepositoryImpl.class);

    /** Source of the random cluster node id used when none is configured. */
    private static final Random RANDOM = new Random();

    /** The descriptor this repository was constructed with. */
    protected final RepositoryDescriptor repositoryDescriptor;

    /** Fulltext parser class, resolved by the constructor from the descriptor (or the default parser). */
    protected final Class<? extends FulltextParser> fulltextParserClass;

    /** Backend created by {@link #createBackend()}; it supplies the mappers. */
    private final RepositoryBackend backend;

    /** Sessions handed out by {@link #getConnection()} that have not been closed yet. */
    private final Collection<SessionImpl> sessions;

    /** Metric registry in which this repository's counters and gauges are registered. */
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    /** Counter incremented once when this instance is constructed. */
    protected final Counter repositoryUp;

    /** Counter incremented for each session opened and decremented when sessions are closed. */
    protected final Counter sessionCount;

    /** Lock manager, assigned by {@link #initLockManager()}; {@code null} before that. */
    private LockManager lockManager;

    /**
     * @since 7.4 : used to know if the LockManager was provided by this repository or externally
     */
    protected boolean selfRegisteredLockManager = false;

    /** Propagator of invalidations to all mappers' caches. */
    protected final InvalidationsPropagator invalidationsPropagator;

    /** Model built by {@link #initRepository()}; {@code null} before the first connection and after {@link #close()}. */
    private Model model;

    /**
     * Transient id for this repository assigned by the server on first connection. This is not persisted.
     */
    public String repositoryId;
089
090    public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) {
091        this.repositoryDescriptor = repositoryDescriptor;
092        sessions = new CopyOnWriteArrayList<SessionImpl>();
093        invalidationsPropagator = new InvalidationsPropagator();
094
095        String className = repositoryDescriptor.fulltextParser;
096        if (StringUtils.isBlank(className)) {
097            className = DefaultFulltextParser.class.getName();
098        }
099        Class<?> klass;
100        try {
101            klass = Thread.currentThread().getContextClassLoader().loadClass(className);
102        } catch (ClassNotFoundException e) {
103            throw new NuxeoException("Unknown fulltext parser class: " + className, e);
104        }
105        if (!FulltextParser.class.isAssignableFrom(klass)) {
106            throw new NuxeoException("Invalid fulltext parser class: " + className);
107        }
108        fulltextParserClass = (Class<? extends FulltextParser>) klass;
109
110        backend = createBackend();
111        repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
112                "instance-up"));
113        repositoryUp.inc();
114        sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
115                "sessions"));
116        createMetricsGauges();
117    }
118
119    protected void createMetricsGauges() {
120        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size");
121        registry.remove(gaugeName);
122        registry.register(gaugeName, new Gauge<Long>() {
123            @Override
124            public Long getValue() {
125                return getCacheSize();
126            }
127        });
128        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines");
129        registry.remove(gaugeName);
130        registry.register(gaugeName, new Gauge<Long>() {
131            @Override
132            public Long getValue() {
133                return getCachePristineSize();
134            }
135        });
136        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections");
137        registry.remove(gaugeName);
138        registry.register(gaugeName, new Gauge<Long>() {
139            @Override
140            public Long getValue() {
141                return getCacheSelectionSize();
142            }
143        });
144        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers");
145        registry.remove(gaugeName);
146        registry.register(gaugeName, new Gauge<Long>() {
147            @Override
148            public Long getValue() {
149                return getCacheMapperSize();
150            }
151        });
152    }
153
154    protected RepositoryBackend createBackend() {
155        Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass;
156        if (backendClass == null) {
157            backendClass = JDBCBackend.class;
158        }
159        try {
160            RepositoryBackend backend = backendClass.newInstance();
161            backend.initialize(this);
162            return backend;
163        } catch (ReflectiveOperationException e) {
164            throw new NuxeoException(e);
165        }
166    }
167
168    protected Mapper createCachingMapper(Model model, Mapper mapper) {
169        try {
170            Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
171            if (cachingMapperClass == null) {
172                return mapper;
173            }
174            CachingMapper cachingMapper = cachingMapperClass.newInstance();
175            cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator,
176                    repositoryDescriptor.cachingMapperProperties);
177            return cachingMapper;
178        } catch (ReflectiveOperationException e) {
179            throw new NuxeoException(e);
180        }
181    }
182
183    protected Class<? extends CachingMapper> getCachingMapperClass() {
184        if (!repositoryDescriptor.getCachingMapperEnabled()) {
185            return null;
186        }
187        Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass;
188        if (cachingMapperClass == null) {
189            // default cache
190            cachingMapperClass = SoftRefCachingMapper.class;
191        }
192        return cachingMapperClass;
193    }
194
195    public RepositoryDescriptor getRepositoryDescriptor() {
196        return repositoryDescriptor;
197    }
198
199    public LockManager getLockManager() {
200        return lockManager;
201    }
202
203    public Model getModel() {
204        return model;
205    }
206
207    public InvalidationsPropagator getInvalidationsPropagator() {
208        return invalidationsPropagator;
209    }
210
211    public Class<? extends FulltextParser> getFulltextParserClass() {
212        return fulltextParserClass;
213    }
214
215    /*
216     * ----- javax.resource.cci.ConnectionFactory -----
217     */
218
219    /**
220     * Gets a new connection.
221     *
222     * @param connectionSpec the parameters to use to connect (unused)
223     * @return the session
224     */
225    @Override
226    public SessionImpl getConnection(ConnectionSpec connectionSpec) {
227        return getConnection();
228    }
229
230    /**
231     * Gets a new connection.
232     *
233     * @return the session
234     */
235    @Override
236    public synchronized SessionImpl getConnection() {
237        if (Framework.getRuntime().isShuttingDown()) {
238            throw new IllegalStateException("Cannot open connection, runtime is shutting down");
239        }
240        if (model == null) {
241            initRepository();
242        }
243        SessionPathResolver pathResolver = new SessionPathResolver();
244        Mapper mapper = newMapper(pathResolver, true);
245        SessionImpl session = newSession(model, mapper);
246        pathResolver.setSession(session);
247        sessions.add(session);
248        sessionCount.inc();
249        return session;
250    }
251
252    /**
253     * Creates a new mapper.
254     *
255     * @param pathResolver the path resolver (for regular mappers)
256     * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager /
257     *            cluster invalidator)
258     * @return the new mapper.
259     * @since 7.4
260     */
261    public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) {
262        return backend.newMapper(model, pathResolver, useInvalidations);
263    }
264
265    protected void initRepository() {
266        log.debug("Initializing");
267        ModelSetup modelSetup = new ModelSetup();
268        modelSetup.repositoryDescriptor = repositoryDescriptor;
269        backend.initializeModelSetup(modelSetup);
270        model = new Model(modelSetup);
271        backend.initializeModel(model);
272        initLockManager();
273
274        // create the cluster invalidator
275        if (repositoryDescriptor.getClusteringEnabled()) {
276            initClusterInvalidator();
277        }
278
279        // log once which mapper cache is being used
280        Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
281        if (cachingMapperClass == null) {
282            log.warn("VCS Mapper cache is disabled.");
283        } else {
284            log.info("VCS Mapper cache using: " + cachingMapperClass.getName());
285        }
286    }
287
288    protected String getLockManagerName() {
289        // TODO configure in repo descriptor
290        return getName();
291    }
292
293    protected void initLockManager() {
294        String lockManagerName = getLockManagerName();
295        LockManagerService lockManagerService = Framework.getService(LockManagerService.class);
296        lockManager = lockManagerService.getLockManager(lockManagerName);
297        if (lockManager == null) {
298            // no descriptor
299            // default to a VCSLockManager
300            lockManager = new VCSLockManager(lockManagerName);
301            lockManagerService.registerLockManager(lockManagerName, lockManager);
302            selfRegisteredLockManager = true;
303        } else {
304            selfRegisteredLockManager = false;
305        }
306        log.info("Repository " + getName() + " using lock manager " + lockManager);
307    }
308
309    protected void initClusterInvalidator() {
310        String nodeId = repositoryDescriptor.getClusterNodeId();
311        if (StringUtils.isBlank(nodeId)) {
312            // need a smallish int because of SQL Server legacy node ids
313            nodeId = String.valueOf(RANDOM.nextInt(32768));
314            log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). "
315                    + "Using random cluster node id instead: " + nodeId);
316        } else {
317            nodeId = nodeId.trim();
318        }
319        ClusterInvalidator clusterInvalidator = createClusterInvalidator();
320        clusterInvalidator.initialize(nodeId, this);
321        backend.setClusterInvalidator(clusterInvalidator);
322    }
323
324    protected ClusterInvalidator createClusterInvalidator() {
325        Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass;
326        if (klass == null) {
327            klass = JDBCClusterInvalidator.class;
328        }
329        try {
330            return klass.newInstance();
331        } catch (ReflectiveOperationException e) {
332            throw new NuxeoException(e);
333        }
334    }
335
336    protected SessionImpl newSession(Model model, Mapper mapper) {
337        mapper = createCachingMapper(model, mapper);
338        return new SessionImpl(this, model, mapper);
339    }
340
341    public static class SessionPathResolver implements PathResolver {
342
343        private Session session;
344
345        protected void setSession(Session session) {
346            this.session = session;
347        }
348
349        @Override
350        public Serializable getIdForPath(String path) {
351            Node node = session.getNodeByPath(path, null);
352            return node == null ? null : node.getId();
353        }
354    }
355
356    /*
357     * -----
358     */
359
    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ResourceAdapterMetaData getMetaData() {
        throw new UnsupportedOperationException();
    }
364
    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public RecordFactory getRecordFactory() {
        throw new UnsupportedOperationException();
    }
369
370    /*
371     * ----- javax.resource.Referenceable -----
372     */
373
    /** The naming reference stored by {@link #setReference(Reference)}; {@code null} until one is set. */
    private Reference reference;

    /**
     * Stores the naming reference for this repository.
     *
     * @param reference the reference to store
     */
    @Override
    public void setReference(Reference reference) {
        this.reference = reference;
    }

    /**
     * Gets the naming reference previously stored.
     *
     * @return the stored reference, or {@code null} if none was set
     */
    @Override
    public Reference getReference() {
        return reference;
    }
385
386    /*
387     * ----- Repository -----
388     */
389
390    @Override
391    public synchronized void close() {
392        closeAllSessions();
393        model = null;
394        backend.shutdown();
395
396        registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size"));
397        registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size"));
398        registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size"));
399
400        if (selfRegisteredLockManager) {
401            LockManagerService lms = Framework.getService(LockManagerService.class);
402            if (lms != null) {
403                lms.unregisterLockManager(getLockManagerName());
404            }
405        }
406    }
407
408    protected synchronized void closeAllSessions() {
409        for (SessionImpl session : sessions) {
410            if (!session.isLive()) {
411                continue;
412            }
413            session.closeSession();
414        }
415        sessions.clear();
416        sessionCount.dec(sessionCount.getCount());
417        if (lockManager != null) {
418            lockManager.closeLockManager();
419        }
420    }
421
422    /*
423     * ----- RepositoryManagement -----
424     */
425
426    @Override
427    public String getName() {
428        return repositoryDescriptor.name;
429    }
430
431    @Override
432    public int getActiveSessionsCount() {
433        return sessions.size();
434    }
435
436    @Override
437    public int clearCaches() {
438        int n = 0;
439        for (SessionImpl session : sessions) {
440            n += session.clearCaches();
441        }
442        if (lockManager != null) {
443            lockManager.clearLockManagerCaches();
444        }
445        return n;
446    }
447
448    @Override
449    public long getCacheSize() {
450        long size = 0;
451        for (SessionImpl session : sessions) {
452            size += session.getCacheSize();
453        }
454        return size;
455    }
456
457    public long getCacheMapperSize() {
458        long size = 0;
459        for (SessionImpl session : sessions) {
460            size += session.getCacheMapperSize();
461        }
462        return size;
463    }
464
465    @Override
466    public long getCachePristineSize() {
467        long size = 0;
468        for (SessionImpl session : sessions) {
469            size += session.getCachePristineSize();
470        }
471        return size;
472    }
473
474    @Override
475    public long getCacheSelectionSize() {
476        long size = 0;
477        for (SessionImpl session : sessions) {
478            size += session.getCacheSelectionSize();
479        }
480        return size;
481    }
482
    /**
     * Currently does nothing; the body is an unimplemented placeholder.
     */
    @Override
    public void processClusterInvalidationsNext() {
        // TODO pass through or something
    }
487
488    @Override
489    public void markReferencedBinaries() {
490        try {
491            SessionImpl conn = getConnection();
492            try {
493                conn.markReferencedBinaries();
494            } finally {
495                conn.close();
496            }
497        } catch (ResourceException e) {
498            throw new RuntimeException(e);
499        }
500    }
501
502    @Override
503    public int cleanupDeletedDocuments(int max, Calendar beforeTime) {
504        if (!repositoryDescriptor.getSoftDeleteEnabled()) {
505            return 0;
506        }
507        try {
508            SessionImpl conn = getConnection();
509            try {
510                return conn.cleanupDeletedDocuments(max, beforeTime);
511            } finally {
512                conn.close();
513            }
514        } catch (ResourceException e) {
515            throw new RuntimeException(e);
516        }
517    }
518
519    /*
520     * ----- -----
521     */
522
523    // callback by session at close time
524    protected void closeSession(SessionImpl session) {
525        sessions.remove(session);
526        sessionCount.dec();
527    }
528
529}