001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
022import java.io.Serializable;
023import java.sql.Types;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
028import org.nuxeo.ecm.core.api.NuxeoException;
029import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
030import org.nuxeo.ecm.core.storage.sql.Invalidations;
031import org.nuxeo.ecm.core.storage.sql.Mapper;
032import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
033import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
034
035/**
036 * Implementation of {@link ClusterInvalidator} that uses the JDBC Mapper to read/write invalidations.
037 */
038public class JDBCClusterInvalidator implements ClusterInvalidator {
039
040    private static final Log log = LogFactory.getLog(JDBCClusterInvalidator.class);
041
042    /** Cluster node id. */
043    private Serializable nodeId;
044
045    /** Cluster node mapper. Used synchronized. */
046    private Mapper mapper;
047
048    private long clusteringDelay;
049
050    // modified only under clusterMapper synchronization
051    private long clusterNodeLastInvalidationTimeMillis;
052
053    @Override
054    public void initialize(String nodeId, RepositoryImpl repository) {
055        RepositoryDescriptor repositoryDescriptor = repository.getRepositoryDescriptor();
056        clusteringDelay = repositoryDescriptor.getClusteringDelay();
057        processClusterInvalidationsNext();
058        // create mapper
059        mapper = repository.newMapper(null, false);
060        Serializable nodeIdSer;
061        if (mapper.getClusterNodeIdType() == Types.VARCHAR) { // sql type
062            nodeIdSer = nodeId;
063        } else {
064            try {
065                nodeIdSer = Long.valueOf(nodeId);
066            } catch (NumberFormatException e) {
067                throw new NuxeoException("Cluster node id must be an integer", e);
068            }
069        }
070        this.nodeId = nodeIdSer;
071        try {
072            mapper.createClusterNode(nodeIdSer);
073        } catch (ConcurrentUpdateException e) {
074            e.addInfo("Failed to initialize clustering for repository: " + repository.getName());
075            throw e;
076        }
077        log.info("Clustering enabled for repository: " + repository.getName() + " with " + clusteringDelay
078                + " ms delay " + " and cluster node id: " + nodeId);
079    }
080
081    @Override
082    public void close() {
083        synchronized (mapper) {
084            mapper.removeClusterNode(nodeId);
085            mapper.close();
086        }
087    }
088
089    // TODO should be called by RepositoryManagement
090    protected void processClusterInvalidationsNext() {
091        clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis() - clusteringDelay - 1;
092    }
093
094    @Override
095    public Invalidations receiveInvalidations() {
096        synchronized (mapper) {
097            long remaining = clusterNodeLastInvalidationTimeMillis + clusteringDelay - System.currentTimeMillis();
098            if (remaining > 0) {
099                // delay hasn't expired
100                log.trace("Not fetching invalidations, remaining time: " + remaining + "ms");
101                return null;
102            }
103            Invalidations invalidations = mapper.getClusterInvalidations(nodeId);
104            clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis();
105            return invalidations;
106        }
107    }
108
109    @Override
110    public void sendInvalidations(Invalidations invalidations) {
111        if (invalidations == null || invalidations.isEmpty()) {
112            return;
113        }
114        synchronized (mapper) {
115            mapper.insertClusterInvalidations(nodeId, invalidations);
116        }
117    }
118
119}