/*
 * (C) Copyright 2006-2010 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bstefanescu
 */
package org.nuxeo.ecm.platform.uidgen;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.persistence.EntityManager;
import javax.persistence.NoResultException;

import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.persistence.PersistenceProvider;
import org.nuxeo.ecm.core.persistence.PersistenceProvider.RunCallback;
import org.nuxeo.ecm.core.persistence.PersistenceProviderFactory;
import org.nuxeo.ecm.core.uidgen.AbstractUIDSequencer;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * This implementation uses a static persistence provider to be able to instantiate this class without going through
 * Framework.getService -> this is to avoid potential problems due to sequencer factories. Anyway sequencer factories
 * should be removed (I don't think they are really needed).
 * <p>
 * Every sequence increment is executed as a task on a private {@link ThreadPoolExecutor}, inside a transaction
 * started by that task; the calling thread blocks until the task has finished.
 *
 * @author <a href="mailto:bs@nuxeo.com">Bogdan Stefanescu</a>
 */
public class JPAUIDSequencerImpl extends AbstractUIDSequencer {

    /** Core pool size of the executor created by {@link #init()}. */
    public static final int POOL_SIZE = 1;

    /** Maximum pool size of the executor created by {@link #init()}. */
    public static final int MAX_POOL_SIZE = 2;

    /** Keep-alive time, in seconds, of the executor created by {@link #init()}. */
    public static final long KEEP_ALIVE_TIME = 10L;

    /** Capacity of the bounded work queue of the executor created by {@link #init()}. */
    public static final int QUEUE_SIZE = 1000;

    /**
     * Provider shared by all instances of this class. It is created lazily and is only ever written while holding
     * the {@code JPAUIDSequencerImpl.class} monitor.
     */
    private static volatile PersistenceProvider persistenceProvider;

    /** Executor running the sequence increments; {@code null} until {@link #init()} has been called. */
    protected ThreadPoolExecutor tpe;

    public JPAUIDSequencerImpl() {
    }

    /**
     * Creates the executor used to run the sequence increments. If a previous executor exists and has not been shut
     * down yet, it is shut down first.
     */
    @Override
    public void init() {
        if (tpe != null && !tpe.isShutdown()) {
            tpe.shutdownNow();
        }
        tpe = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE));
    }

    /**
     * Must be called when the service is no longer needed.
     * <p>
     * Shuts down the executor (if {@link #init()} was ever called) and closes the shared persistence provider. The
     * provider is released even if shutting down the executor fails.
     */
    @Override
    public void dispose() {
        try {
            // init() may never have been called: nothing to shut down in that case
            if (tpe != null) {
                tpe.shutdownNow();
            }
        } finally {
            deactivatePersistenceProvider();
        }
    }

    /**
     * Returns the shared persistence provider, creating and opening it on first use.
     *
     * @return the shared provider
     */
    protected PersistenceProvider getOrCreatePersistenceProvider() {
        // Work on a local snapshot so that a concurrent dispose() cannot turn the returned value into null
        // between the check and the return.
        PersistenceProvider provider = persistenceProvider;
        if (provider == null) {
            synchronized (JPAUIDSequencerImpl.class) {
                if (persistenceProvider == null) {
                    activatePersistenceProvider();
                }
                provider = persistenceProvider;
            }
        }
        return provider;
    }

    /**
     * Creates the {@code NXUIDSequencer} persistence provider, opens its persistence unit and stores it in the
     * shared static field. While doing so, the context class loader of the current thread is switched to the class
     * loader of {@link PersistenceProvider}; it is restored afterwards.
     * <p>
     * The static field is assigned only after the persistence unit has been opened, so that neither an unopened
     * provider nor one whose opening failed is ever visible to other threads.
     */
    protected static void activatePersistenceProvider() {
        Thread thread = Thread.currentThread();
        ClassLoader last = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(PersistenceProvider.class.getClassLoader());
            PersistenceProviderFactory persistenceProviderFactory = Framework.getService(
                    PersistenceProviderFactory.class);
            PersistenceProvider provider = persistenceProviderFactory.newProvider("NXUIDSequencer");
            provider.openPersistenceUnit();
            // publish only once fully initialized
            persistenceProvider = provider;
        } finally {
            thread.setContextClassLoader(last);
        }
    }

    /**
     * Closes the shared persistence provider, if any, and forgets it so that the next call to
     * {@link #getOrCreatePersistenceProvider()} creates a fresh one.
     */
    private static void deactivatePersistenceProvider() {
        if (persistenceProvider != null) {
            synchronized (JPAUIDSequencerImpl.class) {
                PersistenceProvider provider = persistenceProvider;
                if (provider != null) {
                    // Clear the reference first: if closing fails, a closed (or half-closed) provider must not
                    // stay cached.
                    persistenceProvider = null;
                    provider.closePersistenceUnit();
                }
            }
        }
    }

    /**
     * Task computing the next value of one sequence inside its own transaction.
     */
    protected class SeqRunner implements Runnable {

        /** Key of the sequence to increment. */
        protected final String key;

        /** Value obtained by {@link #run()}; only meaningful once {@link #isCompleted()} returns {@code true}. */
        protected int result;

        /** Set to {@code true} once {@link #run()} has obtained a value. */
        protected boolean completed = false;

        public SeqRunner(final String key) {
            this.key = key;
        }

        /**
         * Starts a transaction, fetches the next value for {@link #key} and then commits or rolls back the
         * transaction, whether or not fetching the value succeeded.
         */
        @Override
        public void run() {
            TransactionHelper.startTransaction();
            try {
                result = doGetNext(key);
                completed = true;
            } finally {
                TransactionHelper.commitOrRollbackTransaction();
            }
        }

        public int getResult() {
            return result;
        }

        public boolean isCompleted() {
            return completed;
        }

    }

    /**
     * Advances the sequence until a value greater than or equal to {@code id} has been handed out.
     * <p>
     * Each step is a full {@link #getNextLong(String)} call, i.e. one executor task per increment.
     *
     * @param key the sequence key
     * @param id the minimum value the sequence must reach
     */
    @Override
    public void initSequence(String key, long id) {
        while (getNextLong(key) < id) {
            // nothing to do: the call in the loop condition performs the increment
        }
    }

    /**
     * Returns the next value of the given sequence.
     * <p>
     * The increment is submitted to the executor and this method blocks until it has been processed.
     *
     * @param key the sequence key
     * @return the next value of the sequence
     * @throws NuxeoException if the calling thread is interrupted while waiting (the interrupt flag is restored
     *             first) or if the task failed
     */
    @Override
    public long getNextLong(final String key) {
        SeqRunner runner = new SeqRunner(key);
        Future<?> future = tpe.submit(runner);
        try {
            // Future.get() also guarantees that the result written by the worker thread is visible here
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        } catch (ExecutionException e) {
            throw new NuxeoException(e);
        }
        return runner.getResult();
    }

    /**
     * Runs {@link #getNext(EntityManager, String)} through the shared persistence provider.
     *
     * @param key the sequence key
     * @return the next value of the sequence
     */
    @SuppressWarnings("boxing")
    protected int doGetNext(final String key) {
        return getOrCreatePersistenceProvider().run(true, new RunCallback<Integer>() {
            @Override
            public Integer runWith(EntityManager em) {
                return getNext(em, key);
            }
        });
    }

    /**
     * Looks up the sequence bean for {@code key} with the {@code UIDSequence.findByKey} named query, persisting a
     * new bean when the query yields no result, and returns the bean's next index.
     *
     * @param em the entity manager to use
     * @param key the sequence key
     * @return the value returned by {@code UIDSequenceBean.nextIndex()}
     */
    protected int getNext(EntityManager em, String key) {
        UIDSequenceBean seq;
        try {
            // NOTE(review): presumably equivalent to
            // createQuery("FROM UIDSequenceBean seq WHERE seq.key = :key") - confirm against the entity mapping
            seq = (UIDSequenceBean) em.createNamedQuery("UIDSequence.findByKey")
                                      .setParameter("key", key)
                                      .getSingleResult();
        } catch (NoResultException e) {
            // first use of this key: create its sequence row
            seq = new UIDSequenceBean(key);
            em.persist(seq);
        }
        return seq.nextIndex();
    }

}