001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import java.io.Serializable; 023import java.sql.Types; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.nuxeo.ecm.core.api.NuxeoException; 028import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 029import org.nuxeo.ecm.core.storage.sql.Invalidations; 030import org.nuxeo.ecm.core.storage.sql.Mapper; 031import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 032import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 033 034/** 035 * Implementation of {@link ClusterInvalidator} that uses the JDBC Mapper to read/write invalidations. 036 */ 037public class JDBCClusterInvalidator implements ClusterInvalidator { 038 039 private static final Log log = LogFactory.getLog(JDBCClusterInvalidator.class); 040 041 /** Cluster node id. */ 042 private Serializable nodeId; 043 044 /** Cluster node mapper. Used synchronized. */ 045 private Mapper mapper; 046 047 private long clusteringDelay; 048 049 // modified only under clusterMapper synchronization 050 private long clusterNodeLastInvalidationTimeMillis; 051 052 @Override 053 public void initialize(String nodeId, RepositoryImpl repository) { 054 RepositoryDescriptor repositoryDescriptor = repository.getRepositoryDescriptor(); 055 clusteringDelay = repositoryDescriptor.getClusteringDelay(); 056 processClusterInvalidationsNext(); 057 // create mapper 058 mapper = repository.newMapper(null, false); 059 Serializable nodeIdSer; 060 if (mapper.getClusterNodeIdType() == Types.VARCHAR) { // sql type 061 nodeIdSer = nodeId; 062 } else { 063 try { 064 nodeIdSer = Long.valueOf(nodeId); 065 } catch (NumberFormatException e) { 066 throw new NuxeoException("Cluster node id must be an integer", e); 067 } 068 } 069 this.nodeId = nodeIdSer; 070 mapper.createClusterNode(nodeIdSer); 071 log.info("Clustering enabled for repository: " + repository.getName() + " with " + clusteringDelay 072 + " ms delay " + " and cluster node id: " + nodeId); 073 } 074 075 @Override 076 public void close() { 077 synchronized (mapper) { 078 mapper.removeClusterNode(nodeId); 079 mapper.close(); 080 } 081 } 082 083 // TODO should be called by RepositoryManagement 084 protected void processClusterInvalidationsNext() { 085 clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis() - clusteringDelay - 1; 086 } 087 088 @Override 089 public Invalidations receiveInvalidations() { 090 synchronized (mapper) { 091 long remaining = clusterNodeLastInvalidationTimeMillis + clusteringDelay - System.currentTimeMillis(); 092 if (remaining > 0) { 093 // delay hasn't expired 094 log.trace("Not fetching invalidations, remaining time: " + remaining + "ms"); 095 return null; 096 } 097 Invalidations invalidations = mapper.getClusterInvalidations(nodeId); 098 clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis(); 099 return invalidations; 100 } 101 } 102 103 @Override 104 public void sendInvalidations(Invalidations invalidations) { 105 if (invalidations == null || invalidations.isEmpty()) { 106 return; 107 } 108 synchronized (mapper) { 109 mapper.insertClusterInvalidations(nodeId, invalidations); 110 } 111 } 112 113}