1. /*
  2. * @(#)PriorityBlockingQueue.java 1.9 04/06/11
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.locks.*;
  9. import java.util.*;
  10. /**
  11. * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  12. * the same ordering rules as class {@link PriorityQueue} and supplies
  13. * blocking retrieval operations. While this queue is logically
  14. * unbounded, attempted additions may fail due to resource exhaustion
  15. * (causing <tt>OutOfMemoryError</tt>). This class does not permit
  16. * <tt>null</tt> elements. A priority queue relying on natural
  17. * ordering also does not permit insertion of non-comparable objects
  18. * (doing so results in <tt>ClassCastException</tt>).
  19. *
  20. * <p>This class and its iterator implement all of the
  21. * <em>optional</em> methods of the {@link Collection} and {@link
  22. * Iterator} interfaces.
  23. * The Iterator provided in method {@link #iterator()} is
  24. * <em>not</em> guaranteed to traverse the elements of the
  25. * PriorityBlockingQueue in any particular order. If you need ordered
  26. * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
  27. *
  28. * <p>This class is a member of the
  29. * <a href="{@docRoot}/../guide/collections/index.html">
  30. * Java Collections Framework</a>.
  31. *
  32. * @since 1.5
  33. * @author Doug Lea
  34. * @param <E> the type of elements held in this collection
  35. */
  36. public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  37. implements BlockingQueue<E>, java.io.Serializable {
  38. private static final long serialVersionUID = 5595510919245408276L;
  39. private final PriorityQueue<E> q;
  40. private final ReentrantLock lock = new ReentrantLock(true);
  41. private final Condition notEmpty = lock.newCondition();
  42. /**
  43. * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
  44. * capacity
  45. * (11) that orders its elements according to their natural
  46. * ordering (using <tt>Comparable</tt>).
  47. */
  48. public PriorityBlockingQueue() {
  49. q = new PriorityQueue<E>();
  50. }
  51. /**
  52. * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
  53. * capacity
  54. * that orders its elements according to their natural ordering
  55. * (using <tt>Comparable</tt>).
  56. *
  57. * @param initialCapacity the initial capacity for this priority queue.
  58. * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
  59. * than 1
  60. */
  61. public PriorityBlockingQueue(int initialCapacity) {
  62. q = new PriorityQueue<E>(initialCapacity, null);
  63. }
  64. /**
  65. * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
  66. * capacity
  67. * that orders its elements according to the specified comparator.
  68. *
  69. * @param initialCapacity the initial capacity for this priority queue.
  70. * @param comparator the comparator used to order this priority queue.
  71. * If <tt>null</tt> then the order depends on the elements' natural
  72. * ordering.
  73. * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
  74. * than 1
  75. */
  76. public PriorityBlockingQueue(int initialCapacity,
  77. Comparator<? super E> comparator) {
  78. q = new PriorityQueue<E>(initialCapacity, comparator);
  79. }
  80. /**
  81. * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
  82. * in the specified collection. The priority queue has an initial
  83. * capacity of 110% of the size of the specified collection. If
  84. * the specified collection is a {@link SortedSet} or a {@link
  85. * PriorityQueue}, this priority queue will be sorted according to
  86. * the same comparator, or according to its elements' natural
  87. * order if the collection is sorted according to its elements'
  88. * natural order. Otherwise, this priority queue is ordered
  89. * according to its elements' natural order.
  90. *
  91. * @param c the collection whose elements are to be placed
  92. * into this priority queue.
  93. * @throws ClassCastException if elements of the specified collection
  94. * cannot be compared to one another according to the priority
  95. * queue's ordering.
  96. * @throws NullPointerException if <tt>c</tt> or any element within it
  97. * is <tt>null</tt>
  98. */
  99. public PriorityBlockingQueue(Collection<? extends E> c) {
  100. q = new PriorityQueue<E>(c);
  101. }
  102. // these first few override just to update doc comments
  103. /**
  104. * Adds the specified element to this queue.
  105. * @param o the element to add
  106. * @return <tt>true</tt> (as per the general contract of
  107. * <tt>Collection.add</tt>).
  108. *
  109. * @throws NullPointerException if the specified element is <tt>null</tt>.
  110. * @throws ClassCastException if the specified element cannot be compared
  111. * with elements currently in the priority queue according
  112. * to the priority queue's ordering.
  113. */
  114. public boolean add(E o) {
  115. return super.add(o);
  116. }
  117. /**
  118. * Returns the comparator used to order this collection, or <tt>null</tt>
  119. * if this collection is sorted according to its elements natural ordering
  120. * (using <tt>Comparable</tt>).
  121. *
  122. * @return the comparator used to order this collection, or <tt>null</tt>
  123. * if this collection is sorted according to its elements natural ordering.
  124. */
  125. public Comparator<? super E> comparator() {
  126. return q.comparator();
  127. }
  128. /**
  129. * Inserts the specified element into this priority queue.
  130. *
  131. * @param o the element to add
  132. * @return <tt>true</tt>
  133. * @throws ClassCastException if the specified element cannot be compared
  134. * with elements currently in the priority queue according
  135. * to the priority queue's ordering.
  136. * @throws NullPointerException if the specified element is <tt>null</tt>.
  137. */
  138. public boolean offer(E o) {
  139. if (o == null) throw new NullPointerException();
  140. final ReentrantLock lock = this.lock;
  141. lock.lock();
  142. try {
  143. boolean ok = q.offer(o);
  144. assert ok;
  145. notEmpty.signal();
  146. return true;
  147. } finally {
  148. lock.unlock();
  149. }
  150. }
  151. /**
  152. * Adds the specified element to this priority queue. As the queue is
  153. * unbounded this method will never block.
  154. * @param o the element to add
  155. * @throws ClassCastException if the element cannot be compared
  156. * with elements currently in the priority queue according
  157. * to the priority queue's ordering.
  158. * @throws NullPointerException if the specified element is <tt>null</tt>.
  159. */
  160. public void put(E o) {
  161. offer(o); // never need to block
  162. }
  163. /**
  164. * Inserts the specified element into this priority queue. As the queue is
  165. * unbounded this method will never block.
  166. * @param o the element to add
  167. * @param timeout This parameter is ignored as the method never blocks
  168. * @param unit This parameter is ignored as the method never blocks
  169. * @return <tt>true</tt>
  170. * @throws ClassCastException if the element cannot be compared
  171. * with elements currently in the priority queue according
  172. * to the priority queue's ordering.
  173. * @throws NullPointerException if the specified element is <tt>null</tt>.
  174. */
  175. public boolean offer(E o, long timeout, TimeUnit unit) {
  176. return offer(o); // never need to block
  177. }
  178. public E take() throws InterruptedException {
  179. final ReentrantLock lock = this.lock;
  180. lock.lockInterruptibly();
  181. try {
  182. try {
  183. while (q.size() == 0)
  184. notEmpty.await();
  185. } catch (InterruptedException ie) {
  186. notEmpty.signal(); // propagate to non-interrupted thread
  187. throw ie;
  188. }
  189. E x = q.poll();
  190. assert x != null;
  191. return x;
  192. } finally {
  193. lock.unlock();
  194. }
  195. }
  196. public E poll() {
  197. final ReentrantLock lock = this.lock;
  198. lock.lock();
  199. try {
  200. return q.poll();
  201. } finally {
  202. lock.unlock();
  203. }
  204. }
  205. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  206. long nanos = unit.toNanos(timeout);
  207. final ReentrantLock lock = this.lock;
  208. lock.lockInterruptibly();
  209. try {
  210. for (;;) {
  211. E x = q.poll();
  212. if (x != null)
  213. return x;
  214. if (nanos <= 0)
  215. return null;
  216. try {
  217. nanos = notEmpty.awaitNanos(nanos);
  218. } catch (InterruptedException ie) {
  219. notEmpty.signal(); // propagate to non-interrupted thread
  220. throw ie;
  221. }
  222. }
  223. } finally {
  224. lock.unlock();
  225. }
  226. }
  227. public E peek() {
  228. final ReentrantLock lock = this.lock;
  229. lock.lock();
  230. try {
  231. return q.peek();
  232. } finally {
  233. lock.unlock();
  234. }
  235. }
  236. public int size() {
  237. final ReentrantLock lock = this.lock;
  238. lock.lock();
  239. try {
  240. return q.size();
  241. } finally {
  242. lock.unlock();
  243. }
  244. }
  245. /**
  246. * Always returns <tt>Integer.MAX_VALUE</tt> because
  247. * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
  248. * @return <tt>Integer.MAX_VALUE</tt>
  249. */
  250. public int remainingCapacity() {
  251. return Integer.MAX_VALUE;
  252. }
  253. /**
  254. * Removes a single instance of the specified element from this
  255. * queue, if it is present.
  256. */
  257. public boolean remove(Object o) {
  258. final ReentrantLock lock = this.lock;
  259. lock.lock();
  260. try {
  261. return q.remove(o);
  262. } finally {
  263. lock.unlock();
  264. }
  265. }
  266. public boolean contains(Object o) {
  267. final ReentrantLock lock = this.lock;
  268. lock.lock();
  269. try {
  270. return q.contains(o);
  271. } finally {
  272. lock.unlock();
  273. }
  274. }
  275. public Object[] toArray() {
  276. final ReentrantLock lock = this.lock;
  277. lock.lock();
  278. try {
  279. return q.toArray();
  280. } finally {
  281. lock.unlock();
  282. }
  283. }
  284. public String toString() {
  285. final ReentrantLock lock = this.lock;
  286. lock.lock();
  287. try {
  288. return q.toString();
  289. } finally {
  290. lock.unlock();
  291. }
  292. }
  293. public int drainTo(Collection<? super E> c) {
  294. if (c == null)
  295. throw new NullPointerException();
  296. if (c == this)
  297. throw new IllegalArgumentException();
  298. final ReentrantLock lock = this.lock;
  299. lock.lock();
  300. try {
  301. int n = 0;
  302. E e;
  303. while ( (e = q.poll()) != null) {
  304. c.add(e);
  305. ++n;
  306. }
  307. return n;
  308. } finally {
  309. lock.unlock();
  310. }
  311. }
  312. public int drainTo(Collection<? super E> c, int maxElements) {
  313. if (c == null)
  314. throw new NullPointerException();
  315. if (c == this)
  316. throw new IllegalArgumentException();
  317. if (maxElements <= 0)
  318. return 0;
  319. final ReentrantLock lock = this.lock;
  320. lock.lock();
  321. try {
  322. int n = 0;
  323. E e;
  324. while (n < maxElements && (e = q.poll()) != null) {
  325. c.add(e);
  326. ++n;
  327. }
  328. return n;
  329. } finally {
  330. lock.unlock();
  331. }
  332. }
  333. /**
  334. * Atomically removes all of the elements from this queue.
  335. * The queue will be empty after this call returns.
  336. */
  337. public void clear() {
  338. final ReentrantLock lock = this.lock;
  339. lock.lock();
  340. try {
  341. q.clear();
  342. } finally {
  343. lock.unlock();
  344. }
  345. }
  346. public <T> T[] toArray(T[] a) {
  347. final ReentrantLock lock = this.lock;
  348. lock.lock();
  349. try {
  350. return q.toArray(a);
  351. } finally {
  352. lock.unlock();
  353. }
  354. }
  355. /**
  356. * Returns an iterator over the elements in this queue. The
  357. * iterator does not return the elements in any particular order.
  358. * The returned iterator is a thread-safe "fast-fail" iterator
  359. * that will throw {@link
  360. * java.util.ConcurrentModificationException} upon detected
  361. * interference.
  362. *
  363. * @return an iterator over the elements in this queue.
  364. */
  365. public Iterator<E> iterator() {
  366. final ReentrantLock lock = this.lock;
  367. lock.lock();
  368. try {
  369. return new Itr(q.iterator());
  370. } finally {
  371. lock.unlock();
  372. }
  373. }
  374. private class Itr<E> implements Iterator<E> {
  375. private final Iterator<E> iter;
  376. Itr(Iterator<E> i) {
  377. iter = i;
  378. }
  379. public boolean hasNext() {
  380. /*
  381. * No sync -- we rely on underlying hasNext to be
  382. * stateless, in which case we can return true by mistake
  383. * only when next() will subsequently throw
  384. * ConcurrentModificationException.
  385. */
  386. return iter.hasNext();
  387. }
  388. public E next() {
  389. ReentrantLock lock = PriorityBlockingQueue.this.lock;
  390. lock.lock();
  391. try {
  392. return iter.next();
  393. } finally {
  394. lock.unlock();
  395. }
  396. }
  397. public void remove() {
  398. ReentrantLock lock = PriorityBlockingQueue.this.lock;
  399. lock.lock();
  400. try {
  401. iter.remove();
  402. } finally {
  403. lock.unlock();
  404. }
  405. }
  406. }
  407. /**
  408. * Save the state to a stream (that is, serialize it). This
  409. * merely wraps default serialization within lock. The
  410. * serialization strategy for items is left to underlying
  411. * Queue. Note that locking is not needed on deserialization, so
  412. * readObject is not defined, just relying on default.
  413. */
  414. private void writeObject(java.io.ObjectOutputStream s)
  415. throws java.io.IOException {
  416. lock.lock();
  417. try {
  418. s.defaultWriteObject();
  419. } finally {
  420. lock.unlock();
  421. }
  422. }
  423. }