001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019
020package org.nuxeo.ecm.core.storage.sql;
021
022import java.io.Serializable;
023import java.util.Calendar;
024import java.util.Collection;
025import java.util.Random;
026import java.util.concurrent.CopyOnWriteArrayList;
027
028import javax.naming.Reference;
029import javax.resource.ResourceException;
030import javax.resource.cci.ConnectionSpec;
031import javax.resource.cci.RecordFactory;
032import javax.resource.cci.ResourceAdapterMetaData;
033
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.ecm.core.model.LockManager;
039import org.nuxeo.ecm.core.storage.DefaultFulltextParser;
040import org.nuxeo.ecm.core.storage.FulltextParser;
041import org.nuxeo.ecm.core.storage.lock.LockManagerService;
042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend;
044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.metrics.MetricsService;
047
048import com.codahale.metrics.Counter;
049import com.codahale.metrics.Gauge;
050import com.codahale.metrics.MetricRegistry;
051import com.codahale.metrics.SharedMetricRegistries;
052
053/**
054 * {@link Repository} implementation, to be extended by backend-specific initialization code.
055 *
056 * @see RepositoryBackend
057 */
058public class RepositoryImpl implements Repository {
059
060    private static final long serialVersionUID = 1L;
061
062    private static final Log log = LogFactory.getLog(RepositoryImpl.class);
063
064    private static final Random RANDOM = new Random();
065
066    protected final RepositoryDescriptor repositoryDescriptor;
067
068    protected final Class<? extends FulltextParser> fulltextParserClass;
069
070    private final RepositoryBackend backend;
071
072    private final Collection<SessionImpl> sessions;
073
074    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
075
076    protected final Counter repositoryUp;
077
078    protected final Counter sessionCount;
079
080    private LockManager lockManager;
081
082    /**
083     * @since 7.4 : used to know if the LockManager was provided by this repository or externally
084     */
085    protected boolean selfRegisteredLockManager = false;
086
087    /** Propagator of invalidations to all mappers' caches. */
088    protected final InvalidationsPropagator invalidationsPropagator;
089
090    private Model model;
091
092    /**
093     * Transient id for this repository assigned by the server on first connection. This is not persisted.
094     */
095    public String repositoryId;
096
097    public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) {
098        this.repositoryDescriptor = repositoryDescriptor;
099        sessions = new CopyOnWriteArrayList<SessionImpl>();
100        invalidationsPropagator = new InvalidationsPropagator();
101
102        String className = repositoryDescriptor.getFulltextDescriptor().getFulltextParser();
103        if (StringUtils.isBlank(className)) {
104            className = DefaultFulltextParser.class.getName();
105        }
106        Class<?> klass;
107        try {
108            klass = Thread.currentThread().getContextClassLoader().loadClass(className);
109        } catch (ClassNotFoundException e) {
110            throw new NuxeoException("Unknown fulltext parser class: " + className, e);
111        }
112        if (!FulltextParser.class.isAssignableFrom(klass)) {
113            throw new NuxeoException("Invalid fulltext parser class: " + className);
114        }
115        fulltextParserClass = (Class<? extends FulltextParser>) klass;
116
117        backend = createBackend();
118        repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
119                "instance-up"));
120        repositoryUp.inc();
121        sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
122                "sessions"));
123        createMetricsGauges();
124    }
125
126    protected void createMetricsGauges() {
127        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size");
128        registry.remove(gaugeName);
129        registry.register(gaugeName, new Gauge<Long>() {
130            @Override
131            public Long getValue() {
132                return getCacheSize();
133            }
134        });
135        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines");
136        registry.remove(gaugeName);
137        registry.register(gaugeName, new Gauge<Long>() {
138            @Override
139            public Long getValue() {
140                return getCachePristineSize();
141            }
142        });
143        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections");
144        registry.remove(gaugeName);
145        registry.register(gaugeName, new Gauge<Long>() {
146            @Override
147            public Long getValue() {
148                return getCacheSelectionSize();
149            }
150        });
151        gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers");
152        registry.remove(gaugeName);
153        registry.register(gaugeName, new Gauge<Long>() {
154            @Override
155            public Long getValue() {
156                return getCacheMapperSize();
157            }
158        });
159    }
160
161    protected RepositoryBackend createBackend() {
162        Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass;
163        if (backendClass == null) {
164            backendClass = JDBCBackend.class;
165        }
166        try {
167            RepositoryBackend backend = backendClass.newInstance();
168            backend.initialize(this);
169            return backend;
170        } catch (ReflectiveOperationException e) {
171            throw new NuxeoException(e);
172        }
173    }
174
175    protected Mapper createCachingMapper(Model model, Mapper mapper) {
176        try {
177            Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
178            if (cachingMapperClass == null) {
179                return mapper;
180            }
181            CachingMapper cachingMapper = cachingMapperClass.newInstance();
182            cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator,
183                    repositoryDescriptor.cachingMapperProperties);
184            return cachingMapper;
185        } catch (ReflectiveOperationException e) {
186            throw new NuxeoException(e);
187        }
188    }
189
190    protected Class<? extends CachingMapper> getCachingMapperClass() {
191        if (!repositoryDescriptor.getCachingMapperEnabled()) {
192            return null;
193        }
194        Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass;
195        if (cachingMapperClass == null) {
196            // default cache
197            cachingMapperClass = SoftRefCachingMapper.class;
198        }
199        return cachingMapperClass;
200    }
201
202    public RepositoryDescriptor getRepositoryDescriptor() {
203        return repositoryDescriptor;
204    }
205
206    public LockManager getLockManager() {
207        return lockManager;
208    }
209
210    public Model getModel() {
211        return model;
212    }
213
214    public InvalidationsPropagator getInvalidationsPropagator() {
215        return invalidationsPropagator;
216    }
217
218    public Class<? extends FulltextParser> getFulltextParserClass() {
219        return fulltextParserClass;
220    }
221
222    public boolean isChangeTokenEnabled() {
223        return repositoryDescriptor.isChangeTokenEnabled();
224    }
225
226    /*
227     * ----- javax.resource.cci.ConnectionFactory -----
228     */
229
230    /**
231     * Gets a new connection.
232     *
233     * @param connectionSpec the parameters to use to connect (unused)
234     * @return the session
235     */
236    @Override
237    public SessionImpl getConnection(ConnectionSpec connectionSpec) {
238        return getConnection();
239    }
240
241    /**
242     * Gets a new connection.
243     *
244     * @return the session
245     */
246    @Override
247    public synchronized SessionImpl getConnection() {
248        if (Framework.getRuntime().isShuttingDown()) {
249            throw new IllegalStateException("Cannot open connection, runtime is shutting down");
250        }
251        if (model == null) {
252            initRepository();
253        }
254        SessionPathResolver pathResolver = new SessionPathResolver();
255        Mapper mapper = newMapper(pathResolver, true);
256        SessionImpl session = newSession(model, mapper);
257        pathResolver.setSession(session);
258        sessions.add(session);
259        sessionCount.inc();
260        return session;
261    }
262
263    /**
264     * Creates a new mapper.
265     *
266     * @param pathResolver the path resolver (for regular mappers)
267     * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager /
268     *            cluster invalidator)
269     * @return the new mapper.
270     * @since 7.4
271     */
272    public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) {
273        return backend.newMapper(model, pathResolver, useInvalidations);
274    }
275
276    protected void initRepository() {
277        log.debug("Initializing");
278        ModelSetup modelSetup = new ModelSetup();
279        modelSetup.repositoryDescriptor = repositoryDescriptor;
280        backend.initializeModelSetup(modelSetup);
281        model = new Model(modelSetup);
282        backend.initializeModel(model);
283        initLockManager();
284
285        // create the cluster invalidator
286        if (repositoryDescriptor.getClusteringEnabled()) {
287            initClusterInvalidator();
288        }
289
290        // log once which mapper cache is being used
291        Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
292        if (cachingMapperClass == null) {
293            log.warn("VCS Mapper cache is disabled.");
294        } else {
295            log.info("VCS Mapper cache using: " + cachingMapperClass.getName());
296        }
297    }
298
299    protected String getLockManagerName() {
300        // TODO configure in repo descriptor
301        return getName();
302    }
303
304    protected void initLockManager() {
305        String lockManagerName = getLockManagerName();
306        LockManagerService lockManagerService = Framework.getService(LockManagerService.class);
307        lockManager = lockManagerService.getLockManager(lockManagerName);
308        if (lockManager == null) {
309            // no descriptor
310            // default to a VCSLockManager
311            lockManager = new VCSLockManager(lockManagerName);
312            lockManagerService.registerLockManager(lockManagerName, lockManager);
313            selfRegisteredLockManager = true;
314        } else {
315            selfRegisteredLockManager = false;
316        }
317        log.info("Repository " + getName() + " using lock manager " + lockManager);
318    }
319
320    protected void initClusterInvalidator() {
321        String nodeId = repositoryDescriptor.getClusterNodeId();
322        if (StringUtils.isBlank(nodeId)) {
323            // need a smallish int because of SQL Server legacy node ids
324            nodeId = String.valueOf(RANDOM.nextInt(32768));
325            log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). "
326                    + "Using random cluster node id instead: " + nodeId);
327        } else {
328            nodeId = nodeId.trim();
329        }
330        ClusterInvalidator clusterInvalidator = createClusterInvalidator();
331        clusterInvalidator.initialize(nodeId, this);
332        backend.setClusterInvalidator(clusterInvalidator);
333    }
334
335    protected ClusterInvalidator createClusterInvalidator() {
336        Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass;
337        if (klass == null) {
338            klass = JDBCClusterInvalidator.class;
339        }
340        try {
341            return klass.newInstance();
342        } catch (ReflectiveOperationException e) {
343            throw new NuxeoException(e);
344        }
345    }
346
347    protected SessionImpl newSession(Model model, Mapper mapper) {
348        mapper = createCachingMapper(model, mapper);
349        return new SessionImpl(this, model, mapper);
350    }
351
352    public static class SessionPathResolver implements PathResolver {
353
354        private Session session;
355
356        protected void setSession(Session session) {
357            this.session = session;
358        }
359
360        @Override
361        public Serializable getIdForPath(String path) {
362            Node node = session.getNodeByPath(path, null);
363            return node == null ? null : node.getId();
364        }
365    }
366
367    /*
368     * -----
369     */
370
371    @Override
372    public ResourceAdapterMetaData getMetaData() {
373        throw new UnsupportedOperationException();
374    }
375
376    @Override
377    public RecordFactory getRecordFactory() {
378        throw new UnsupportedOperationException();
379    }
380
381    /*
382     * ----- javax.resource.Referenceable -----
383     */
384
385    private Reference reference;
386
387    @Override
388    public void setReference(Reference reference) {
389        this.reference = reference;
390    }
391
392    @Override
393    public Reference getReference() {
394        return reference;
395    }
396
397    /*
398     * ----- Repository -----
399     */
400
401    @Override
402    public synchronized void close() {
403        closeAllSessions();
404        model = null;
405        backend.shutdown();
406
407        registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size"));
408        registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size"));
409        registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size"));
410
411        if (selfRegisteredLockManager) {
412            LockManagerService lms = Framework.getService(LockManagerService.class);
413            if (lms != null) {
414                lms.unregisterLockManager(getLockManagerName());
415            }
416        }
417    }
418
419    protected synchronized void closeAllSessions() {
420        for (SessionImpl session : sessions) {
421            if (!session.isLive()) {
422                continue;
423            }
424            session.closeSession();
425        }
426        sessions.clear();
427        sessionCount.dec(sessionCount.getCount());
428        if (lockManager != null) {
429            lockManager.closeLockManager();
430        }
431    }
432
433    /*
434     * ----- RepositoryManagement -----
435     */
436
437    @Override
438    public String getName() {
439        return repositoryDescriptor.name;
440    }
441
442    @Override
443    public int getActiveSessionsCount() {
444        return sessions.size();
445    }
446
447    @Override
448    public int clearCaches() {
449        int n = 0;
450        for (SessionImpl session : sessions) {
451            n += session.clearCaches();
452        }
453        if (lockManager != null) {
454            lockManager.clearLockManagerCaches();
455        }
456        return n;
457    }
458
459    @Override
460    public long getCacheSize() {
461        long size = 0;
462        for (SessionImpl session : sessions) {
463            size += session.getCacheSize();
464        }
465        return size;
466    }
467
468    public long getCacheMapperSize() {
469        long size = 0;
470        for (SessionImpl session : sessions) {
471            size += session.getCacheMapperSize();
472        }
473        return size;
474    }
475
476    @Override
477    public long getCachePristineSize() {
478        long size = 0;
479        for (SessionImpl session : sessions) {
480            size += session.getCachePristineSize();
481        }
482        return size;
483    }
484
485    @Override
486    public long getCacheSelectionSize() {
487        long size = 0;
488        for (SessionImpl session : sessions) {
489            size += session.getCacheSelectionSize();
490        }
491        return size;
492    }
493
494    @Override
495    public void processClusterInvalidationsNext() {
496        // TODO pass through or something
497    }
498
499    @Override
500    public void markReferencedBinaries() {
501        try {
502            SessionImpl conn = getConnection();
503            try {
504                conn.markReferencedBinaries();
505            } finally {
506                conn.close();
507            }
508        } catch (ResourceException e) {
509            throw new RuntimeException(e);
510        }
511    }
512
513    @Override
514    public int cleanupDeletedDocuments(int max, Calendar beforeTime) {
515        if (!repositoryDescriptor.getSoftDeleteEnabled()) {
516            return 0;
517        }
518        try {
519            SessionImpl conn = getConnection();
520            try {
521                return conn.cleanupDeletedDocuments(max, beforeTime);
522            } finally {
523                conn.close();
524            }
525        } catch (ResourceException e) {
526            throw new RuntimeException(e);
527        }
528    }
529
530    /*
531     * ----- -----
532     */
533
534    // callback by session at close time
535    protected void closeSession(SessionImpl session) {
536        sessions.remove(session);
537        sessionCount.dec();
538    }
539
540}