1. /*
  2. * @(#)DelayQueue.java 1.7 04/06/11
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.locks.*;
  9. import java.util.*;
  10. /**
  11. * An unbounded {@linkplain BlockingQueue blocking queue} of
  12. * <tt>Delayed</tt> elements, in which an element can only be taken
  13. * when its delay has expired. The <em>head</em> of the queue is that
  14. * <tt>Delayed</tt> element whose delay expired furthest in the
  15. * past. If no delay has expired there is no head and <tt>poll</tt>
  16. * will return <tt>null</tt>. Expiration occurs when an element's
  17. * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
  18. * than or equal to zero. This queue does not permit <tt>null</tt>
  19. * elements.
  20. *
  21. * <p>This class and its iterator implement all of the
  22. * <em>optional</em> methods of the {@link Collection} and {@link
  23. * Iterator} interfaces.
  24. *
  25. * <p>This class is a member of the
  26. * <a href="{@docRoot}/../guide/collections/index.html">
  27. * Java Collections Framework</a>.
  28. *
  29. * @since 1.5
  30. * @author Doug Lea
  31. * @param <E> the type of elements held in this collection
  32. */
  33. public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  34. implements BlockingQueue<E> {
  35. private transient final ReentrantLock lock = new ReentrantLock();
  36. private transient final Condition available = lock.newCondition();
  37. private final PriorityQueue<E> q = new PriorityQueue<E>();
  38. /**
  39. * Creates a new <tt>DelayQueue</tt> that is initially empty.
  40. */
  41. public DelayQueue() {}
  42. /**
  43. * Creates a <tt>DelayQueue</tt> initially containing the elements of the
  44. * given collection of {@link Delayed} instances.
  45. *
  46. * @param c the collection
  47. * @throws NullPointerException if <tt>c</tt> or any element within it
  48. * is <tt>null</tt>
  49. *
  50. */
  51. public DelayQueue(Collection<? extends E> c) {
  52. this.addAll(c);
  53. }
  54. /**
  55. * Inserts the specified element into this delay queue.
  56. *
  57. * @param o the element to add
  58. * @return <tt>true</tt>
  59. * @throws NullPointerException if the specified element is <tt>null</tt>.
  60. */
  61. public boolean offer(E o) {
  62. final ReentrantLock lock = this.lock;
  63. lock.lock();
  64. try {
  65. E first = q.peek();
  66. q.offer(o);
  67. if (first == null || o.compareTo(first) < 0)
  68. available.signalAll();
  69. return true;
  70. } finally {
  71. lock.unlock();
  72. }
  73. }
  74. /**
  75. * Adds the specified element to this delay queue. As the queue is
  76. * unbounded this method will never block.
  77. * @param o the element to add
  78. * @throws NullPointerException if the specified element is <tt>null</tt>.
  79. */
  80. public void put(E o) {
  81. offer(o);
  82. }
  83. /**
  84. * Inserts the specified element into this delay queue. As the queue is
  85. * unbounded this method will never block.
  86. * @param o the element to add
  87. * @param timeout This parameter is ignored as the method never blocks
  88. * @param unit This parameter is ignored as the method never blocks
  89. * @return <tt>true</tt>
  90. * @throws NullPointerException if the specified element is <tt>null</tt>.
  91. */
  92. public boolean offer(E o, long timeout, TimeUnit unit) {
  93. return offer(o);
  94. }
  95. /**
  96. * Adds the specified element to this queue.
  97. * @param o the element to add
  98. * @return <tt>true</tt> (as per the general contract of
  99. * <tt>Collection.add</tt>).
  100. *
  101. * @throws NullPointerException if the specified element is <tt>null</tt>.
  102. */
  103. public boolean add(E o) {
  104. return offer(o);
  105. }
  106. /**
  107. * Retrieves and removes the head of this queue, waiting
  108. * if no elements with an unexpired delay are present on this queue.
  109. * @return the head of this queue
  110. * @throws InterruptedException if interrupted while waiting.
  111. */
  112. public E take() throws InterruptedException {
  113. final ReentrantLock lock = this.lock;
  114. lock.lockInterruptibly();
  115. try {
  116. for (;;) {
  117. E first = q.peek();
  118. if (first == null) {
  119. available.await();
  120. } else {
  121. long delay = first.getDelay(TimeUnit.NANOSECONDS);
  122. if (delay > 0) {
  123. long tl = available.awaitNanos(delay);
  124. } else {
  125. E x = q.poll();
  126. assert x != null;
  127. if (q.size() != 0)
  128. available.signalAll(); // wake up other takers
  129. return x;
  130. }
  131. }
  132. }
  133. } finally {
  134. lock.unlock();
  135. }
  136. }
  137. /**
  138. * Retrieves and removes the head of this queue, waiting
  139. * if necessary up to the specified wait time if no elements with
  140. * an unexpired delay are
  141. * present on this queue.
  142. * @param timeout how long to wait before giving up, in units of
  143. * <tt>unit</tt>
  144. * @param unit a <tt>TimeUnit</tt> determining how to interpret the
  145. * <tt>timeout</tt> parameter
  146. * @return the head of this queue, or <tt>null</tt> if the
  147. * specified waiting time elapses before an element with
  148. * an unexpired delay is present.
  149. * @throws InterruptedException if interrupted while waiting.
  150. */
  151. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  152. final ReentrantLock lock = this.lock;
  153. lock.lockInterruptibly();
  154. long nanos = unit.toNanos(timeout);
  155. try {
  156. for (;;) {
  157. E first = q.peek();
  158. if (first == null) {
  159. if (nanos <= 0)
  160. return null;
  161. else
  162. nanos = available.awaitNanos(nanos);
  163. } else {
  164. long delay = first.getDelay(TimeUnit.NANOSECONDS);
  165. if (delay > 0) {
  166. if (delay > nanos)
  167. delay = nanos;
  168. long timeLeft = available.awaitNanos(delay);
  169. nanos -= delay - timeLeft;
  170. } else {
  171. E x = q.poll();
  172. assert x != null;
  173. if (q.size() != 0)
  174. available.signalAll();
  175. return x;
  176. }
  177. }
  178. }
  179. } finally {
  180. lock.unlock();
  181. }
  182. }
  183. /**
  184. * Retrieves and removes the head of this queue, or <tt>null</tt>
  185. * if this queue has no elements with an unexpired delay.
  186. *
  187. * @return the head of this queue, or <tt>null</tt> if this
  188. * queue has no elements with an unexpired delay.
  189. */
  190. public E poll() {
  191. final ReentrantLock lock = this.lock;
  192. lock.lock();
  193. try {
  194. E first = q.peek();
  195. if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  196. return null;
  197. else {
  198. E x = q.poll();
  199. assert x != null;
  200. if (q.size() != 0)
  201. available.signalAll();
  202. return x;
  203. }
  204. } finally {
  205. lock.unlock();
  206. }
  207. }
  208. /**
  209. * Retrieves, but does not remove, the head of this queue,
  210. * returning <tt>null</tt> if this queue has no elements with an
  211. * unexpired delay.
  212. *
  213. * @return the head of this queue, or <tt>null</tt> if this queue
  214. * has no elements with an unexpired delay.
  215. */
  216. public E peek() {
  217. final ReentrantLock lock = this.lock;
  218. lock.lock();
  219. try {
  220. return q.peek();
  221. } finally {
  222. lock.unlock();
  223. }
  224. }
  225. public int size() {
  226. final ReentrantLock lock = this.lock;
  227. lock.lock();
  228. try {
  229. return q.size();
  230. } finally {
  231. lock.unlock();
  232. }
  233. }
  234. public int drainTo(Collection<? super E> c) {
  235. if (c == null)
  236. throw new NullPointerException();
  237. if (c == this)
  238. throw new IllegalArgumentException();
  239. final ReentrantLock lock = this.lock;
  240. lock.lock();
  241. try {
  242. int n = 0;
  243. for (;;) {
  244. E first = q.peek();
  245. if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  246. break;
  247. c.add(q.poll());
  248. ++n;
  249. }
  250. if (n > 0)
  251. available.signalAll();
  252. return n;
  253. } finally {
  254. lock.unlock();
  255. }
  256. }
  257. public int drainTo(Collection<? super E> c, int maxElements) {
  258. if (c == null)
  259. throw new NullPointerException();
  260. if (c == this)
  261. throw new IllegalArgumentException();
  262. if (maxElements <= 0)
  263. return 0;
  264. final ReentrantLock lock = this.lock;
  265. lock.lock();
  266. try {
  267. int n = 0;
  268. while (n < maxElements) {
  269. E first = q.peek();
  270. if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  271. break;
  272. c.add(q.poll());
  273. ++n;
  274. }
  275. if (n > 0)
  276. available.signalAll();
  277. return n;
  278. } finally {
  279. lock.unlock();
  280. }
  281. }
  282. /**
  283. * Atomically removes all of the elements from this delay queue.
  284. * The queue will be empty after this call returns.
  285. */
  286. public void clear() {
  287. final ReentrantLock lock = this.lock;
  288. lock.lock();
  289. try {
  290. q.clear();
  291. } finally {
  292. lock.unlock();
  293. }
  294. }
  295. /**
  296. * Always returns <tt>Integer.MAX_VALUE</tt> because
  297. * a <tt>DelayQueue</tt> is not capacity constrained.
  298. * @return <tt>Integer.MAX_VALUE</tt>
  299. */
  300. public int remainingCapacity() {
  301. return Integer.MAX_VALUE;
  302. }
  303. public Object[] toArray() {
  304. final ReentrantLock lock = this.lock;
  305. lock.lock();
  306. try {
  307. return q.toArray();
  308. } finally {
  309. lock.unlock();
  310. }
  311. }
  312. public <T> T[] toArray(T[] array) {
  313. final ReentrantLock lock = this.lock;
  314. lock.lock();
  315. try {
  316. return q.toArray(array);
  317. } finally {
  318. lock.unlock();
  319. }
  320. }
  321. /**
  322. * Removes a single instance of the specified element from this
  323. * queue, if it is present.
  324. */
  325. public boolean remove(Object o) {
  326. final ReentrantLock lock = this.lock;
  327. lock.lock();
  328. try {
  329. return q.remove(o);
  330. } finally {
  331. lock.unlock();
  332. }
  333. }
  334. /**
  335. * Returns an iterator over the elements in this queue. The iterator
  336. * does not return the elements in any particular order. The
  337. * returned iterator is a thread-safe "fast-fail" iterator that will
  338. * throw {@link java.util.ConcurrentModificationException}
  339. * upon detected interference.
  340. *
  341. * @return an iterator over the elements in this queue.
  342. */
  343. public Iterator<E> iterator() {
  344. final ReentrantLock lock = this.lock;
  345. lock.lock();
  346. try {
  347. return new Itr(q.iterator());
  348. } finally {
  349. lock.unlock();
  350. }
  351. }
  352. private class Itr<E> implements Iterator<E> {
  353. private final Iterator<E> iter;
  354. Itr(Iterator<E> i) {
  355. iter = i;
  356. }
  357. public boolean hasNext() {
  358. return iter.hasNext();
  359. }
  360. public E next() {
  361. final ReentrantLock lock = DelayQueue.this.lock;
  362. lock.lock();
  363. try {
  364. return iter.next();
  365. } finally {
  366. lock.unlock();
  367. }
  368. }
  369. public void remove() {
  370. final ReentrantLock lock = DelayQueue.this.lock;
  371. lock.lock();
  372. try {
  373. iter.remove();
  374. } finally {
  375. lock.unlock();
  376. }
  377. }
  378. }
  379. }