001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import java.util.AbstractQueue; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.locks.Condition; 027import java.util.concurrent.locks.ReentrantLock; 028 029/** 030 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor 031 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds. 032 * 033 * @since 5.8 034 */ 035public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { 036 037 /* 038 * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it: - isEmpty() 039 * - size() - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout - take() - 040 * offer(e) - remove(e) - toArray(), toArray(a): for purge and shutdown - drainTo(c) - iterator() : hasNext(), 041 * next(), remove() (called by toArray) 042 */ 043 044 protected final ReentrantLock activationLock = new ReentrantLock(); 045 046 protected final Condition activation = activationLock.newCondition(); 047 048 protected volatile boolean active = true; 049 050 /** 051 * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves 052 * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all 053 * elements are again available. 054 * 055 * @param active {@code true} to make the queue active, or {@code false} to deactivate it 056 */ 057 public void setActive(boolean active) { 058 this.active = active; 059 activationLock.lock(); 060 try { 061 activation.signalAll(); 062 } finally { 063 activationLock.unlock(); 064 } 065 } 066 067 @Override 068 public boolean offer(Runnable r) { 069 try { 070 put(r); 071 } catch (InterruptedException e) { 072 Thread.currentThread().interrupt(); // restore interrupt status 073 throw new RuntimeException("interrupted", e); 074 } 075 return true; 076 } 077 078 @Override 079 public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { 080 // not needed for ThreadPoolExecutor 081 put(r); 082 return true; 083 } 084 085 @Override 086 public void put(Runnable r) throws InterruptedException { 087 putElement(r); 088 } 089 090 @Override 091 public Runnable peek() { 092 // not needed for ThreadPoolExecutor 093 throw new UnsupportedOperationException("not supported"); 094 } 095 096 @Override 097 public Runnable poll() { 098 if (!active) { 099 return null; 100 } 101 return pollElement(); 102 } 103 104 protected long timeUntil(long end) { 105 long timeout = end - System.currentTimeMillis(); 106 if (timeout < 0) { 107 timeout = 0; 108 } 109 return timeout; 110 } 111 112 protected long awaitActivation(long nanos) throws InterruptedException { 113 activationLock.lock(); 114 try { 115 while (nanos > 0 && !active) { 116 nanos = activation.awaitNanos(nanos); 117 } 118 119 } finally { 120 activationLock.unlock(); 121 } 122 return nanos; 123 } 124 125 @Override 126 public boolean contains(Object o) { 127 // TODO Auto-generated method stub 128 throw new UnsupportedOperationException(); 129 } 130 131 @Override 132 public int size() { 133 return getQueueSize(); 134 } 135 136 @Override 137 public int remainingCapacity() { 138 return Integer.MAX_VALUE; 139 } 140 141 @Override 142 public Iterator<Runnable> iterator() { 143 return new Iter(); 144 } 145 146 /* 147 * Used by drainQueue/purge methods of ThreadPoolExector through toArray. 148 */ 149 private class Iter implements Iterator<Runnable> { 150 151 public Iter() { 152 throw new UnsupportedOperationException(); 153 } 154 155 @Override 156 public boolean hasNext() { 157 // TODO Auto-generated method stub 158 throw new UnsupportedOperationException(); 159 } 160 161 @Override 162 public Runnable next() { 163 // TODO Auto-generated method stub 164 throw new UnsupportedOperationException(); 165 } 166 167 @Override 168 public void remove() { 169 // TODO Auto-generated method stub 170 throw new UnsupportedOperationException(); 171 } 172 } 173 174 @Override 175 public int drainTo(Collection<? super Runnable> c) { 176 return drainTo(c, Integer.MAX_VALUE); 177 } 178 179 @Override 180 public int drainTo(Collection<? super Runnable> c, int maxElements) { 181 for (int i = 0; i < maxElements; i++) { 182 Runnable r = poll(); 183 if (r == null) { 184 return i; 185 } 186 c.add(r); 187 } 188 return maxElements; 189 } 190 191 /** 192 * Gets the size of the queue. 193 */ 194 public abstract int getQueueSize(); 195 196 /** 197 * Adds an element into this queue, waiting if necessary for space to become available. 198 */ 199 public abstract void putElement(Runnable r) throws InterruptedException; 200 201 /** 202 * Retrieves and removes an element from the queue, or returns null if the queue is empty. 203 */ 204 public abstract Runnable pollElement(); 205 206}