001/*
002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012
013package org.nuxeo.ecm.core.storage.sql.jdbc;
014
015import java.io.Serializable;
016import java.sql.Types;
017
018import org.apache.commons.logging.Log;
019import org.apache.commons.logging.LogFactory;
020import org.nuxeo.ecm.core.api.NuxeoException;
021import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
022import org.nuxeo.ecm.core.storage.sql.Invalidations;
023import org.nuxeo.ecm.core.storage.sql.Mapper;
024import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
025import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
026
027/**
028 * Implementation of {@link ClusterInvalidator} that uses the JDBC Mapper to read/write invalidations.
029 */
030public class JDBCClusterInvalidator implements ClusterInvalidator {
031
032    private static final Log log = LogFactory.getLog(JDBCClusterInvalidator.class);
033
034    /** Cluster node id. */
035    private Serializable nodeId;
036
037    /** Cluster node mapper. Used synchronized. */
038    private Mapper mapper;
039
040    private long clusteringDelay;
041
042    // modified only under clusterMapper synchronization
043    private long clusterNodeLastInvalidationTimeMillis;
044
045    @Override
046    public void initialize(String nodeId, RepositoryImpl repository) {
047        RepositoryDescriptor repositoryDescriptor = repository.getRepositoryDescriptor();
048        clusteringDelay = repositoryDescriptor.getClusteringDelay();
049        processClusterInvalidationsNext();
050        // create mapper
051        mapper = repository.newMapper(null, false);
052        Serializable nodeIdSer;
053        if (mapper.getClusterNodeIdType() == Types.VARCHAR) { // sql type
054            nodeIdSer = nodeId;
055        } else {
056            try {
057                nodeIdSer = Long.valueOf(nodeId);
058            } catch (NumberFormatException e) {
059                throw new NuxeoException("Cluster node id must be an integer", e);
060            }
061        }
062        this.nodeId = nodeIdSer;
063        mapper.createClusterNode(nodeIdSer);
064        log.info("Clustering enabled for repository: " + repository.getName() + " with " + clusteringDelay
065                + " ms delay " + " and cluster node id: " + nodeId);
066    }
067
068    @Override
069    public void close() {
070        synchronized (mapper) {
071            mapper.removeClusterNode(nodeId);
072            mapper.close();
073        }
074    }
075
076    // TODO should be called by RepositoryManagement
077    protected void processClusterInvalidationsNext() {
078        clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis() - clusteringDelay - 1;
079    }
080
081    @Override
082    public Invalidations receiveInvalidations() {
083        synchronized (mapper) {
084            long remaining = clusterNodeLastInvalidationTimeMillis + clusteringDelay - System.currentTimeMillis();
085            if (remaining > 0) {
086                // delay hasn't expired
087                log.trace("Not fetching invalidations, remaining time: " + remaining + "ms");
088                return null;
089            }
090            Invalidations invalidations = mapper.getClusterInvalidations(nodeId);
091            clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis();
092            return invalidations;
093        }
094    }
095
096    @Override
097    public void sendInvalidations(Invalidations invalidations) {
098        if (invalidations == null || invalidations.isEmpty()) {
099            return;
100        }
101        synchronized (mapper) {
102            mapper.insertClusterInvalidations(nodeId, invalidations);
103        }
104    }
105
106}