001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019
020package org.nuxeo.ecm.core.storage.sql;
021
022import java.io.Serializable;
023import java.util.Calendar;
024import java.util.Collection;
025import java.util.Random;
026import java.util.concurrent.CopyOnWriteArrayList;
027
028import javax.naming.Reference;
029import javax.resource.ResourceException;
030import javax.resource.cci.ConnectionSpec;
031import javax.resource.cci.RecordFactory;
032import javax.resource.cci.ResourceAdapterMetaData;
033
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.ecm.core.model.LockManager;
039import org.nuxeo.ecm.core.storage.DefaultFulltextParser;
040import org.nuxeo.ecm.core.storage.FulltextParser;
041import org.nuxeo.ecm.core.storage.lock.LockManagerService;
042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend;
044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.metrics.MetricsService;
047
048import com.codahale.metrics.Counter;
049import com.codahale.metrics.Gauge;
050import com.codahale.metrics.MetricRegistry;
051import com.codahale.metrics.SharedMetricRegistries;
052
053/**
054 * {@link Repository} implementation, to be extended by backend-specific initialization code.
055 *
056 * @see RepositoryBackend
057 */
058public class RepositoryImpl implements Repository {
059
060    private static final long serialVersionUID = 1L;
061
062    private static final Log log = LogFactory.getLog(RepositoryImpl.class);
063
064    private static final Random RANDOM = new Random();
065
066    protected final RepositoryDescriptor repositoryDescriptor;
067
068    protected final Class<? extends FulltextParser> fulltextParserClass;
069
070    private final RepositoryBackend backend;
071
072    private final Collection<SessionImpl> sessions;
073
074    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
075
076    protected final Counter repositoryUp;
077
078    protected final Counter sessionCount;
079
080    private LockManager lockManager;
081
082    /**
083     * @since 7.4 : used to know if the LockManager was provided by this repository or externally
084     */
085    protected boolean selfRegisteredLockManager = false;
086
087    /** Propagator of invalidations to all mappers' caches. */
088    protected final InvalidationsPropagator invalidationsPropagator;
089
090    private Model model;
091
092    /**
093     * Transient id for this repository assigned by the server on first connection. This is not persisted.
094     */
095    public String repositoryId;
096
097    public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) {
098        this.repositoryDescriptor = repositoryDescriptor;
099        sessions = new CopyOnWriteArrayList<SessionImpl>();
100        invalidationsPropagator = new InvalidationsPropagator();
101
102        String className = repositoryDescriptor.getFulltextDescriptor().getFulltextParser();
103        if (StringUtils.isBlank(className)) {
104            className = DefaultFulltextParser.class.getName();
105        }
106        Class<?> klass;
107        try {
108            klass = Thread.currentThread().getContextClassLoader().loadClass(className);
109        } catch (ClassNotFoundException e) {
110            throw new NuxeoException("Unknown fulltext parser class: " + className, e);
111        }
112        if (!FulltextParser.class.isAssignableFrom(klass)) {
113            throw new NuxeoException("Invalid fulltext parser class: " + className);
114        }
115        fulltextParserClass = (Class<? extends FulltextParser>) klass;
116
117        backend = createBackend();
118        repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
119                "instance-up"));
120        repositoryUp.inc();
121        sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
122                "sessions"));
123        createMetricsGauges();
124    }
125
126    protected void createMetricsGauges() {
127        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size");
128        registry.remove(gaugeName);
129        registry.register(gaugeName, new Gauge<Long>() {
130            @Override
131            public Long getValue() {
132                return getCacheSize();
133            }
134        });
135        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines");
136        registry.remove(gaugeName);
137        registry.register(gaugeName, new Gauge<Long>() {
138            @Override
139            public Long getValue() {
140                return getCachePristineSize();
141            }
142        });
143        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections");
144        registry.remove(gaugeName);
145        registry.register(gaugeName, new Gauge<Long>() {
146            @Override
147            public Long getValue() {
148                return getCacheSelectionSize();
149            }
150        });
151        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers");
152        registry.remove(gaugeName);
153        registry.register(gaugeName, new Gauge<Long>() {
154            @Override
155            public Long getValue() {
156                return getCacheMapperSize();
157            }
158        });
159    }
160
161    protected RepositoryBackend createBackend() {
162        Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass;
163        if (backendClass == null) {
164            backendClass = JDBCBackend.class;
165        }
166        try {
167            RepositoryBackend backend = backendClass.newInstance();
168            backend.initialize(this);
169            return backend;
170        } catch (ReflectiveOperationException e) {
171            throw new NuxeoException(e);
172        }
173    }
174
175    protected Mapper createCachingMapper(Model model, Mapper mapper) {
176        try {
177            Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
178            if (cachingMapperClass == null) {
179                return mapper;
180            }
181            CachingMapper cachingMapper = cachingMapperClass.newInstance();
182            cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator,
183                    repositoryDescriptor.cachingMapperProperties);
184            return cachingMapper;
185        } catch (ReflectiveOperationException e) {
186            throw new NuxeoException(e);
187        }
188    }
189
190    protected Class<? extends CachingMapper> getCachingMapperClass() {
191        if (!repositoryDescriptor.getCachingMapperEnabled()) {
192            return null;
193        }
194        Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass;
195        if (cachingMapperClass == null) {
196            // default cache
197            cachingMapperClass = SoftRefCachingMapper.class;
198        }
199        return cachingMapperClass;
200    }
201
202    public RepositoryDescriptor getRepositoryDescriptor() {
203        return repositoryDescriptor;
204    }
205
206    public LockManager getLockManager() {
207        return lockManager;
208    }
209
210    public Model getModel() {
211        return model;
212    }
213
214    public InvalidationsPropagator getInvalidationsPropagator() {
215        return invalidationsPropagator;
216    }
217
218    public Class<? extends FulltextParser> getFulltextParserClass() {
219        return fulltextParserClass;
220    }
221
222    /*
223     * ----- javax.resource.cci.ConnectionFactory -----
224     */
225
226    /**
227     * Gets a new connection.
228     *
229     * @param connectionSpec the parameters to use to connect (unused)
230     * @return the session
231     */
232    @Override
233    public SessionImpl getConnection(ConnectionSpec connectionSpec) {
234        return getConnection();
235    }
236
237    /**
238     * Gets a new connection.
239     *
240     * @return the session
241     */
242    @Override
243    public synchronized SessionImpl getConnection() {
244        if (Framework.getRuntime().isShuttingDown()) {
245            throw new IllegalStateException("Cannot open connection, runtime is shutting down");
246        }
247        if (model == null) {
248            initRepository();
249        }
250        SessionPathResolver pathResolver = new SessionPathResolver();
251        Mapper mapper = newMapper(pathResolver, true);
252        SessionImpl session = newSession(model, mapper);
253        pathResolver.setSession(session);
254        sessions.add(session);
255        sessionCount.inc();
256        return session;
257    }
258
259    /**
260     * Creates a new mapper.
261     *
262     * @param pathResolver the path resolver (for regular mappers)
263     * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager /
264     *            cluster invalidator)
265     * @return the new mapper.
266     * @since 7.4
267     */
268    public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) {
269        return backend.newMapper(model, pathResolver, useInvalidations);
270    }
271
272    protected void initRepository() {
273        log.debug("Initializing");
274        ModelSetup modelSetup = new ModelSetup();
275        modelSetup.repositoryDescriptor = repositoryDescriptor;
276        backend.initializeModelSetup(modelSetup);
277        model = new Model(modelSetup);
278        backend.initializeModel(model);
279        initLockManager();
280
281        // create the cluster invalidator
282        if (repositoryDescriptor.getClusteringEnabled()) {
283            initClusterInvalidator();
284        }
285
286        // log once which mapper cache is being used
287        Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
288        if (cachingMapperClass == null) {
289            log.warn("VCS Mapper cache is disabled.");
290        } else {
291            log.info("VCS Mapper cache using: " + cachingMapperClass.getName());
292        }
293    }
294
295    protected String getLockManagerName() {
296        // TODO configure in repo descriptor
297        return getName();
298    }
299
300    protected void initLockManager() {
301        String lockManagerName = getLockManagerName();
302        LockManagerService lockManagerService = Framework.getService(LockManagerService.class);
303        lockManager = lockManagerService.getLockManager(lockManagerName);
304        if (lockManager == null) {
305            // no descriptor
306            // default to a VCSLockManager
307            lockManager = new VCSLockManager(lockManagerName);
308            lockManagerService.registerLockManager(lockManagerName, lockManager);
309            selfRegisteredLockManager = true;
310        } else {
311            selfRegisteredLockManager = false;
312        }
313        log.info("Repository " + getName() + " using lock manager " + lockManager);
314    }
315
316    protected void initClusterInvalidator() {
317        String nodeId = repositoryDescriptor.getClusterNodeId();
318        if (StringUtils.isBlank(nodeId)) {
319            // need a smallish int because of SQL Server legacy node ids
320            nodeId = String.valueOf(RANDOM.nextInt(32768));
321            log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). "
322                    + "Using random cluster node id instead: " + nodeId);
323        } else {
324            nodeId = nodeId.trim();
325        }
326        ClusterInvalidator clusterInvalidator = createClusterInvalidator();
327        clusterInvalidator.initialize(nodeId, this);
328        backend.setClusterInvalidator(clusterInvalidator);
329    }
330
331    protected ClusterInvalidator createClusterInvalidator() {
332        Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass;
333        if (klass == null) {
334            klass = JDBCClusterInvalidator.class;
335        }
336        try {
337            return klass.newInstance();
338        } catch (ReflectiveOperationException e) {
339            throw new NuxeoException(e);
340        }
341    }
342
343    protected SessionImpl newSession(Model model, Mapper mapper) {
344        mapper = createCachingMapper(model, mapper);
345        return new SessionImpl(this, model, mapper);
346    }
347
348    public static class SessionPathResolver implements PathResolver {
349
350        private Session session;
351
352        protected void setSession(Session session) {
353            this.session = session;
354        }
355
356        @Override
357        public Serializable getIdForPath(String path) {
358            Node node = session.getNodeByPath(path, null);
359            return node == null ? null : node.getId();
360        }
361    }
362
363    /*
364     * -----
365     */
366
367    @Override
368    public ResourceAdapterMetaData getMetaData() {
369        throw new UnsupportedOperationException();
370    }
371
372    @Override
373    public RecordFactory getRecordFactory() {
374        throw new UnsupportedOperationException();
375    }
376
377    /*
378     * ----- javax.resource.Referenceable -----
379     */
380
381    private Reference reference;
382
383    @Override
384    public void setReference(Reference reference) {
385        this.reference = reference;
386    }
387
388    @Override
389    public Reference getReference() {
390        return reference;
391    }
392
393    /*
394     * ----- Repository -----
395     */
396
397    @Override
398    public synchronized void close() {
399        closeAllSessions();
400        model = null;
401        backend.shutdown();
402
403        registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size"));
404        registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size"));
405        registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size"));
406
407        if (selfRegisteredLockManager) {
408            LockManagerService lms = Framework.getService(LockManagerService.class);
409            if (lms != null) {
410                lms.unregisterLockManager(getLockManagerName());
411            }
412        }
413    }
414
415    protected synchronized void closeAllSessions() {
416        for (SessionImpl session : sessions) {
417            if (!session.isLive()) {
418                continue;
419            }
420            session.closeSession();
421        }
422        sessions.clear();
423        sessionCount.dec(sessionCount.getCount());
424        if (lockManager != null) {
425            lockManager.closeLockManager();
426        }
427    }
428
429    /*
430     * ----- RepositoryManagement -----
431     */
432
433    @Override
434    public String getName() {
435        return repositoryDescriptor.name;
436    }
437
438    @Override
439    public int getActiveSessionsCount() {
440        return sessions.size();
441    }
442
443    @Override
444    public int clearCaches() {
445        int n = 0;
446        for (SessionImpl session : sessions) {
447            n += session.clearCaches();
448        }
449        if (lockManager != null) {
450            lockManager.clearLockManagerCaches();
451        }
452        return n;
453    }
454
455    @Override
456    public long getCacheSize() {
457        long size = 0;
458        for (SessionImpl session : sessions) {
459            size += session.getCacheSize();
460        }
461        return size;
462    }
463
464    public long getCacheMapperSize() {
465        long size = 0;
466        for (SessionImpl session : sessions) {
467            size += session.getCacheMapperSize();
468        }
469        return size;
470    }
471
472    @Override
473    public long getCachePristineSize() {
474        long size = 0;
475        for (SessionImpl session : sessions) {
476            size += session.getCachePristineSize();
477        }
478        return size;
479    }
480
481    @Override
482    public long getCacheSelectionSize() {
483        long size = 0;
484        for (SessionImpl session : sessions) {
485            size += session.getCacheSelectionSize();
486        }
487        return size;
488    }
489
490    @Override
491    public void processClusterInvalidationsNext() {
492        // TODO pass through or something
493    }
494
495    @Override
496    public void markReferencedBinaries() {
497        try {
498            SessionImpl conn = getConnection();
499            try {
500                conn.markReferencedBinaries();
501            } finally {
502                conn.close();
503            }
504        } catch (ResourceException e) {
505            throw new RuntimeException(e);
506        }
507    }
508
509    @Override
510    public int cleanupDeletedDocuments(int max, Calendar beforeTime) {
511        if (!repositoryDescriptor.getSoftDeleteEnabled()) {
512            return 0;
513        }
514        try {
515            SessionImpl conn = getConnection();
516            try {
517                return conn.cleanupDeletedDocuments(max, beforeTime);
518            } finally {
519                conn.close();
520            }
521        } catch (ResourceException e) {
522            throw new RuntimeException(e);
523        }
524    }
525
526    /*
527     * ----- -----
528     */
529
530    // callback by session at close time
531    protected void closeSession(SessionImpl session) {
532        sessions.remove(session);
533        sessionCount.dec();
534    }
535
536}