001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import java.util.AbstractQueue; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.locks.Condition; 027import java.util.concurrent.locks.ReentrantLock; 028 029import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 030 031/** 032 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor 033 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds. 034 * 035 * @since 5.8 036 */ 037public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { 038 039 /* 040 * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it: 041 * - isEmpty() 042 * - size() 043 * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout 044 * - take() 045 * - offer(e) 046 * - remove(e) 047 * - toArray(), toArray(a): for purge and shutdown 048 * - drainTo(c) 049 * - iterator() : hasNext(), next(), remove() (called by toArray) 050 */ 051 052 protected final ReentrantLock activationLock = new ReentrantLock(); 053 054 protected final Condition activation = activationLock.newCondition(); 055 056 protected volatile boolean active = true; 057 058 protected final String queueId; 059 060 protected final WorkQueuing queuing; 061 062 protected NuxeoBlockingQueue(String queueId, WorkQueuing queuing) { 063 this.queueId = queueId; 064 this.queuing = queuing; 065 } 066 067 protected abstract WorkQueueMetrics metrics(); 068 069 /** 070 * Sets the queue active or inactive. When deactivated, taking an element from the queue (take, poll, peek) behaves 071 * as if the queue was empty. Elements can still be added when the queue is deactivated. When reactivated, all 072 * elements are again available. 073 * 074 * @param active {@code true} to make the queue active, or {@code false} to deactivate it 075 */ 076 public WorkQueueMetrics setActive(boolean active) { 077 WorkQueueMetrics metrics = metrics(); 078 this.active = active; 079 activationLock.lock(); 080 try { 081 activation.signalAll(); 082 } finally { 083 activationLock.unlock(); 084 } 085 return metrics; 086 } 087 088 @Override 089 public boolean offer(Runnable r) { 090 try { 091 put(r); 092 } catch (InterruptedException e) { 093 Thread.currentThread().interrupt(); // restore interrupt status 094 throw new RuntimeException("interrupted", e); 095 } 096 return true; 097 } 098 099 @Override 100 public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { 101 // not needed for ThreadPoolExecutor 102 put(r); 103 return true; 104 } 105 106 @Override 107 public void put(Runnable r) throws InterruptedException { 108 putElement(r); 109 } 110 111 @Override 112 public Runnable peek() { 113 // not needed for ThreadPoolExecutor 114 throw new UnsupportedOperationException("not supported"); 115 } 116 117 @Override 118 public Runnable poll() { 119 if (!active) { 120 return null; 121 } 122 return pollElement(); 123 } 124 125 protected long timeUntil(long end) { 126 long timeout = end - System.currentTimeMillis(); 127 if (timeout < 0) { 128 timeout = 0; 129 } 130 return timeout; 131 } 132 133 protected long awaitActivation(long nanos) throws InterruptedException { 134 activationLock.lock(); 135 try { 136 while (nanos > 0 && !active) { 137 nanos = activation.awaitNanos(nanos); 138 } 139 140 } finally { 141 activationLock.unlock(); 142 } 143 return nanos; 144 } 145 146 @Override 147 public boolean contains(Object o) { 148 // TODO Auto-generated method stub 149 throw new UnsupportedOperationException(); 150 } 151 152 @Override 153 public int size() { 154 return getQueueSize(); 155 } 156 157 @Override 158 public int remainingCapacity() { 159 return Integer.MAX_VALUE; 160 } 161 162 @Override 163 public Iterator<Runnable> iterator() { 164 return new Iter(); 165 } 166 167 /* 168 * Used by drainQueue/purge methods of ThreadPoolExector through toArray. 169 */ 170 private class Iter implements Iterator<Runnable> { 171 172 @Override 173 public boolean hasNext() { 174 return false; 175 } 176 177 @Override 178 public Runnable next() { 179 throw new UnsupportedOperationException(); 180 } 181 182 @Override 183 public void remove() { 184 throw new UnsupportedOperationException(); 185 } 186 } 187 188 @Override 189 public int drainTo(Collection<? super Runnable> c) { 190 return drainTo(c, Integer.MAX_VALUE); 191 } 192 193 @Override 194 public int drainTo(Collection<? super Runnable> c, int maxElements) { 195 for (int i = 0; i < maxElements; i++) { 196 Runnable r = poll(); 197 if (r == null) { 198 return i; 199 } 200 c.add(r); 201 } 202 return maxElements; 203 } 204 205 /** 206 * Gets the size of the queue. 207 */ 208 public abstract int getQueueSize(); 209 210 /** 211 * Adds an element into this queue, waiting if necessary for space to become available. 212 */ 213 public abstract void putElement(Runnable r) throws InterruptedException; 214 215 /** 216 * Retrieves and removes an element from the queue, or returns null if the queue is empty. 217 */ 218 public abstract Runnable pollElement(); 219 220}