001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.core.work; 018 019import java.util.AbstractQueue; 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.locks.Condition; 025import java.util.concurrent.locks.ReentrantLock; 026 027/** 028 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor 029 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds. 030 * 031 * @since 5.8 032 */ 033public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { 034 035 /* 036 * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it: - isEmpty() 037 * - size() - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout - take() - 038 * offer(e) - remove(e) - toArray(), toArray(a): for purge and shutdown - drainTo(c) - iterator() : hasNext(), 039 * next(), remove() (called by toArray) 040 */ 041 042 protected final ReentrantLock activationLock = new ReentrantLock(); 043 044 protected final Condition activation = activationLock.newCondition(); 045 046 protected volatile boolean active = true; 047 048 /** 049 * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves 050 * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all 051 * elements are again available. 052 * 053 * @param active {@code true} to make the queue active, or {@code false} to deactivate it 054 */ 055 public void setActive(boolean active) { 056 this.active = active; 057 activationLock.lock(); 058 try { 059 activation.signalAll(); 060 } finally { 061 activationLock.unlock(); 062 } 063 } 064 065 @Override 066 public boolean offer(Runnable r) { 067 try { 068 put(r); 069 } catch (InterruptedException e) { 070 Thread.currentThread().interrupt(); // restore interrupt status 071 throw new RuntimeException("interrupted", e); 072 } 073 return true; 074 } 075 076 @Override 077 public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { 078 // not needed for ThreadPoolExecutor 079 put(r); 080 return true; 081 } 082 083 @Override 084 public void put(Runnable r) throws InterruptedException { 085 putElement(r); 086 } 087 088 @Override 089 public Runnable peek() { 090 // not needed for ThreadPoolExecutor 091 throw new UnsupportedOperationException("not supported"); 092 } 093 094 @Override 095 public Runnable poll() { 096 if (!active) { 097 return null; 098 } 099 return pollElement(); 100 } 101 102 protected long timeUntil(long end) { 103 long timeout = end - System.currentTimeMillis(); 104 if (timeout < 0) { 105 timeout = 0; 106 } 107 return timeout; 108 } 109 110 protected long awaitActivation(long nanos) throws InterruptedException { 111 activationLock.lock(); 112 try { 113 while (nanos > 0 && !active) { 114 nanos = activation.awaitNanos(nanos); 115 } 116 117 } finally { 118 activationLock.unlock(); 119 } 120 return nanos; 121 } 122 123 @Override 124 public boolean contains(Object o) { 125 // TODO Auto-generated method stub 126 throw new UnsupportedOperationException(); 127 } 128 129 @Override 130 public int size() { 131 return getQueueSize(); 132 } 133 134 @Override 135 public int remainingCapacity() { 136 return Integer.MAX_VALUE; 137 } 138 139 @Override 140 public Iterator<Runnable> iterator() { 141 return new Iter(); 142 } 143 144 /* 145 * Used by drainQueue/purge methods of ThreadPoolExector through toArray. 146 */ 147 private class Iter implements Iterator<Runnable> { 148 149 public Iter() { 150 throw new UnsupportedOperationException(); 151 } 152 153 @Override 154 public boolean hasNext() { 155 // TODO Auto-generated method stub 156 throw new UnsupportedOperationException(); 157 } 158 159 @Override 160 public Runnable next() { 161 // TODO Auto-generated method stub 162 throw new UnsupportedOperationException(); 163 } 164 165 @Override 166 public void remove() { 167 // TODO Auto-generated method stub 168 throw new UnsupportedOperationException(); 169 } 170 } 171 172 @Override 173 public int drainTo(Collection<? super Runnable> c) { 174 return drainTo(c, Integer.MAX_VALUE); 175 } 176 177 @Override 178 public int drainTo(Collection<? super Runnable> c, int maxElements) { 179 for (int i = 0; i < maxElements; i++) { 180 Runnable r = poll(); 181 if (r == null) { 182 return i; 183 } 184 c.add(r); 185 } 186 return maxElements; 187 } 188 189 /** 190 * Gets the size of the queue. 191 */ 192 public abstract int getQueueSize(); 193 194 /** 195 * Adds an element into this queue, waiting if necessary for space to become available. 196 */ 197 public abstract void putElement(Runnable r) throws InterruptedException; 198 199 /** 200 * Retrieves and removes an element from the queue, or returns null if the queue is empty. 201 */ 202 public abstract Runnable pollElement(); 203 204}