1. /*
  2. * @(#)ArrayBlockingQueue.java 1.9 04/06/14
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.locks.*;
  9. import java.util.*;
  10. /**
  11. * A bounded {@linkplain BlockingQueue blocking queue} backed by an
  12. * array. This queue orders elements FIFO (first-in-first-out). The
  13. * <em>head</em> of the queue is that element that has been on the
  14. * queue the longest time. The <em>tail</em> of the queue is that
  15. * element that has been on the queue the shortest time. New elements
  16. * are inserted at the tail of the queue, and the queue retrieval
  17. * operations obtain elements at the head of the queue.
  18. *
  19. * <p>This is a classic "bounded buffer", in which a
  20. * fixed-sized array holds elements inserted by producers and
  21. * extracted by consumers. Once created, the capacity cannot be
  22. * increased. Attempts to put an element to a full queue will
  23. * result in the put operation blocking; attempts to retrieve an
  24. * element from an empty queue will similarly block.
  25. *
  26. * <p> This class supports an optional fairness policy for ordering
  27. * waiting producer and consumer threads. By default, this ordering
  28. * is not guaranteed. However, a queue constructed with fairness set
  29. * to <tt>true</tt> grants threads access in FIFO order. Fairness
  30. * generally decreases throughput but reduces variability and avoids
  31. * starvation.
  32. *
  33. * <p>This class and its iterator implement all of the
  34. * <em>optional</em> methods of the {@link Collection} and {@link
  35. * Iterator} interfaces.
  36. *
  37. * <p>This class is a member of the
  38. * <a href="{@docRoot}/../guide/collections/index.html">
  39. * Java Collections Framework</a>.
  40. *
  41. * @since 1.5
  42. * @author Doug Lea
  43. * @param <E> the type of elements held in this collection
  44. */
  public class ArrayBlockingQueue<E> extends AbstractQueue<E>
          implements BlockingQueue<E>, java.io.Serializable {

      /**
       * Serialization ID. This class relies on default serialization
       * even for the items array, which is default-serialized, even if
       * it is empty. Otherwise it could not be declared final, which is
       * necessary here.
       */
      private static final long serialVersionUID = -817911632652898426L;

      /** The queued items; used as a circular buffer. */
      private final E[] items;

      /**
       * items index for next take, poll or remove.
       * Deliberately NOT transient: items and count are default-serialized,
       * so the indices must travel with them or a deserialized queue would
       * read from slot 0 regardless of where its elements actually live.
       */
      private int takeIndex;

      /**
       * items index for next put, offer, or add.
       * Serialized for the same reason as takeIndex.
       */
      private int putIndex;

      /** Number of items in the queue */
      private int count;

      /*
       * Concurrency control uses the classic two-condition algorithm
       * found in any textbook.
       */

      /** Main lock guarding all access */
      private final ReentrantLock lock;

      /** Condition for waiting takes */
      private final Condition notEmpty;

      /** Condition for waiting puts */
      private final Condition notFull;

      // Internal helper methods

      /**
       * Circularly increment i.
       *
       * @param i an index into the items array
       * @return <tt>i + 1</tt>, or 0 if that would equal the array length
       */
      final int inc(int i) {
          return (++i == items.length) ? 0 : i;
      }

      /**
       * Insert element at current put position, advance, and signal.
       * Call only when holding lock.
       *
       * @param x the element to store at putIndex
       */
      private void insert(E x) {
          items[putIndex] = x;
          putIndex = inc(putIndex);
          ++count;
          notEmpty.signal();
      }

      /**
       * Extract element at current take position, advance, and signal.
       * Call only when holding lock.
       *
       * @return the element that was stored at takeIndex
       */
      private E extract() {
          final E[] items = this.items;
          E x = items[takeIndex];
          items[takeIndex] = null; // drop the reference so the slot does not pin the element
          takeIndex = inc(takeIndex);
          --count;
          notFull.signal();
          return x;
      }

      /**
       * Utility for remove and iterator.remove: Delete item at position i.
       * Call only when holding lock.
       *
       * @param i index into the items array of the element to delete
       */
      void removeAt(int i) {
          final E[] items = this.items;
          if (i == takeIndex) {
              // removing front item: just advance
              items[takeIndex] = null;
              takeIndex = inc(takeIndex);
          } else {
              // slide over all others up through putIndex.
              for (;;) {
                  int nexti = inc(i);
                  if (nexti != putIndex) {
                      items[i] = items[nexti];
                      i = nexti;
                  } else {
                      // i is now the last occupied slot; it becomes the new put position
                      items[i] = null;
                      putIndex = i;
                      break;
                  }
              }
          }
          --count;
          notFull.signal();
      }

      /**
       * Moves up to <tt>max</tt> elements from the head of this queue into
       * <tt>c</tt>. Call only when holding lock, and only with
       * <tt>max &lt;= count</tt>.
       *
       * <p>The bookkeeping (count, takeIndex, signalling) is committed in a
       * <tt>finally</tt> block so that, if <tt>c.add</tt> throws part-way
       * through, the elements already handed over are accounted for and the
       * element whose add failed stays at the head of this queue.
       *
       * @param c the collection to transfer elements into
       * @param max the maximum number of elements to transfer
       * @return the number of elements transferred
       */
      private int drain(Collection<? super E> c, int max) {
          final E[] items = this.items;
          int i = takeIndex;
          int n = 0;
          try {
              while (n < max) {
                  c.add(items[i]);
                  items[i] = null;
                  i = inc(i);
                  ++n;
              }
              return n;
          } finally {
              if (n > 0) {
                  count -= n;
                  takeIndex = i;
                  notFull.signalAll();
              }
          }
      }

      /**
       * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
       * capacity and default access policy.
       * @param capacity the capacity of this queue
       * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
       */
      public ArrayBlockingQueue(int capacity) {
          this(capacity, false);
      }

      /**
       * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
       * capacity and the specified access policy.
       * @param capacity the capacity of this queue
       * @param fair if <tt>true</tt> then queue accesses for threads blocked
       * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
       * the access order is unspecified.
       * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
       */
      @SuppressWarnings("unchecked") // the array is private and only ever holds E
      public ArrayBlockingQueue(int capacity, boolean fair) {
          if (capacity <= 0)
              throw new IllegalArgumentException();
          this.items = (E[]) new Object[capacity];
          lock = new ReentrantLock(fair);
          notEmpty = lock.newCondition();
          notFull = lock.newCondition();
      }

      /**
       * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
       * capacity, the specified access policy and initially containing the
       * elements of the given collection,
       * added in traversal order of the collection's iterator.
       * @param capacity the capacity of this queue
       * @param fair if <tt>true</tt> then queue accesses for threads blocked
       * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
       * the access order is unspecified.
       * @param c the collection of elements to initially contain
       * @throws IllegalArgumentException if <tt>capacity</tt> is less than
       * <tt>c.size()</tt>, or less than 1.
       * @throws NullPointerException if <tt>c</tt> or any element within it
       * is <tt>null</tt>
       */
      public ArrayBlockingQueue(int capacity, boolean fair,
                                Collection<? extends E> c) {
          this(capacity, fair);
          if (capacity < c.size())
              throw new IllegalArgumentException();
          for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
              add(it.next());
      }

      /**
       * Inserts the specified element at the tail of this queue if possible,
       * returning immediately if this queue is full.
       *
       * @param o the element to add.
       * @return <tt>true</tt> if it was possible to add the element to
       * this queue, else <tt>false</tt>
       * @throws NullPointerException if the specified element is <tt>null</tt>
       */
      public boolean offer(E o) {
          if (o == null) throw new NullPointerException();
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              if (count == items.length)
                  return false;
              insert(o);
              return true;
          } finally {
              lock.unlock();
          }
      }

      /**
       * Inserts the specified element at the tail of this queue, waiting if
       * necessary up to the specified wait time for space to become available.
       * @param o the element to add
       * @param timeout how long to wait before giving up, in units of
       * <tt>unit</tt>
       * @param unit a <tt>TimeUnit</tt> determining how to interpret the
       * <tt>timeout</tt> parameter
       * @return <tt>true</tt> if successful, or <tt>false</tt> if
       * the specified waiting time elapses before space is available.
       * @throws InterruptedException if interrupted while waiting.
       * @throws NullPointerException if the specified element is <tt>null</tt>.
       */
      public boolean offer(E o, long timeout, TimeUnit unit)
              throws InterruptedException {
          if (o == null) throw new NullPointerException();
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              long nanos = unit.toNanos(timeout);
              for (;;) {
                  if (count != items.length) {
                      insert(o);
                      return true;
                  }
                  if (nanos <= 0)
                      return false;
                  try {
                      nanos = notFull.awaitNanos(nanos);
                  } catch (InterruptedException ie) {
                      notFull.signal(); // propagate to non-interrupted thread
                      throw ie;
                  }
              }
          } finally {
              lock.unlock();
          }
      }

      /**
       * Retrieves and removes the head of this queue, returning immediately
       * if this queue is empty.
       *
       * @return the head of this queue, or <tt>null</tt> if this queue is empty
       */
      public E poll() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              if (count == 0)
                  return null;
              return extract();
          } finally {
              lock.unlock();
          }
      }

      /**
       * Retrieves and removes the head of this queue, waiting if necessary
       * up to the specified wait time for an element to become available.
       *
       * @param timeout how long to wait before giving up, in units of
       * <tt>unit</tt>
       * @param unit a <tt>TimeUnit</tt> determining how to interpret the
       * <tt>timeout</tt> parameter
       * @return the head of this queue, or <tt>null</tt> if the specified
       * waiting time elapses before an element is present.
       * @throws InterruptedException if interrupted while waiting.
       */
      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              long nanos = unit.toNanos(timeout);
              for (;;) {
                  if (count != 0)
                      return extract();
                  if (nanos <= 0)
                      return null;
                  try {
                      nanos = notEmpty.awaitNanos(nanos);
                  } catch (InterruptedException ie) {
                      notEmpty.signal(); // propagate to non-interrupted thread
                      throw ie;
                  }
              }
          } finally {
              lock.unlock();
          }
      }

      /**
       * Removes a single instance of the specified element from this
       * queue, if it is present. Elements are compared with
       * <tt>o.equals</tt>, scanning from the head of the queue.
       *
       * @param o the element to remove; <tt>null</tt> never matches
       * @return <tt>true</tt> if an element was removed, else <tt>false</tt>
       */
      public boolean remove(Object o) {
          if (o == null) return false;
          final E[] items = this.items;
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              int i = takeIndex;
              int k = 0;
              for (;;) {
                  if (k++ >= count)
                      return false;
                  if (o.equals(items[i])) {
                      removeAt(i);
                      return true;
                  }
                  i = inc(i);
              }
          } finally {
              lock.unlock();
          }
      }

      /**
       * Retrieves, but does not remove, the head of this queue.
       *
       * @return the head of this queue, or <tt>null</tt> if this queue is empty
       */
      public E peek() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return (count == 0) ? null : items[takeIndex];
          } finally {
              lock.unlock();
          }
      }

      /**
       * Retrieves and removes the head of this queue, waiting if
       * no elements are present on this queue.
       *
       * @return the head of this queue
       * @throws InterruptedException if interrupted while waiting.
       */
      public E take() throws InterruptedException {
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              try {
                  while (count == 0)
                      notEmpty.await();
              } catch (InterruptedException ie) {
                  notEmpty.signal(); // propagate to non-interrupted thread
                  throw ie;
              }
              return extract();
          } finally {
              lock.unlock();
          }
      }

      /**
       * Adds the specified element to the tail of this queue, waiting if
       * necessary for space to become available.
       * @param o the element to add
       * @throws InterruptedException if interrupted while waiting.
       * @throws NullPointerException if the specified element is <tt>null</tt>.
       */
      public void put(E o) throws InterruptedException {
          if (o == null) throw new NullPointerException();
          final E[] items = this.items;
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              try {
                  while (count == items.length)
                      notFull.await();
              } catch (InterruptedException ie) {
                  notFull.signal(); // propagate to non-interrupted thread
                  throw ie;
              }
              insert(o);
          } finally {
              lock.unlock();
          }
      }

      // this doc comment is overridden to remove the reference to collections
      // greater in size than Integer.MAX_VALUE
      /**
       * Returns the number of elements in this queue.
       *
       * @return the number of elements in this queue.
       */
      public int size() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return count;
          } finally {
              lock.unlock();
          }
      }

      // this doc comment is a modified copy of the inherited doc comment,
      // without the reference to unlimited queues.
      /**
       * Returns the number of elements that this queue can ideally (in
       * the absence of memory or resource constraints) accept without
       * blocking. This is always equal to the initial capacity of this queue
       * less the current <tt>size</tt> of this queue.
       * <p>Note that you <em>cannot</em> always tell if
       * an attempt to <tt>add</tt> an element will succeed by
       * inspecting <tt>remainingCapacity</tt> because it may be the
       * case that a waiting consumer is ready to <tt>take</tt> an
       * element out of an otherwise full queue.
       *
       * @return the capacity of this queue minus its current size
       */
      public int remainingCapacity() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return items.length - count;
          } finally {
              lock.unlock();
          }
      }

      /**
       * Returns <tt>true</tt> if this queue contains an element for which
       * <tt>o.equals</tt> returns <tt>true</tt>.
       *
       * @param o the object to look for; <tt>null</tt> never matches
       * @return <tt>true</tt> if a matching element is present
       */
      public boolean contains(Object o) {
          if (o == null) return false;
          final E[] items = this.items;
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              int i = takeIndex;
              int k = 0;
              while (k++ < count) {
                  if (o.equals(items[i]))
                      return true;
                  i = inc(i);
              }
              return false;
          } finally {
              lock.unlock();
          }
      }

      /**
       * Returns a newly allocated array containing all of the elements in
       * this queue, in head-to-tail order.
       *
       * @return an array containing all of the elements in this queue
       */
      public Object[] toArray() {
          final E[] items = this.items;
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              Object[] a = new Object[count];
              int k = 0;
              int i = takeIndex;
              while (k < count) {
                  a[k++] = items[i];
                  i = inc(i);
              }
              return a;
          } finally {
              lock.unlock();
          }
      }

      /**
       * Copies all of the elements in this queue, in head-to-tail order,
       * into <tt>a</tt> if it is large enough, otherwise into a newly
       * allocated array of the same component type. If <tt>a</tt> has room
       * to spare, the slot immediately after the last element is set to
       * <tt>null</tt>.
       *
       * @param a the array to fill, or whose component type to use
       * @return the array holding the elements of this queue
       * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
       */
      @SuppressWarnings("unchecked") // casts mirror the Collection.toArray(T[]) contract
      public <T> T[] toArray(T[] a) {
          final E[] items = this.items;
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              if (a.length < count)
                  a = (T[]) java.lang.reflect.Array.newInstance(
                      a.getClass().getComponentType(),
                      count
                  );
              int k = 0;
              int i = takeIndex;
              while (k < count) {
                  a[k++] = (T) items[i];
                  i = inc(i);
              }
              if (a.length > count)
                  a[count] = null;
              return a;
          } finally {
              lock.unlock();
          }
      }

      /**
       * Returns the inherited string representation, computed while
       * holding this queue's lock.
       *
       * @return a string representation of this queue
       */
      public String toString() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return super.toString();
          } finally {
              lock.unlock();
          }
      }

      /**
       * Atomically removes all of the elements from this queue.
       * The queue will be empty after this call returns.
       */
      public void clear() {
          final E[] items = this.items;
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              int i = takeIndex;
              int k = count;
              while (k-- > 0) {
                  items[i] = null;
                  i = inc(i);
              }
              count = 0;
              putIndex = 0;
              takeIndex = 0;
              notFull.signalAll();
          } finally {
              lock.unlock();
          }
      }

      /**
       * Removes all available elements from this queue and adds them,
       * in head-to-tail order, to the given collection. If adding to
       * <tt>c</tt> fails part-way, the elements already added are removed
       * from this queue and the remaining ones stay queued.
       *
       * @param c the collection to transfer elements into
       * @return the number of elements transferred
       * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
       * @throws IllegalArgumentException if <tt>c</tt> is this queue
       */
      public int drainTo(Collection<? super E> c) {
          if (c == null)
              throw new NullPointerException();
          if (c == this)
              throw new IllegalArgumentException();
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return drain(c, count);
          } finally {
              lock.unlock();
          }
      }

      /**
       * Removes at most the given number of available elements from this
       * queue and adds them, in head-to-tail order, to the given collection.
       * If adding to <tt>c</tt> fails part-way, the elements already added
       * are removed from this queue and the remaining ones stay queued.
       *
       * @param c the collection to transfer elements into
       * @param maxElements the maximum number of elements to transfer;
       * values less than 1 transfer nothing
       * @return the number of elements transferred
       * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
       * @throws IllegalArgumentException if <tt>c</tt> is this queue
       */
      public int drainTo(Collection<? super E> c, int maxElements) {
          if (c == null)
              throw new NullPointerException();
          if (c == this)
              throw new IllegalArgumentException();
          if (maxElements <= 0)
              return 0;
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              int max = (maxElements < count) ? maxElements : count;
              return drain(c, max);
          } finally {
              lock.unlock();
          }
      }

      /**
       * Returns an iterator over the elements in this queue in proper sequence.
       * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
       * will never throw {@link java.util.ConcurrentModificationException},
       * and guarantees to traverse elements as they existed upon
       * construction of the iterator, and may (but is not guaranteed to)
       * reflect any modifications subsequent to construction.
       *
       * @return an iterator over the elements in this queue in proper sequence.
       */
      public Iterator<E> iterator() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return new Itr();
          } finally {
              lock.unlock();
          }
      }

      /**
       * Iterator for ArrayBlockingQueue
       */
      private class Itr implements Iterator<E> {
          /**
           * Index of element to be returned by next,
           * or a negative number if no such.
           */
          private int nextIndex;

          /**
           * nextItem holds on to item fields because once we claim
           * that an element exists in hasNext(), we must return it in
           * the following next() call even if it was in the process of
           * being removed when hasNext() was called.
           **/
          private E nextItem;

          /**
           * Index of element returned by most recent call to next.
           * Reset to -1 if this element is deleted by a call to remove.
           */
          private int lastRet;

          /**
           * Positions the iterator at the current head of the queue.
           * Constructed by iterator() while the queue lock is held.
           */
          Itr() {
              lastRet = -1;
              if (count == 0)
                  nextIndex = -1;
              else {
                  nextIndex = takeIndex;
                  nextItem = items[takeIndex];
              }
          }

          /**
           * Reports whether a further call to next() will return an element.
           *
           * @return <tt>true</tt> if nextIndex is non-negative
           */
          public boolean hasNext() {
              /*
               * No sync. We can return true by mistake here
               * only if this iterator passed across threads,
               * which we don't support anyway.
               */
              return nextIndex >= 0;
          }

          /**
           * Check whether nextIndex is valid; if so setting nextItem.
           * Stops iterator when either hits putIndex or sees null item.
           */
          private void checkNext() {
              if (nextIndex == putIndex) {
                  nextIndex = -1;
                  nextItem = null;
              } else {
                  nextItem = items[nextIndex];
                  if (nextItem == null)
                      nextIndex = -1;
              }
          }

          /**
           * Returns the element cached by the previous advance and moves
           * the cursor forward one slot.
           *
           * @return the next element in the iteration
           * @throws NoSuchElementException if the iteration has no more elements
           */
          public E next() {
              final ReentrantLock lock = ArrayBlockingQueue.this.lock;
              lock.lock();
              try {
                  if (nextIndex < 0)
                      throw new NoSuchElementException();
                  lastRet = nextIndex;
                  E x = nextItem;
                  nextIndex = inc(nextIndex);
                  checkNext();
                  return x;
              } finally {
                  lock.unlock();
              }
          }

          /**
           * Removes the slot last returned by next() from the queue.
           *
           * <p>NOTE(review): removal is by array index only; there is no
           * check that the slot still holds the element next() returned, so
           * a concurrent take/remove between next() and remove() may cause
           * a different element to be deleted.
           *
           * @throws IllegalStateException if next() has not been called, or
           * remove() was already called after the last call to next()
           */
          public void remove() {
              final ReentrantLock lock = ArrayBlockingQueue.this.lock;
              lock.lock();
              try {
                  int i = lastRet;
                  if (i == -1)
                      throw new IllegalStateException();
                  lastRet = -1;
                  int ti = takeIndex;
                  removeAt(i);
                  // back up cursor (reset to front if was first element)
                  nextIndex = (i == ti) ? takeIndex : i;
                  checkNext();
              } finally {
                  lock.unlock();
              }
          }
      }
  }