001/* 002 * (C) Copyright 2013-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.work; 021 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.Set; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.LinkedBlockingQueue; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.locks.ReentrantLock; 029 030import org.nuxeo.ecm.core.work.api.Work; 031 032/** 033 * Memory-based {@link BlockingQueue}. 034 * <p> 035 * In addition, this implementation also keeps a set of {@link Work} ids in the queue when the queue elements are 036 * {@link WorkHolder}s. 037 */ 038public class MemoryBlockingQueue extends NuxeoBlockingQueue { 039 040 /** 041 * A {@link LinkedBlockingQueue} that blocks on {@link #offer} and prevents starvation deadlocks on reentrant calls. 042 */ 043 private static class ReentrantLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> { 044 045 private static final long serialVersionUID = 1L; 046 047 private final ReentrantLock limitedPutLock = new ReentrantLock(); 048 049 private final int limitedCapacity; 050 051 /** 052 * Creates a {@link LinkedBlockingQueue} with a maximum capacity. 053 * <p> 054 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 055 * 056 * @param capacity the capacity, or -1 for unbounded 057 */ 058 public ReentrantLinkedBlockingQueue(int capacity) { 059 // Allocate more space to prevent starvation dead lock 060 // because a worker can add a new job to the queue. 061 super(capacity < 0 ? Integer.MAX_VALUE : (2 * capacity)); 062 limitedCapacity = capacity; 063 } 064 065 /** 066 * Block until there are enough remaining capacity to put the entry. 067 */ 068 public void limitedPut(T e) throws InterruptedException { 069 limitedPutLock.lockInterruptibly(); 070 try { 071 while (remainingCapacity() < limitedCapacity) { 072 // TODO replace by wakeup when an element is removed 073 Thread.sleep(100); 074 } 075 put(e); 076 } finally { 077 limitedPutLock.unlock(); 078 } 079 } 080 081 @Override 082 public boolean offer(T e) { 083 if (limitedCapacity < 0) { 084 return super.offer(e); 085 } 086 // turn non-blocking offer into a blocking put 087 try { 088 if (Thread.currentThread().getName().startsWith(WorkManagerImpl.THREAD_PREFIX)) { 089 // use the full queue capacity for reentrant call 090 put(e); 091 } else { 092 // put only if there are enough remaining capacity 093 limitedPut(e); 094 } 095 return true; 096 } catch (InterruptedException ie) { 097 Thread.currentThread().interrupt(); 098 throw new RuntimeException("interrupted", ie); 099 } 100 } 101 } 102 103 protected final BlockingQueue<Runnable> queue; 104 105 // @GuardedBy("itself") 106 protected final Set<String> workIds; 107 108 /** 109 * Creates a {@link BlockingQueue} with a maximum capacity. 110 * <p> 111 * If the capacity is -1 then this is treated as a regular unbounded {@link LinkedBlockingQueue}. 112 * 113 * @param capacity the capacity, or -1 for unbounded 114 */ 115 public MemoryBlockingQueue(int capacity) { 116 queue = new ReentrantLinkedBlockingQueue<>(capacity); 117 workIds = new HashSet<>(); 118 } 119 120 /** 121 * Checks if the queue contains a given work id. 122 * 123 * @param workId the work id 124 * @return {@code true} if the queue contains the work id 125 */ 126 public boolean containsWorkId(String workId) { 127 synchronized (workIds) { 128 return workIds.contains(workId); 129 } 130 } 131 132 private Runnable addWorkId(Runnable r) { 133 if (r instanceof WorkHolder) { 134 WorkHolder wh = (WorkHolder) r; 135 String id = WorkHolder.getWork(wh).getId(); 136 synchronized (workIds) { 137 workIds.add(id); 138 } 139 } 140 return r; 141 } 142 143 private Runnable removeWorkId(Runnable r) { 144 if (r instanceof WorkHolder) { 145 WorkHolder wh = (WorkHolder) r; 146 String id = WorkHolder.getWork(wh).getId(); 147 synchronized (workIds) { 148 workIds.remove(id); 149 } 150 } 151 return r; 152 } 153 154 @Override 155 public int getQueueSize() { 156 return queue.size(); 157 } 158 159 @Override 160 public void putElement(Runnable r) throws InterruptedException { 161 queue.put(r); 162 addWorkId(r); 163 } 164 165 @Override 166 public Runnable pollElement() { 167 Runnable r = queue.poll(); 168 removeWorkId(r); 169 return r; 170 } 171 172 @Override 173 public Runnable take() throws InterruptedException { 174 Runnable r = queue.take(); 175 removeWorkId(r); 176 return r; 177 } 178 179 /* 180 * We can implement iterator, super doesn't have it. 181 */ 182 @Override 183 public Iterator<Runnable> iterator() { 184 return new Itr(queue.iterator()); 185 } 186 187 private class Itr implements Iterator<Runnable> { 188 private Iterator<Runnable> it; 189 190 private Runnable last; 191 192 public Itr(Iterator<Runnable> it) { 193 this.it = it; 194 } 195 196 @Override 197 public boolean hasNext() { 198 return it.hasNext(); 199 } 200 201 @Override 202 public Runnable next() { 203 return last = it.next(); 204 } 205 206 @Override 207 public void remove() { 208 it.remove(); 209 removeWorkId(last); 210 } 211 } 212 213 @Override 214 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 215 long nanos = unit.toNanos(timeout); 216 nanos = awaitActivation(nanos); 217 if (nanos <= 0) { 218 return null; 219 } 220 return queue.poll(nanos, TimeUnit.NANOSECONDS); 221 } 222 223}