001/* 002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 */ 012 013package org.nuxeo.ecm.core.storage.sql.jdbc; 014 015import java.io.Serializable; 016import java.sql.Types; 017 018import org.apache.commons.logging.Log; 019import org.apache.commons.logging.LogFactory; 020import org.nuxeo.ecm.core.api.NuxeoException; 021import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 022import org.nuxeo.ecm.core.storage.sql.Invalidations; 023import org.nuxeo.ecm.core.storage.sql.Mapper; 024import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 025import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 026 027/** 028 * Implementation of {@link ClusterInvalidator} that uses the JDBC Mapper to read/write invalidations. 029 */ 030public class JDBCClusterInvalidator implements ClusterInvalidator { 031 032 private static final Log log = LogFactory.getLog(JDBCClusterInvalidator.class); 033 034 /** Cluster node id. */ 035 private Serializable nodeId; 036 037 /** Cluster node mapper. Used synchronized. */ 038 private Mapper mapper; 039 040 private long clusteringDelay; 041 042 // modified only under clusterMapper synchronization 043 private long clusterNodeLastInvalidationTimeMillis; 044 045 @Override 046 public void initialize(String nodeId, RepositoryImpl repository) { 047 RepositoryDescriptor repositoryDescriptor = repository.getRepositoryDescriptor(); 048 clusteringDelay = repositoryDescriptor.getClusteringDelay(); 049 processClusterInvalidationsNext(); 050 // create mapper 051 mapper = repository.newMapper(null, false); 052 Serializable nodeIdSer; 053 if (mapper.getClusterNodeIdType() == Types.VARCHAR) { // sql type 054 nodeIdSer = nodeId; 055 } else { 056 try { 057 nodeIdSer = Long.valueOf(nodeId); 058 } catch (NumberFormatException e) { 059 throw new NuxeoException("Cluster node id must be an integer", e); 060 } 061 } 062 this.nodeId = nodeIdSer; 063 mapper.createClusterNode(nodeIdSer); 064 log.info("Clustering enabled for repository: " + repository.getName() + " with " + clusteringDelay 065 + " ms delay " + " and cluster node id: " + nodeId); 066 } 067 068 @Override 069 public void close() { 070 synchronized (mapper) { 071 mapper.removeClusterNode(nodeId); 072 mapper.close(); 073 } 074 } 075 076 // TODO should be called by RepositoryManagement 077 protected void processClusterInvalidationsNext() { 078 clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis() - clusteringDelay - 1; 079 } 080 081 @Override 082 public Invalidations receiveInvalidations() { 083 synchronized (mapper) { 084 long remaining = clusterNodeLastInvalidationTimeMillis + clusteringDelay - System.currentTimeMillis(); 085 if (remaining > 0) { 086 // delay hasn't expired 087 log.trace("Not fetching invalidations, remaining time: " + remaining + "ms"); 088 return null; 089 } 090 Invalidations invalidations = mapper.getClusterInvalidations(nodeId); 091 clusterNodeLastInvalidationTimeMillis = System.currentTimeMillis(); 092 return invalidations; 093 } 094 } 095 096 @Override 097 public void sendInvalidations(Invalidations invalidations) { 098 if (invalidations == null || invalidations.isEmpty()) { 099 return; 100 } 101 synchronized (mapper) { 102 mapper.insertClusterInvalidations(nodeId, invalidations); 103 } 104 } 105 106}