001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import java.util.AbstractQueue; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.locks.Condition; 027import java.util.concurrent.locks.ReentrantLock; 028 029/** 030 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor 031 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds. 032 * 033 * @since 5.8 034 */ 035public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { 036 037 /* 038 * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it: - isEmpty() 039 * - size() - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout - take() - 040 * offer(e) - remove(e) - toArray(), toArray(a): for purge and shutdown - drainTo(c) - iterator() : hasNext(), 041 * next(), remove() (called by toArray) 042 */ 043 044 protected final ReentrantLock activationLock = new ReentrantLock(); 045 046 protected final Condition activation = activationLock.newCondition(); 047 048 protected volatile boolean active = true; 049 050 /** 051 * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves 052 * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all 053 * elements are again available. 054 * 055 * @param active {@code true} to make the queue active, or {@code false} to deactivate it 056 */ 057 public void setActive(boolean active) { 058 this.active = active; 059 activationLock.lock(); 060 try { 061 activation.signalAll(); 062 } finally { 063 activationLock.unlock(); 064 } 065 } 066 067 @Override 068 public boolean offer(Runnable r) { 069 try { 070 put(r); 071 } catch (InterruptedException e) { 072 Thread.currentThread().interrupt(); // restore interrupt status 073 throw new RuntimeException("interrupted", e); 074 } 075 return true; 076 } 077 078 @Override 079 public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { 080 // not needed for ThreadPoolExecutor 081 put(r); 082 return true; 083 } 084 085 @Override 086 public void put(Runnable r) throws InterruptedException { 087 putElement(r); 088 } 089 090 @Override 091 public Runnable peek() { 092 // not needed for ThreadPoolExecutor 093 throw new UnsupportedOperationException("not supported"); 094 } 095 096 @Override 097 public Runnable poll() { 098 if (!active) { 099 return null; 100 } 101 return pollElement(); 102 } 103 104 protected long timeUntil(long end) { 105 long timeout = end - System.currentTimeMillis(); 106 if (timeout < 0) { 107 timeout = 0; 108 } 109 return timeout; 110 } 111 112 protected long awaitActivation(long nanos) throws InterruptedException { 113 activationLock.lock(); 114 try { 115 while (nanos > 0 && !active) { 116 nanos = activation.awaitNanos(nanos); 117 } 118 119 } finally { 120 activationLock.unlock(); 121 } 122 return nanos; 123 } 124 125 @Override 126 public boolean contains(Object o) { 127 // TODO Auto-generated method stub 128 throw new UnsupportedOperationException(); 129 } 130 131 @Override 132 public int size() { 133 return getQueueSize(); 134 } 135 136 @Override 137 public int remainingCapacity() { 138 return Integer.MAX_VALUE; 139 } 140 141 @Override 142 public Iterator<Runnable> iterator() { 143 return new Iter(); 144 } 145 146 /* 147 * Used by drainQueue/purge methods of ThreadPoolExector through toArray. 148 */ 149 private class Iter implements Iterator<Runnable> { 150 151 @Override 152 public boolean hasNext() { 153 return false; 154 } 155 156 @Override 157 public Runnable next() { 158 throw new UnsupportedOperationException(); 159 } 160 161 @Override 162 public void remove() { 163 throw new UnsupportedOperationException(); 164 } 165 } 166 167 @Override 168 public int drainTo(Collection<? super Runnable> c) { 169 return drainTo(c, Integer.MAX_VALUE); 170 } 171 172 @Override 173 public int drainTo(Collection<? super Runnable> c, int maxElements) { 174 for (int i = 0; i < maxElements; i++) { 175 Runnable r = poll(); 176 if (r == null) { 177 return i; 178 } 179 c.add(r); 180 } 181 return maxElements; 182 } 183 184 /** 185 * Gets the size of the queue. 186 */ 187 public abstract int getQueueSize(); 188 189 /** 190 * Adds an element into this queue, waiting if necessary for space to become available. 191 */ 192 public abstract void putElement(Runnable r) throws InterruptedException; 193 194 /** 195 * Retrieves and removes an element from the queue, or returns null if the queue is empty. 196 */ 197 public abstract Runnable pollElement(); 198 199}