/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.work;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;

/**
 * An abstract {@link BlockingQueue} suitable for a fixed-sized {@link java.util.concurrent.ThreadPoolExecutor
 * ThreadPoolExecutor}, that can be implemented in terms of a few methods. {@link #offer} always succeeds.
 * <p>
 * Subclasses provide the storage through {@link #getQueueSize()}, {@link #putElement(Runnable)} and
 * {@link #pollElement()}, plus the remaining abstract {@link BlockingQueue} methods not implemented here.
 *
 * @since 5.8
 */
public abstract class NuxeoBlockingQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

    /*
     * ThreadPoolExecutor uses a BlockingQueue but the Java 7 implementation only calls these methods on it:
     * - isEmpty()
     * - size()
     * - poll(timeout, unit): not used, as core pool size = max size and no core thread timeout
     * - take()
     * - offer(e)
     * - remove(e)
     * - toArray(), toArray(a): for purge and shutdown
     * - drainTo(c)
     * - iterator() : hasNext(), next(), remove() (called by toArray)
     */

    /** Lock guarding the {@link #activation} condition. */
    protected final ReentrantLock activationLock = new ReentrantLock();

    /** Condition signalled each time {@link #setActive(boolean)} is called. */
    protected final Condition activation = activationLock.newCondition();

    /** Whether elements may currently be retrieved from the queue; starts out {@code false}. */
    protected volatile boolean active = false;

    /** Identifier of this queue, passed to {@link #queuing} when an element has to be rescheduled. */
    protected final String queueId;

    /** Queuing service used to reschedule an element polled while the queue got deactivated. */
    protected final WorkQueuing queuing;

    /**
     * Creates an inactive queue.
     *
     * @param queueId the identifier of this queue
     * @param queuing the queuing service used to reschedule work
     */
    protected NuxeoBlockingQueue(String queueId, WorkQueuing queuing) {
        this.queueId = queueId;
        this.queuing = queuing;
    }

    /**
     * Gets the metrics of this queue.
     *
     * @return the metrics, as returned by {@link #setActive(boolean)}
     */
    protected abstract WorkQueueMetrics metrics();

    /**
     * Sets the queue active or inactive. When deactivated, {@link #poll()} behaves as if the queue were empty.
     * Elements can still be added when the queue is deactivated. When reactivated, all elements are again
     * available.
     * <p>
     * Every thread waiting in {@link #awaitActivation(long)} is woken up so that it re-checks the flag.
     *
     * @param active {@code true} to make the queue active, or {@code false} to deactivate it
     * @return the queue metrics, as reported by {@link #metrics()} after the change
     */
    public WorkQueueMetrics setActive(boolean active) {
        this.active = active;
        activationLock.lock();
        try {
            activation.signalAll();
        } finally {
            activationLock.unlock();
        }
        return metrics();
    }

    /**
     * Adds an element by delegating to {@link #put(Runnable)}.
     *
     * @param r the element to add
     * @return always {@code true}
     * @throws RuntimeException if the thread is interrupted while adding; the interrupt status is restored first
     */
    @Override
    public boolean offer(Runnable r) {
        try {
            put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            throw new RuntimeException("interrupted", e);
        }
        return true;
    }

    /**
     * Adds an element by delegating to {@link #put(Runnable)}; the timeout is ignored.
     *
     * @param r the element to add
     * @param timeout ignored
     * @param unit ignored
     * @return always {@code true}
     * @throws InterruptedException if {@link #put(Runnable)} is interrupted
     */
    @Override
    public boolean offer(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        // not needed for ThreadPoolExecutor
        put(r);
        return true;
    }

    /**
     * Adds an element by delegating to {@link #putElement(Runnable)}.
     *
     * @param r the element to add
     * @throws InterruptedException if {@link #putElement(Runnable)} is interrupted
     */
    @Override
    public void put(Runnable r) throws InterruptedException {
        putElement(r);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Runnable peek() {
        // not needed for ThreadPoolExecutor
        throw new UnsupportedOperationException("not supported");
    }

    /**
     * Retrieves and removes an element, without waiting.
     *
     * @return the element, or {@code null} if the queue is inactive or {@link #pollElement()} returned nothing
     */
    @Override
    public Runnable poll() {
        if (!active) {
            return null;
        }
        Runnable runnable = pollElement();
        if (runnable == null) {
            return null;
        }
        if (!active) {
            // the queue was deactivated while polling: hand the work back instead of returning it
            queuing.workReschedule(queueId, WorkHolder.getWork(runnable));
            return null;
        }
        return runnable;
    }

    /**
     * Computes the time left until a deadline.
     *
     * @param end the deadline, on the same scale as {@link System#currentTimeMillis()}
     * @return the number of milliseconds from now until {@code end}, or {@code 0} if the deadline has passed
     */
    protected long timeUntil(long end) {
        return Math.max(0L, end - System.currentTimeMillis());
    }

    /**
     * Waits until the queue is active or the given time has elapsed.
     *
     * @param nanos the maximum time to wait, in nanoseconds
     * @return {@code nanos} unchanged if no wait was needed, otherwise the remaining time as last reported by
     *         {@link Condition#awaitNanos(long)} (zero or negative once the time has elapsed)
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected long awaitActivation(long nanos) throws InterruptedException {
        activationLock.lock();
        try {
            // loop: a signal is sent on every setActive call, including deactivations
            while (nanos > 0 && !active) {
                nanos = activation.awaitNanos(nanos);
            }
        } finally {
            activationLock.unlock();
        }
        return nanos;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean contains(Object o) {
        throw new UnsupportedOperationException();
    }

    /**
     * Gets the size of the queue, as reported by {@link #getQueueSize()}.
     */
    @Override
    public int size() {
        return getQueueSize();
    }

    /**
     * Reports the queue as unbounded.
     *
     * @return always {@link Integer#MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an iterator that never exposes any element.
     */
    @Override
    public Iterator<Runnable> iterator() {
        return new Iter();
    }

    /*
     * Used by drainQueue/purge methods of ThreadPoolExecutor through toArray.
     *
     * Always reports that there are no elements.
     */
    private static class Iter implements Iterator<Runnable> {

        @Override
        public boolean hasNext() {
            return false;
        }

        @Override
        public Runnable next() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Moves all the elements available through {@link #poll()} into the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements available through {@link #poll()} into the given collection.
     * Stops early as soon as {@link #poll()} returns {@code null}.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer; nothing is transferred if zero or negative
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super Runnable> c, int maxElements) {
        // validate before polling, so that no element is removed and then lost
        Objects.requireNonNull(c, "c");
        if (c == this) {
            // draining into ourselves would re-offer every polled element
            throw new IllegalArgumentException("cannot drain a queue into itself");
        }
        int drained = 0;
        while (drained < maxElements) {
            Runnable r = poll();
            if (r == null) {
                break;
            }
            c.add(r);
            drained++;
        }
        return drained;
    }

    /**
     * Gets the size of the queue.
     */
    public abstract int getQueueSize();

    /**
     * Adds an element into this queue, waiting if necessary for space to become available.
     */
    public abstract void putElement(Runnable r) throws InterruptedException;

    /**
     * Retrieves and removes an element from the queue, or returns null if the queue is empty.
     */
    public abstract Runnable pollElement();

}