001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import java.io.Serializable; 023import java.sql.Types; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 028import org.nuxeo.ecm.core.api.NuxeoException; 029import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 030import org.nuxeo.ecm.core.storage.sql.Invalidations; 031import org.nuxeo.ecm.core.storage.sql.Mapper; 032import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 033import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 034 035/** 036 * Implementation of {@link ClusterInvalidator} that uses the JDBC Mapper to read/write invalidations. 037 */ 038public class JDBCClusterInvalidator implements ClusterInvalidator { 039 040 private static final Log log = LogFactory.getLog(JDBCClusterInvalidator.class); 041 042 /** Cluster node id. */ 043 private Serializable nodeId; 044 045 /** Cluster node mapper. Used synchronized. */ 046 private Mapper mapper; 047 048 private long clusteringDelay; 049 050 // modified only under clusterMapper synchronization 051 private long clusterNodeLastInvalidationTimeMillis; 052 053 @Override 054 public void initialize(String nodeId, RepositoryImpl repository) { 055 RepositoryDescriptor repositoryDescriptor = repository.getRepositoryDescriptor(); 056 clusteringDelay = repositoryDescriptor.getClusteringDelay(); 057 processClusterInvalidationsNext(); 058 // create mapper 059 mapper = repository.newMapper(null, false); 060 Serializable nodeIdSer; 061 if (mapper.getClusterNodeIdType() == Types.VARCHAR) { // sql type 062 nodeIdSer = nodeId; 063 } else { 064 try { 065 nodeIdSer = Long.valueOf(nodeId); 066 } catch (NumberFormatException e) { 067 throw new NuxeoException("Cluster node id must be an integer", e); 068 } 069 } 070 this.nodeId = nodeIdSer; 071 try { 072 mapper.createClusterNode(nodeIdSer); 073 } catch (ConcurrentUpdateException e) { 074 e.addInfo("Failed to initialize clustering for repository: " + repository.getName()); 075 throw e; 076 } 077 log.info("Clustering enabled for repository: " + repository.getName() + " with " + clusteringDelay 078 + " ms delay " + " and cluster node id: " + nodeId); 079 } 080 081 @Override 082 public void close() { 083 synchronized (mapper) { 084 mapper.removeClusterNode(nodeId); 085 mapper.close(); 086 } 087 } 088 089 // TODO should be called by RepositoryManagement 090 protected void processClusterInvalidationsNext() { 091 clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis() - clusteringDelay - 1; 092 } 093 094 @Override 095 public Invalidations receiveInvalidations() { 096 synchronized (mapper) { 097 long remaining = clusterNodeLastInvalidationTimeMillis + clusteringDelay - System.currentTimeMillis(); 098 if (remaining > 0) { 099 // delay hasn't expired 100 log.trace("Not fetching invalidations, remaining time: " + remaining + "ms"); 101 return null; 102 } 103 Invalidations invalidations = mapper.getClusterInvalidations(nodeId); 104 clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis(); 105 return invalidations; 106 } 107 } 108 109 @Override 110 public void sendInvalidations(Invalidations invalidations) { 111 if (invalidations == null || invalidations.isEmpty()) { 112 return; 113 } 114 synchronized (mapper) { 115 mapper.insertClusterInvalidations(nodeId, invalidations); 116 } 117 } 118 119}