/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.work;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;

/**
 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor
 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds.
 *
 * @since 5.8
 */
public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

    /*
     * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it:
     * - isEmpty()
     * - size()
     * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout
     * - take()
     * - offer(e)
     * - remove(e)
     * - toArray(), toArray(a): for purge and shutdown
     * - drainTo(c)
     * - iterator() : hasNext(), next(), remove() (called by toArray)
     */

    /** Lock guarding the {@link #activation} condition. */
    protected final ReentrantLock activationLock = new ReentrantLock();

    /** Signalled every time {@link #setActive} is called. */
    protected final Condition activation = activationLock.newCondition();

    /** Whether elements may currently be taken from the queue. */
    protected volatile boolean active = false;

    protected final String queueId;

    protected final WorkQueuing queuing;

    protected NuxeoBlockingQueue(String queueId, WorkQueuing queuing) {
        this.queueId = queueId;
        this.queuing = queuing;
    }

    /**
     * Gets the metrics of this queue, as returned by {@link #setActive}.
     */
    protected abstract WorkQueueMetrics metrics();

    /**
     * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves
     * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all
     * elements are again available.
     *
     * @param active {@code true} to make the queue active, or {@code false} to deactivate it
     * @return the value of {@link #metrics()} after the flag has been updated and waiters have been signalled
     */
    public WorkQueueMetrics setActive(boolean active) {
        this.active = active;
        activationLock.lock();
        try {
            // wake up every thread blocked in awaitActivation so that it re-checks the flag
            activation.signalAll();
        } finally {
            activationLock.unlock();
        }
        return metrics();
    }

    /**
     * Adds the element through {@link #put}; always returns {@code true}.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if {@link #put} is interrupted (the
     *             thread's interrupt status is restored first)
     */
    @Override
    public boolean offer(Runnable r) {
        try {
            put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return true;
    }

    /**
     * Adds the element through {@link #put}; the timeout is ignored and the result is always {@code true}.
     */
    @Override
    public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        // not needed for ThreadPoolExecutor
        put(r);
        return true;
    }

    /**
     * Adds the element by delegating to {@link #putElement}.
     */
    @Override
    public void put(Runnable r) throws InterruptedException {
        putElement(r);
    }

    /**
     * Not supported by this base class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Runnable peek() {
        // not needed for ThreadPoolExecutor
        throw new UnsupportedOperationException("not supported");
    }

    /**
     * Retrieves and removes an element, or returns {@code null} if the queue is inactive or {@link #pollElement}
     * returned nothing.
     * <p>
     * If the queue is found inactive after an element has been polled, that element is handed to
     * {@link WorkQueuing#workReschedule} and {@code null} is returned.
     */
    @Override
    public Runnable poll() {
        if (!active) {
            return null;
        }
        Runnable runnable = pollElement();
        if (runnable == null) {
            return null;
        }
        if (!active) {
            // deactivated while we were polling: give the work back instead of running it
            queuing.workReschedule(queueId, WorkHolder.getWork(runnable));
            return null;
        }
        return runnable;
    }

    /**
     * Computes the time left until a deadline.
     *
     * @param end the deadline, on the {@link System#currentTimeMillis()} time scale
     * @return the number of milliseconds between now and {@code end}, or {@code 0} if the deadline has passed
     */
    protected long timeUntil(long end) {
        long timeout = end - System.currentTimeMillis();
        if (timeout < 0) {
            timeout = 0;
        }
        return timeout;
    }

    /**
     * Waits until the queue is active or the given time has elapsed, whichever comes first.
     *
     * @param nanos the maximum time to wait, in nanoseconds
     * @return the remaining wait time in nanoseconds; the queue was still inactive when waiting stopped if this is
     *         not positive
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected long awaitActivation(long nanos) throws InterruptedException {
        activationLock.lock();
        try {
            // loop to cope with spurious wakeups and with signals sent on deactivation
            while (nanos > 0 && !active) {
                nanos = activation.awaitNanos(nanos);
            }
        } finally {
            activationLock.unlock();
        }
        return nanos;
    }

    /**
     * Not supported by this base class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean contains(Object o) {
        // TODO Auto-generated method stub
        throw new UnsupportedOperationException();
    }

    /**
     * Returns {@link #getQueueSize()}.
     */
    @Override
    public int size() {
        return getQueueSize();
    }

    /**
     * Always returns {@link Integer#MAX_VALUE}.
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an iterator that never yields any element.
     */
    @Override
    public Iterator<Runnable> iterator() {
        return new Iter();
    }

    /*
     * Used by drainQueue/purge methods of ThreadPoolExecutor through toArray.
     */
    private static final class Iter implements Iterator<Runnable> {

        @Override
        public boolean hasNext() {
            return false;
        }

        @Override
        public Runnable next() {
            throw new NoSuchElementException();
        }

        @Override
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Moves every element currently available through {@link #poll()} into the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements available through {@link #poll()} into the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred, {@code 0} if {@code maxElements} is not positive
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c, int maxElements) {
        // validate before polling so that no element is removed and then lost
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            // draining into ourselves would re-enqueue each polled element forever
            throw new IllegalArgumentException();
        }
        int drained = 0;
        while (drained < maxElements) {
            Runnable r = poll();
            if (r == null) {
                break;
            }
            c.add(r);
            drained++;
        }
        return drained;
    }

    /**
     * Gets the size of the queue.
     */
    public abstract int getQueueSize();

    /**
     * Adds an element into this queue, waiting if necessary for space to become available.
     */
    public abstract void putElement(Runnable r) throws InterruptedException;

    /**
     * Retrieves and removes an element from the queue, or returns null if the queue is empty.
     */
    public abstract Runnable pollElement();

}