001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019
020package org.nuxeo.ecm.core.storage.sql;
021
022import java.io.Serializable;
023import java.util.Calendar;
024import java.util.Collection;
025import java.util.Random;
026import java.util.concurrent.CopyOnWriteArrayList;
027
028import javax.naming.Reference;
029import javax.resource.ResourceException;
030import javax.resource.cci.ConnectionSpec;
031import javax.resource.cci.RecordFactory;
032import javax.resource.cci.ResourceAdapterMetaData;
033
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.ecm.core.model.LockManager;
039import org.nuxeo.ecm.core.storage.DefaultFulltextParser;
040import org.nuxeo.ecm.core.storage.FulltextParser;
041import org.nuxeo.ecm.core.storage.lock.LockManagerService;
042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend;
044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.metrics.MetricsService;
047
048import com.codahale.metrics.Counter;
049import com.codahale.metrics.Gauge;
050import com.codahale.metrics.MetricRegistry;
051import com.codahale.metrics.SharedMetricRegistries;
052
053/**
054 * {@link Repository} implementation, to be extended by backend-specific initialization code.
055 *
056 * @see RepositoryBackend
057 */
058public class RepositoryImpl implements Repository {
059
060    private static final long serialVersionUID = 1L;
061
062    private static final Log log = LogFactory.getLog(RepositoryImpl.class);
063
064    private static final Random RANDOM = new Random();
065
066    protected final RepositoryDescriptor repositoryDescriptor;
067
068    protected final Class<? extends FulltextParser> fulltextParserClass;
069
070    private RepositoryBackend backend;
071
072    private final Collection<SessionImpl> sessions;
073
074    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
075
076    protected final Counter repositoryUp;
077
078    protected final Counter sessionCount;
079
080    private LockManager lockManager;
081
082    /**
083     * @since 7.4 : used to know if the LockManager was provided by this repository or externally
084     */
085    protected boolean selfRegisteredLockManager = false;
086
087    /** Propagator of invalidations to all mappers' caches. */
088    protected final InvalidationsPropagator invalidationsPropagator;
089
090    private Model model;
091
092    /**
093     * Transient id for this repository assigned by the server on first connection. This is not persisted.
094     */
095    public String repositoryId;
096
097    public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) {
098        this.repositoryDescriptor = repositoryDescriptor;
099        sessions = new CopyOnWriteArrayList<SessionImpl>();
100        invalidationsPropagator = new InvalidationsPropagator();
101
102        String className = repositoryDescriptor.getFulltextDescriptor().getFulltextParser();
103        if (StringUtils.isBlank(className)) {
104            className = DefaultFulltextParser.class.getName();
105        }
106        Class<?> klass;
107        try {
108            klass = Thread.currentThread().getContextClassLoader().loadClass(className);
109        } catch (ClassNotFoundException e) {
110            throw new NuxeoException("Unknown fulltext parser class: " + className, e);
111        }
112        if (!FulltextParser.class.isAssignableFrom(klass)) {
113            throw new NuxeoException("Invalid fulltext parser class: " + className);
114        }
115        fulltextParserClass = (Class<? extends FulltextParser>) klass;
116
117        repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
118                "instance-up"));
119        repositoryUp.inc();
120        sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
121                "sessions"));
122        createMetricsGauges();
123
124        initRepository();
125    }
126
127    protected void createMetricsGauges() {
128        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size");
129        registry.remove(gaugeName);
130        registry.register(gaugeName, new Gauge<Long>() {
131            @Override
132            public Long getValue() {
133                return getCacheSize();
134            }
135        });
136        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines");
137        registry.remove(gaugeName);
138        registry.register(gaugeName, new Gauge<Long>() {
139            @Override
140            public Long getValue() {
141                return getCachePristineSize();
142            }
143        });
144        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections");
145        registry.remove(gaugeName);
146        registry.register(gaugeName, new Gauge<Long>() {
147            @Override
148            public Long getValue() {
149                return getCacheSelectionSize();
150            }
151        });
152        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers");
153        registry.remove(gaugeName);
154        registry.register(gaugeName, new Gauge<Long>() {
155            @Override
156            public Long getValue() {
157                return getCacheMapperSize();
158            }
159        });
160    }
161
162    protected RepositoryBackend createBackend() {
163        Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass;
164        if (backendClass == null) {
165            backendClass = JDBCBackend.class;
166        }
167        try {
168            RepositoryBackend backend = backendClass.newInstance();
169            return backend;
170        } catch (ReflectiveOperationException e) {
171            throw new NuxeoException(e);
172        }
173    }
174
175    protected Mapper createCachingMapper(Model model, Mapper mapper) {
176        try {
177            Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
178            if (cachingMapperClass == null) {
179                return mapper;
180            }
181            CachingMapper cachingMapper = cachingMapperClass.newInstance();
182            cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator,
183                    repositoryDescriptor.cachingMapperProperties);
184            return cachingMapper;
185        } catch (ReflectiveOperationException e) {
186            throw new NuxeoException(e);
187        }
188    }
189
190    protected Class<? extends CachingMapper> getCachingMapperClass() {
191        if (!repositoryDescriptor.getCachingMapperEnabled()) {
192            return null;
193        }
194        Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass;
195        if (cachingMapperClass == null) {
196            // default cache
197            cachingMapperClass = SoftRefCachingMapper.class;
198        }
199        return cachingMapperClass;
200    }
201
202    public RepositoryDescriptor getRepositoryDescriptor() {
203        return repositoryDescriptor;
204    }
205
206    public LockManager getLockManager() {
207        return lockManager;
208    }
209
210    public Model getModel() {
211        return model;
212    }
213
214    public InvalidationsPropagator getInvalidationsPropagator() {
215        return invalidationsPropagator;
216    }
217
218    public Class<? extends FulltextParser> getFulltextParserClass() {
219        return fulltextParserClass;
220    }
221
222    public boolean isChangeTokenEnabled() {
223        return repositoryDescriptor.isChangeTokenEnabled();
224    }
225
226    /*
227     * ----- javax.resource.cci.ConnectionFactory -----
228     */
229
230    /**
231     * Gets a new connection.
232     *
233     * @param connectionSpec the parameters to use to connect (unused)
234     * @return the session
235     */
236    @Override
237    public SessionImpl getConnection(ConnectionSpec connectionSpec) {
238        return getConnection();
239    }
240
241    /**
242     * Gets a new connection.
243     *
244     * @return the session
245     */
246    @Override
247    public synchronized SessionImpl getConnection() {
248        if (Framework.getRuntime().isShuttingDown()) {
249            throw new IllegalStateException("Cannot open connection, runtime is shutting down");
250        }
251        SessionPathResolver pathResolver = new SessionPathResolver();
252        Mapper mapper = newMapper(pathResolver, true);
253        SessionImpl session = newSession(model, mapper);
254        pathResolver.setSession(session);
255        sessions.add(session);
256        sessionCount.inc();
257        return session;
258    }
259
260    /**
261     * Creates a new mapper.
262     *
263     * @param pathResolver the path resolver (for regular mappers)
264     * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager /
265     *            cluster invalidator)
266     * @return the new mapper.
267     * @since 7.4
268     */
269    public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) {
270        return backend.newMapper(pathResolver, useInvalidations);
271    }
272
273    protected void initRepository() {
274        log.debug("Initializing");
275        backend = createBackend();
276        model = backend.initialize(this);
277        initLockManager();
278
279        // create the cluster invalidator
280        if (repositoryDescriptor.getClusteringEnabled()) {
281            initClusterInvalidator();
282        }
283
284        // log once which mapper cache is being used
285        Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
286        if (cachingMapperClass == null) {
287            log.warn("VCS Mapper cache is disabled.");
288        } else {
289            log.info("VCS Mapper cache using: " + cachingMapperClass.getName());
290        }
291
292        initRootNode();
293    }
294
295    protected void initRootNode() {
296        try {
297            // access a session once so that SessionImpl.computeRootNode can create the root node
298            getConnection().close();
299        } catch (ResourceException e) {
300            throw new RuntimeException(e);
301        }
302    }
303
304    protected String getLockManagerName() {
305        // TODO configure in repo descriptor
306        return getName();
307    }
308
309    protected void initLockManager() {
310        String lockManagerName = getLockManagerName();
311        LockManagerService lockManagerService = Framework.getService(LockManagerService.class);
312        lockManager = lockManagerService.getLockManager(lockManagerName);
313        if (lockManager == null) {
314            // no descriptor
315            // default to a VCSLockManager
316            lockManager = new VCSLockManager(this);
317            lockManagerService.registerLockManager(lockManagerName, lockManager);
318            selfRegisteredLockManager = true;
319        } else {
320            selfRegisteredLockManager = false;
321        }
322        log.info("Repository " + getName() + " using lock manager " + lockManager);
323    }
324
325    protected void initClusterInvalidator() {
326        String nodeId = repositoryDescriptor.getClusterNodeId();
327        if (StringUtils.isBlank(nodeId)) {
328            // need a smallish int because of SQL Server legacy node ids
329            nodeId = String.valueOf(RANDOM.nextInt(32768));
330            log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). "
331                    + "Using random cluster node id instead: " + nodeId);
332        } else {
333            nodeId = nodeId.trim();
334        }
335        ClusterInvalidator clusterInvalidator = createClusterInvalidator();
336        clusterInvalidator.initialize(nodeId, this);
337        backend.setClusterInvalidator(clusterInvalidator);
338    }
339
340    protected ClusterInvalidator createClusterInvalidator() {
341        Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass;
342        if (klass == null) {
343            klass = JDBCClusterInvalidator.class;
344        }
345        try {
346            return klass.newInstance();
347        } catch (ReflectiveOperationException e) {
348            throw new NuxeoException(e);
349        }
350    }
351
352    protected SessionImpl newSession(Model model, Mapper mapper) {
353        mapper = createCachingMapper(model, mapper);
354        return new SessionImpl(this, model, mapper);
355    }
356
357    public static class SessionPathResolver implements PathResolver {
358
359        private Session session;
360
361        protected void setSession(Session session) {
362            this.session = session;
363        }
364
365        @Override
366        public Serializable getIdForPath(String path) {
367            Node node = session.getNodeByPath(path, null);
368            return node == null ? null : node.getId();
369        }
370    }
371
372    /*
373     * -----
374     */
375
376    @Override
377    public ResourceAdapterMetaData getMetaData() {
378        throw new UnsupportedOperationException();
379    }
380
381    @Override
382    public RecordFactory getRecordFactory() {
383        throw new UnsupportedOperationException();
384    }
385
386    /*
387     * ----- javax.resource.Referenceable -----
388     */
389
390    private Reference reference;
391
392    @Override
393    public void setReference(Reference reference) {
394        this.reference = reference;
395    }
396
397    @Override
398    public Reference getReference() {
399        return reference;
400    }
401
402    /*
403     * ----- Repository -----
404     */
405
406    @Override
407    public synchronized void close() {
408        closeAllSessions();
409        model = null;
410        backend.shutdown();
411
412        registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size"));
413        registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size"));
414        registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size"));
415
416        if (selfRegisteredLockManager) {
417            LockManagerService lms = Framework.getService(LockManagerService.class);
418            if (lms != null) {
419                lms.unregisterLockManager(getLockManagerName());
420            }
421        }
422    }
423
424    protected synchronized void closeAllSessions() {
425        for (SessionImpl session : sessions) {
426            if (!session.isLive()) {
427                continue;
428            }
429            session.closeSession();
430        }
431        sessions.clear();
432        sessionCount.dec(sessionCount.getCount());
433        if (lockManager != null) {
434            lockManager.closeLockManager();
435        }
436    }
437
438    /*
439     * ----- RepositoryManagement -----
440     */
441
442    @Override
443    public String getName() {
444        return repositoryDescriptor.name;
445    }
446
447    @Override
448    public int getActiveSessionsCount() {
449        return sessions.size();
450    }
451
452    @Override
453    public int clearCaches() {
454        int n = 0;
455        for (SessionImpl session : sessions) {
456            n += session.clearCaches();
457        }
458        if (lockManager != null) {
459            lockManager.clearLockManagerCaches();
460        }
461        return n;
462    }
463
464    @Override
465    public long getCacheSize() {
466        long size = 0;
467        for (SessionImpl session : sessions) {
468            size += session.getCacheSize();
469        }
470        return size;
471    }
472
473    public long getCacheMapperSize() {
474        long size = 0;
475        for (SessionImpl session : sessions) {
476            size += session.getCacheMapperSize();
477        }
478        return size;
479    }
480
481    @Override
482    public long getCachePristineSize() {
483        long size = 0;
484        for (SessionImpl session : sessions) {
485            size += session.getCachePristineSize();
486        }
487        return size;
488    }
489
490    @Override
491    public long getCacheSelectionSize() {
492        long size = 0;
493        for (SessionImpl session : sessions) {
494            size += session.getCacheSelectionSize();
495        }
496        return size;
497    }
498
499    @Override
500    public void processClusterInvalidationsNext() {
501        // TODO pass through or something
502    }
503
504    @Override
505    public void markReferencedBinaries() {
506        try {
507            SessionImpl conn = getConnection();
508            try {
509                conn.markReferencedBinaries();
510            } finally {
511                conn.close();
512            }
513        } catch (ResourceException e) {
514            throw new RuntimeException(e);
515        }
516    }
517
518    @Override
519    public int cleanupDeletedDocuments(int max, Calendar beforeTime) {
520        if (!repositoryDescriptor.getSoftDeleteEnabled()) {
521            return 0;
522        }
523        try {
524            SessionImpl conn = getConnection();
525            try {
526                return conn.cleanupDeletedDocuments(max, beforeTime);
527            } finally {
528                conn.close();
529            }
530        } catch (ResourceException e) {
531            throw new RuntimeException(e);
532        }
533    }
534
535    /*
536     * ----- -----
537     */
538
539    // callback by session at close time
540    protected void closeSession(SessionImpl session) {
541        sessions.remove(session);
542        sessionCount.dec();
543    }
544
545}