diff options
author | Philipp Haller <hallerp@gmail.com> | 2010-07-12 14:13:11 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2010-07-12 14:13:11 +0000 |
commit | fe378b7d81a97665a50d47e31a1a410ad71f492f (patch) | |
tree | 8e6f077d105646e9ff4fdf7eea7223e0d98e978f /src | |
parent | a012c4c92010b640e3448593a948aa5bf3f37950 (diff) | |
download | scala-fe378b7d81a97665a50d47e31a1a410ad71f492f.tar.gz scala-fe378b7d81a97665a50d47e31a1a410ad71f492f.tar.bz2 scala-fe378b7d81a97665a50d47e31a1a410ad71f492f.zip |
Updated LinkedBlockingQueue to latest jsr166 ve...
Updated LinkedBlockingQueue to latest jsr166
version. Verified bug fix using test case in
[http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6806875 JDK bug
report 6806875]. Closes #3629.
Diffstat (limited to 'src')
-rw-r--r-- | src/actors/scala/actors/threadpool/BlockingQueue.java | 24 | ||||
-rw-r--r-- | src/actors/scala/actors/threadpool/LinkedBlockingQueue.java | 716 |
2 files changed, 417 insertions, 323 deletions
diff --git a/src/actors/scala/actors/threadpool/BlockingQueue.java b/src/actors/scala/actors/threadpool/BlockingQueue.java index 880c2580da..1b4e808d84 100644 --- a/src/actors/scala/actors/threadpool/BlockingQueue.java +++ b/src/actors/scala/actors/threadpool/BlockingQueue.java @@ -7,9 +7,10 @@ package scala.actors.threadpool; import java.util.Collection; +import java.util.Queue; /** - * A {@link edu.emory.mathcs.backport.java.util.Queue} that additionally supports operations + * A {@link java.util.Queue} that additionally supports operations * that wait for the queue to become non-empty when retrieving an * element, and wait for space to become available in the queue when * storing an element. @@ -146,8 +147,9 @@ import java.util.Collection; * * @since 1.5 * @author Doug Lea + * @param <E> the type of elements held in this collection */ -public interface BlockingQueue extends Queue { +public interface BlockingQueue<E> extends Queue<E> { /** * Inserts the specified element into this queue if it is possible to do * so immediately without violating capacity restrictions, returning @@ -157,7 +159,7 @@ public interface BlockingQueue extends Queue { * use {@link #offer(Object) offer}. * * @param e the element to add - * @return <tt>true</tt> (as specified by {@link java.util.Collection#add}) + * @return <tt>true</tt> (as specified by {@link Collection#add}) * @throws IllegalStateException if the element cannot be added at this * time due to capacity restrictions * @throws ClassCastException if the class of the specified element @@ -166,7 +168,7 @@ public interface BlockingQueue extends Queue { * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ - boolean add(Object e); + boolean add(E e); /** * Inserts the specified element into this queue if it is possible to do @@ -185,7 +187,7 @@ public interface BlockingQueue extends Queue { * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ - boolean offer(Object e); + boolean offer(E e); /** * Inserts the specified element into this queue, waiting if necessary @@ -199,7 +201,7 @@ public interface BlockingQueue extends Queue { * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ - void put(Object e) throws InterruptedException; + void put(E e) throws InterruptedException; /** * Inserts the specified element into this queue, waiting up to the @@ -219,7 +221,7 @@ public interface BlockingQueue extends Queue { * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ - boolean offer(Object e, long timeout, TimeUnit unit) + boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** @@ -229,7 +231,7 @@ public interface BlockingQueue extends Queue { * @return the head of this queue * @throws InterruptedException if interrupted while waiting */ - Object take() throws InterruptedException; + E take() throws InterruptedException; /** * Retrieves and removes the head of this queue, waiting up to the @@ -243,7 +245,7 @@ public interface BlockingQueue extends Queue { * specified waiting time elapses before an element is available * @throws InterruptedException if interrupted while waiting */ - Object poll(long timeout, TimeUnit unit) + E poll(long timeout, TimeUnit unit) throws InterruptedException; /** @@ -313,7 +315,7 @@ public interface BlockingQueue extends Queue { * queue, or some property of an element of this queue prevents * it from being added to the specified collection */ - int drainTo(Collection c); + int drainTo(Collection<? super E> c); /** * Removes at most the given number of available elements from @@ -338,5 +340,5 @@ public interface BlockingQueue extends Queue { * queue, or some property of an element of this queue prevents * it from being added to the specified collection */ - int drainTo(Collection c, int maxElements); + int drainTo(Collection<? super E> c, int maxElements); } diff --git a/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java b/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java index 87fecff09c..f434ab0e7b 100644 --- a/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java +++ b/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java @@ -6,11 +6,13 @@ package scala.actors.threadpool; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; -//import edu.emory.mathcs.backport.java.util.*; -import scala.actors.threadpool.helpers.*; /** * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on @@ -41,10 +43,11 @@ import scala.actors.threadpool.helpers.*; * * @since 1.5 * @author Doug Lea + * @param <E> the type of elements held in this collection * */ -public class LinkedBlockingQueue extends AbstractQueue - implements BlockingQueue, java.io.Serializable { +public class LinkedBlockingQueue<E> extends AbstractQueue<E> + implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; /* @@ -59,43 +62,87 @@ public class LinkedBlockingQueue extends AbstractQueue * items have been entered since the signal. And symmetrically for * takes signalling puts. Operations such as remove(Object) and * iterators acquire both locks. + * + * Visibility between writers and readers is provided as follows: + * + * Whenever an element is enqueued, the putLock is acquired and + * count updated. A subsequent reader guarantees visibility to the + * enqueued Node by either acquiring the putLock (via fullyLock) + * or by acquiring the takeLock, and then reading n = count.get(); + * this gives visibility to the first n items. + * + * To implement weakly consistent iterators, it appears we need to + * keep all Nodes GC-reachable from a predecessor dequeued Node. + * That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to advance to head.next. */ /** * Linked list node class */ - static class Node { - /** The item, volatile to ensure barrier separating write and read */ - volatile Object item; - Node next; - Node(Object x) { item = x; } + static class Node<E> { + E item; + + /** + * One of: + * - the real successor Node + * - this Node, meaning the successor is head.next + * - null, meaning there is no successor (this is the last node) + */ + Node<E> next; + + Node(E x) { item = x; } } /** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ - private volatile int count = 0; + private final AtomicInteger count = new AtomicInteger(0); - /** Head of linked list */ - private transient Node head; + /** + * Head of linked list. + * Invariant: head.item == null + */ + private transient Node<E> head; - /** Tail of linked list */ - private transient Node last; + /** + * Tail of linked list. + * Invariant: last.next == null + */ + private transient Node<E> last; /** Lock held by take, poll, etc */ - private final Object takeLock = new SerializableLock(); + private final ReentrantLock takeLock = new ReentrantLock(); + + /** Wait queue for waiting takes */ + private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ - private final Object putLock = new SerializableLock(); + private final ReentrantLock putLock = new ReentrantLock(); + + /** Wait queue for waiting puts */ + private final Condition notFull = putLock.newCondition(); /** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { - synchronized (takeLock) { - takeLock.notify(); + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); } } @@ -103,34 +150,69 @@ public class LinkedBlockingQueue extends AbstractQueue * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { - synchronized (putLock) { - putLock.notify(); + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + notFull.signal(); + } finally { + putLock.unlock(); } } /** * Creates a node and links it at end of queue. + * * @param x the item */ - private void insert(Object x) { - last = last.next = new Node(x); + private void enqueue(E x) { + // assert putLock.isHeldByCurrentThread(); + // assert last.next == null; + last = last.next = new Node<E>(x); } /** - * Removes a node from head of queue, + * Removes a node from head of queue. + * * @return the node */ - private Object extract() { - Node first = head.next; + private E dequeue() { + // assert takeLock.isHeldByCurrentThread(); + // assert head.item == null; + Node<E> h = head; + Node<E> first = h.next; + h.next = h; // help GC head = first; - Object x = first.item; + E x = first.item; first.item = null; return x; } + /** + * Lock to prevent both puts and takes. + */ + void fullyLock() { + putLock.lock(); + takeLock.lock(); + } /** - * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of + * Unlock to allow both puts and takes. + */ + void fullyUnlock() { + takeLock.unlock(); + putLock.unlock(); + } + +// /** +// * Tells whether both locks are held by current thread. +// */ +// boolean isFullyLocked() { +// return (putLock.isHeldByCurrentThread() && +// takeLock.isHeldByCurrentThread()); +// } + + /** + * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { @@ -138,20 +220,20 @@ public class LinkedBlockingQueue extends AbstractQueue } /** - * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. + * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue - * @throws IllegalArgumentException if <tt>capacity</tt> is not greater + * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; - last = head = new Node(null); + last = head = new Node<E>(null); } /** - * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of + * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. @@ -160,11 +242,23 @@ public class LinkedBlockingQueue extends AbstractQueue * @throws NullPointerException if the specified collection or any * of its elements are null */ - public LinkedBlockingQueue(Collection c) { + public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); - for (Iterator itr = c.iterator(); itr.hasNext();) { - Object e = itr.next(); - add(e); + final ReentrantLock putLock = this.putLock; + putLock.lock(); // Never contended, but necessary for visibility + try { + int n = 0; + for (E e : c) { + if (e == null) + throw new NullPointerException(); + if (n == capacity) + throw new IllegalStateException("Queue full"); + enqueue(e); + ++n; + } + count.set(n); + } finally { + putLock.unlock(); } } @@ -177,7 +271,7 @@ public class LinkedBlockingQueue extends AbstractQueue * @return the number of elements in this queue */ public int size() { - return count; + return count.get(); } // this doc comment is a modified copy of the inherited doc comment, @@ -186,15 +280,15 @@ public class LinkedBlockingQueue extends AbstractQueue * Returns the number of additional elements that this queue can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this queue - * less the current <tt>size</tt> of this queue. + * less the current {@code size} of this queue. * * <p>Note that you <em>cannot</em> always tell if an attempt to insert - * an element will succeed by inspecting <tt>remainingCapacity</tt> + * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. */ public int remainingCapacity() { - return capacity - count; + return capacity - count.get(); } /** @@ -204,34 +298,33 @@ public class LinkedBlockingQueue extends AbstractQueue * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ - public void put(Object e) throws InterruptedException { + public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); - // Note: convention in all put/take/etc is to preset - // local var holding count negative to indicate failure unless set. + // Note: convention in all put/take/etc is to preset local var + // holding count negative to indicate failure unless set. int c = -1; - synchronized (putLock) { + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are - * signalled if it ever changes from - * capacity. Similarly for all other uses of count in - * other wait guards. + * signalled if it ever changes from capacity. Similarly + * for all other uses of count in other wait guards. */ - try { - while (count == capacity) - putLock.wait(); - } catch (InterruptedException ie) { - putLock.notify(); // propagate to a non-interrupted thread - throw ie; + while (count.get() == capacity) { + notFull.await(); } - insert(e); - synchronized (this) { c = count++; } + enqueue(e); + c = count.getAndIncrement(); if (c + 1 < capacity) - putLock.notify(); + notFull.signal(); + } finally { + putLock.unlock(); } - if (c == 0) signalNotEmpty(); } @@ -240,37 +333,32 @@ public class LinkedBlockingQueue extends AbstractQueue * Inserts the specified element at the tail of this queue, waiting if * necessary up to the specified wait time for space to become available. * - * @return <tt>true</tt> if successful, or <tt>false</tt> if + * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before space is available. * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ - public boolean offer(Object e, long timeout, TimeUnit unit) + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; - synchronized (putLock) { - long deadline = Utils.nanoTime() + nanos; - for (;;) { - if (count < capacity) { - insert(e); - synchronized (this) { c = count++; } - if (c + 1 < capacity) - putLock.notify(); - break; - } + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { + while (count.get() == capacity) { if (nanos <= 0) return false; - try { - TimeUnit.NANOSECONDS.timedWait(putLock, nanos); - nanos = deadline - Utils.nanoTime(); - } catch (InterruptedException ie) { - putLock.notify(); // propagate to a non-interrupted thread - throw ie; - } + nanos = notFull.awaitNanos(nanos); } + enqueue(e); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } finally { + putLock.unlock(); } if (c == 0) signalNotEmpty(); @@ -280,7 +368,7 @@ public class LinkedBlockingQueue extends AbstractQueue /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, - * returning <tt>true</tt> upon success and <tt>false</tt> if this queue + * returning {@code true} upon success and {@code false} if this queue * is full. * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to @@ -288,18 +376,23 @@ public class LinkedBlockingQueue extends AbstractQueue * * @throws NullPointerException if the specified element is null */ - public boolean offer(Object e) { + public boolean offer(E e) { if (e == null) throw new NullPointerException(); - if (count == capacity) + final AtomicInteger count = this.count; + if (count.get() == capacity) return false; int c = -1; - synchronized (putLock) { - if (count < capacity) { - insert(e); - synchronized (this) { c = count++; } + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + if (count.get() < capacity) { + enqueue(e); + c = count.getAndIncrement(); if (c + 1 < capacity) - putLock.notify(); + notFull.signal(); } + } finally { + putLock.unlock(); } if (c == 0) signalNotEmpty(); @@ -307,128 +400,134 @@ public class LinkedBlockingQueue extends AbstractQueue } - public Object take() throws InterruptedException { - Object x; + public E take() throws InterruptedException { + E x; int c = -1; - synchronized (takeLock) { - try { - while (count == 0) - takeLock.wait(); - } catch (InterruptedException ie) { - takeLock.notify(); // propagate to a non-interrupted thread - throw ie; + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + while (count.get() == 0) { + notEmpty.await(); } - - x = extract(); - synchronized (this) { c = count--; } + x = dequeue(); + c = count.getAndDecrement(); if (c > 1) - takeLock.notify(); + notEmpty.signal(); + } finally { + takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } - public Object poll(long timeout, TimeUnit unit) throws InterruptedException { - Object x = null; + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + E x = null; int c = -1; long nanos = unit.toNanos(timeout); - synchronized (takeLock) { - long deadline = Utils.nanoTime() + nanos; - for (;;) { - if (count > 0) { - x = extract(); - synchronized (this) { c = count--; } - if (c > 1) - takeLock.notify(); - break; - } + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + while (count.get() == 0) { if (nanos <= 0) return null; - try { - TimeUnit.NANOSECONDS.timedWait(takeLock, nanos); - nanos = deadline - Utils.nanoTime(); - } catch (InterruptedException ie) { - takeLock.notify(); // propagate to a non-interrupted thread - throw ie; - } + nanos = notEmpty.awaitNanos(nanos); } + x = dequeue(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } finally { + takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } - public Object poll() { - if (count == 0) + public E poll() { + final AtomicInteger count = this.count; + if (count.get() == 0) return null; - Object x = null; + E x = null; int c = -1; - synchronized (takeLock) { - if (count > 0) { - x = extract(); - synchronized (this) { c = count--; } + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + if (count.get() > 0) { + x = dequeue(); + c = count.getAndDecrement(); if (c > 1) - takeLock.notify(); + notEmpty.signal(); } + } finally { + takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } - - public Object peek() { - if (count == 0) + public E peek() { + if (count.get() == 0) return null; - synchronized (takeLock) { - Node first = head.next; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + Node<E> first = head.next; if (first == null) return null; else return first.item; + } finally { + takeLock.unlock(); } } /** + * Unlinks interior Node p with predecessor trail. + */ + void unlink(Node<E> p, Node<E> trail) { + // assert isFullyLocked(); + // p.next is not changed, to allow iterators that are + // traversing p to maintain their weak-consistency guarantee. + p.item = null; + trail.next = p.next; + if (last == p) + last = trail; + if (count.getAndDecrement() == capacity) + notFull.signal(); + } + + /** * Removes a single instance of the specified element from this queue, - * if it is present. More formally, removes an element <tt>e</tt> such - * that <tt>o.equals(e)</tt>, if this queue contains one or more such + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such * elements. - * Returns <tt>true</tt> if this queue contained the specified element + * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present - * @return <tt>true</tt> if this queue changed as a result of the call + * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { if (o == null) return false; - boolean removed = false; - synchronized (putLock) { - synchronized (takeLock) { - Node trail = head; - Node p = head.next; - while (p != null) { - if (o.equals(p.item)) { - removed = true; - break; - } - trail = p; - p = p.next; - } - if (removed) { - p.item = null; - trail.next = p.next; - if (last == p) - last = trail; - synchronized (this) { - if (count-- == capacity) - putLock.notifyAll(); - } + fullyLock(); + try { + for (Node<E> trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { + if (o.equals(p.item)) { + unlink(p, trail); + return true; } } + return false; + } finally { + fullyUnlock(); } - return removed; } /** @@ -445,15 +544,16 @@ public class LinkedBlockingQueue extends AbstractQueue * @return an array containing all of the elements in this queue */ public Object[] toArray() { - synchronized (putLock) { - synchronized (takeLock) { - int size = count; - Object[] a = new Object[size]; - int k = 0; - for (Node p = head.next; p != null; p = p.next) - a[k++] = p.item; - return a; - } + fullyLock(); + try { + int size = count.get(); + Object[] a = new Object[size]; + int k = 0; + for (Node<E> p = head.next; p != null; p = p.next) + a[k++] = p.item; + return a; + } finally { + fullyUnlock(); } } @@ -467,22 +567,22 @@ public class LinkedBlockingQueue extends AbstractQueue * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to - * <tt>null</tt>. + * {@code null}. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * - * <p>Suppose <tt>x</tt> is a queue known to contain only strings. + * <p>Suppose {@code x} is a queue known to contain only strings. * The following code can be used to dump the queue into a newly - * allocated array of <tt>String</tt>: + * allocated array of {@code String}: * * <pre> * String[] y = x.toArray(new String[0]);</pre> * - * Note that <tt>toArray(new Object[0])</tt> is identical in function to - * <tt>toArray()</tt>. + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the @@ -493,29 +593,32 @@ public class LinkedBlockingQueue extends AbstractQueue * this queue * @throws NullPointerException if the specified array is null */ - public Object[] toArray(Object[] a) { - synchronized (putLock) { - synchronized (takeLock) { - int size = count; - if (a.length < size) - a = (Object[])java.lang.reflect.Array.newInstance - (a.getClass().getComponentType(), size); - - int k = 0; - for (Node p = head.next; p != null; p = p.next) - a[k++] = (Object)p.item; - if (a.length > k) - a[k] = null; - return a; - } + @SuppressWarnings("unchecked") + public <T> T[] toArray(T[] a) { + fullyLock(); + try { + int size = count.get(); + if (a.length < size) + a = (T[])java.lang.reflect.Array.newInstance + (a.getClass().getComponentType(), size); + + int k = 0; + for (Node<E> p = head.next; p != null; p = p.next) + a[k++] = (T)p.item; + if (a.length > k) + a[k] = null; + return a; + } finally { + fullyUnlock(); } } public String toString() { - synchronized (putLock) { - synchronized (takeLock) { - return super.toString(); - } + fullyLock(); + try { + return super.toString(); + } finally { + fullyUnlock(); } } @@ -524,19 +627,18 @@ public class LinkedBlockingQueue extends AbstractQueue * The queue will be empty after this call returns. */ public void clear() { - synchronized (putLock) { - synchronized (takeLock) { - head.next = null; - assert head.item == null; - last = head; - int c; - synchronized (this) { - c = count; - count = 0; - } - if (c == capacity) - putLock.notifyAll(); + fullyLock(); + try { + for (Node<E> p, h = head; (p = h.next) != null; h = p) { + h.next = h; + p.item = null; } + head = last; + // assert head.item == null && head.next == null; + if (count.getAndSet(0) == capacity) + notFull.signal(); + } finally { + fullyUnlock(); } } @@ -546,35 +648,8 @@ public class LinkedBlockingQueue extends AbstractQueue * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ - public int drainTo(Collection c) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - Node first; - synchronized (putLock) { - synchronized (takeLock) { - first = head.next; - head.next = null; - assert head.item == null; - last = head; - int cold; - synchronized (this) { - cold = count; - count = 0; - } - if (cold == capacity) - putLock.notifyAll(); - } - } - // Transfer the elements outside of locks - int n = 0; - for (Node p = first; p != null; p = p.next) { - c.add(p.item); - p.item = null; - ++n; - } - return n; + public int drainTo(Collection<? super E> c) { + return drainTo(c, Integer.MAX_VALUE); } /** @@ -583,70 +658,77 @@ public class LinkedBlockingQueue extends AbstractQueue * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ - public int drainTo(Collection c, int maxElements) { + public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); - synchronized (putLock) { - synchronized (takeLock) { - int n = 0; - Node p = head.next; - while (p != null && n < maxElements) { + boolean signalNotFull = false; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + int n = Math.min(maxElements, count.get()); + // count.get provides visibility to first n Nodes + Node<E> h = head; + int i = 0; + try { + while (i < n) { + Node<E> p = h.next; c.add(p.item); p.item = null; - p = p.next; - ++n; - } - if (n != 0) { - head.next = p; - assert head.item == null; - if (p == null) - last = head; - int cold; - synchronized (this) { - cold = count; - count -= n; - } - if (cold == capacity) - putLock.notifyAll(); + h.next = h; + h = p; + ++i; } return n; + } finally { + // Restore invariants even if c.add() threw + if (i > 0) { + // assert h.item == null; + head = h; + signalNotFull = (count.getAndAdd(-i) == capacity); + } } + } finally { + takeLock.unlock(); + if (signalNotFull) + signalNotFull(); } } /** * Returns an iterator over the elements in this queue in proper sequence. - * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that - * will never throw {@link java.util.ConcurrentModificationException}, + * The returned {@code Iterator} is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException + * ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. * * @return an iterator over the elements in this queue in proper sequence */ - public Iterator iterator() { + public Iterator<E> iterator() { return new Itr(); } - private class Itr implements Iterator { + private class Itr implements Iterator<E> { /* - * Basic weak-consistent iterator. At all times hold the next + * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */ - private Node current; - private Node lastRet; - private Object currentElement; + private Node<E> current; + private Node<E> lastRet; + private E currentElement; Itr() { - synchronized (putLock) { - synchronized (takeLock) { - current = head.next; - if (current != null) - currentElement = current.item; - } + fullyLock(); + try { + current = head.next; + if (current != null) + currentElement = current.item; + } finally { + fullyUnlock(); } } @@ -654,45 +736,56 @@ public class LinkedBlockingQueue extends AbstractQueue return current != null; } - public Object next() { - synchronized (putLock) { - synchronized (takeLock) { - if (current == null) - throw new NoSuchElementException(); - Object x = currentElement; - lastRet = current; - current = current.next; - if (current != null) - currentElement = current.item; - return x; - } + /** + * Returns the next live successor of p, or null if no such. + * + * Unlike other traversal methods, iterators need to handle both: + * - dequeued nodes (p.next == p) + * - (possibly multiple) interior removed nodes (p.item == null) + */ + private Node<E> nextNode(Node<E> p) { + for (;;) { + Node<E> s = p.next; + if (s == p) + return head.next; + if (s == null || s.item != null) + return s; + p = s; + } + } + + public E next() { + fullyLock(); + try { + if (current == null) + throw new NoSuchElementException(); + E x = currentElement; + lastRet = current; + current = nextNode(current); + currentElement = (current == null) ? null : current.item; + return x; + } finally { + fullyUnlock(); } } public void remove() { if (lastRet == null) throw new IllegalStateException(); - synchronized (putLock) { - synchronized (takeLock) { - Node node = lastRet; - lastRet = null; - Node trail = head; - Node p = head.next; - while (p != null && p != node) { - trail = p; - p = p.next; - } + fullyLock(); + try { + Node<E> node = lastRet; + lastRet = null; + for (Node<E> trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { if (p == node) { - p.item = null; - trail.next = p.next; - if (last == p) - last = trail; - int c; - synchronized (this) { c = count--; } - if (c == capacity) - putLock.notifyAll(); + unlink(p, trail); + break; } } + } finally { + fullyUnlock(); } } } @@ -701,31 +794,33 @@ public class LinkedBlockingQueue extends AbstractQueue * Save the state to a stream (that is, serialize it). * * @serialData The capacity is emitted (int), followed by all of - * its elements (each an <tt>Object</tt>) in the proper order, + * its elements (each an {@code Object}) in the proper order, * followed by a null * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { - synchronized (putLock) { - synchronized (takeLock) { - // Write out any hidden stuff, plus capacity - s.defaultWriteObject(); + fullyLock(); + try { + // Write out any hidden stuff, plus capacity + s.defaultWriteObject(); - // Write out all elements in the proper order. - for (Node p = head.next; p != null; p = p.next) - s.writeObject(p.item); + // Write out all elements in the proper order. + for (Node<E> p = head.next; p != null; p = p.next) + s.writeObject(p.item); - // Use trailing null as sentinel - s.writeObject(null); - } + // Use trailing null as sentinel + s.writeObject(null); + } finally { + fullyUnlock(); } } /** * Reconstitute this queue instance from a stream (that is, * deserialize it). + * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) @@ -733,19 +828,16 @@ public class LinkedBlockingQueue extends AbstractQueue // Read in capacity, and any hidden stuff s.defaultReadObject(); - synchronized (this) { count = 0; } - last = head = new Node(null); + count.set(0); + last = head = new Node<E>(null); // Read in all elements and place in queue for (;;) { - Object item = (Object)s.readObject(); + @SuppressWarnings("unchecked") + E item = (E)s.readObject(); if (item == null) break; add(item); } } - - private static class SerializableLock implements java.io.Serializable { - private final static long serialVersionUID = -8856990691138858668L; - } } |