summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntonio Cunei <antonio.cunei@epfl.ch>2010-09-02 10:11:37 +0000
committerAntonio Cunei <antonio.cunei@epfl.ch>2010-09-02 10:11:37 +0000
commit9f3ad9f11f7943d9dfe9fb664c7d58712a05f8e6 (patch)
treeb0b3dc668a392b35ff4b30e36074d83255cf0f17
parentc45a1430699d7438f46d5af6f4db63f3ed587805 (diff)
downloadscala-9f3ad9f11f7943d9dfe9fb664c7d58712a05f8e6.tar.gz
scala-9f3ad9f11f7943d9dfe9fb664c7d58712a05f8e6.tar.bz2
scala-9f3ad9f11f7943d9dfe9fb664c7d58712a05f8e6.zip
Merged revisions 22536,22540,22542 via svnmerge...
Merged revisions 22536,22540,22542 via svnmerge from https://lampsvn.epfl.ch/svn-repos/scala/scala/trunk ........ r22536 | phaller | 2010-07-12 10:29:45 +0200 (Mon, 12 Jul 2010) | 1 line Added test case for #3645. Closes #3645. ........ r22540 | phaller | 2010-07-12 13:49:28 +0200 (Mon, 12 Jul 2010) | 1 line Fixed buggy test case. Closes #3551. No review. ........ r22542 | phaller | 2010-07-12 16:13:11 +0200 (Mon, 12 Jul 2010) | 1 line Updated LinkedBlockingQueue to latest jsr166 version. Verified bug fix using test case in [http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6806875 JDK bug report 6806875]. Closes #3629. ........
-rw-r--r--src/actors/scala/actors/threadpool/BlockingQueue.java24
-rw-r--r--src/actors/scala/actors/threadpool/LinkedBlockingQueue.java716
-rw-r--r--test/files/jvm/actor-executor2.scala12
-rw-r--r--test/files/run/t3645.scala6
4 files changed, 432 insertions, 326 deletions
diff --git a/src/actors/scala/actors/threadpool/BlockingQueue.java b/src/actors/scala/actors/threadpool/BlockingQueue.java
index 880c2580da..1b4e808d84 100644
--- a/src/actors/scala/actors/threadpool/BlockingQueue.java
+++ b/src/actors/scala/actors/threadpool/BlockingQueue.java
@@ -7,9 +7,10 @@
package scala.actors.threadpool;
import java.util.Collection;
+import java.util.Queue;
/**
- * A {@link edu.emory.mathcs.backport.java.util.Queue} that additionally supports operations
+ * A {@link java.util.Queue} that additionally supports operations
* that wait for the queue to become non-empty when retrieving an
* element, and wait for space to become available in the queue when
* storing an element.
@@ -146,8 +147,9 @@ import java.util.Collection;
*
* @since 1.5
* @author Doug Lea
+ * @param <E> the type of elements held in this collection
*/
-public interface BlockingQueue extends Queue {
+public interface BlockingQueue<E> extends Queue<E> {
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
@@ -157,7 +159,7 @@ public interface BlockingQueue extends Queue {
* use {@link #offer(Object) offer}.
*
* @param e the element to add
- * @return <tt>true</tt> (as specified by {@link java.util.Collection#add})
+ * @return <tt>true</tt> (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
@@ -166,7 +168,7 @@ public interface BlockingQueue extends Queue {
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
- boolean add(Object e);
+ boolean add(E e);
/**
* Inserts the specified element into this queue if it is possible to do
@@ -185,7 +187,7 @@ public interface BlockingQueue extends Queue {
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
- boolean offer(Object e);
+ boolean offer(E e);
/**
* Inserts the specified element into this queue, waiting if necessary
@@ -199,7 +201,7 @@ public interface BlockingQueue extends Queue {
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
- void put(Object e) throws InterruptedException;
+ void put(E e) throws InterruptedException;
/**
* Inserts the specified element into this queue, waiting up to the
@@ -219,7 +221,7 @@ public interface BlockingQueue extends Queue {
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
- boolean offer(Object e, long timeout, TimeUnit unit)
+ boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
@@ -229,7 +231,7 @@ public interface BlockingQueue extends Queue {
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
- Object take() throws InterruptedException;
+ E take() throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting up to the
@@ -243,7 +245,7 @@ public interface BlockingQueue extends Queue {
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
- Object poll(long timeout, TimeUnit unit)
+ E poll(long timeout, TimeUnit unit)
throws InterruptedException;
/**
@@ -313,7 +315,7 @@ public interface BlockingQueue extends Queue {
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
- int drainTo(Collection c);
+ int drainTo(Collection<? super E> c);
/**
* Removes at most the given number of available elements from
@@ -338,5 +340,5 @@ public interface BlockingQueue extends Queue {
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
- int drainTo(Collection c, int maxElements);
+ int drainTo(Collection<? super E> c, int maxElements);
}
diff --git a/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java b/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java
index 87fecff09c..f434ab0e7b 100644
--- a/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java
+++ b/src/actors/scala/actors/threadpool/LinkedBlockingQueue.java
@@ -6,11 +6,13 @@
package scala.actors.threadpool;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
-//import edu.emory.mathcs.backport.java.util.*;
-import scala.actors.threadpool.helpers.*;
/**
* An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
@@ -41,10 +43,11 @@ import scala.actors.threadpool.helpers.*;
*
* @since 1.5
* @author Doug Lea
+ * @param <E> the type of elements held in this collection
*
*/
-public class LinkedBlockingQueue extends AbstractQueue
- implements BlockingQueue, java.io.Serializable {
+public class LinkedBlockingQueue<E> extends AbstractQueue<E>
+ implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
/*
@@ -59,43 +62,87 @@ public class LinkedBlockingQueue extends AbstractQueue
* items have been entered since the signal. And symmetrically for
* takes signalling puts. Operations such as remove(Object) and
* iterators acquire both locks.
+ *
+ * Visibility between writers and readers is provided as follows:
+ *
+ * Whenever an element is enqueued, the putLock is acquired and
+ * count updated. A subsequent reader guarantees visibility to the
+ * enqueued Node by either acquiring the putLock (via fullyLock)
+ * or by acquiring the takeLock, and then reading n = count.get();
+ * this gives visibility to the first n items.
+ *
+ * To implement weakly consistent iterators, it appears we need to
+ * keep all Nodes GC-reachable from a predecessor dequeued Node.
+ * That would cause two problems:
+ * - allow a rogue Iterator to cause unbounded memory retention
+ * - cause cross-generational linking of old Nodes to new Nodes if
+ * a Node was tenured while live, which generational GCs have a
+ * hard time dealing with, causing repeated major collections.
+ * However, only non-deleted Nodes need to be reachable from
+ * dequeued Nodes, and reachability does not necessarily have to
+ * be of the kind understood by the GC. We use the trick of
+ * linking a Node that has just been dequeued to itself. Such a
+ * self-link implicitly means to advance to head.next.
*/
/**
* Linked list node class
*/
- static class Node {
- /** The item, volatile to ensure barrier separating write and read */
- volatile Object item;
- Node next;
- Node(Object x) { item = x; }
+ static class Node<E> {
+ E item;
+
+ /**
+ * One of:
+ * - the real successor Node
+ * - this Node, meaning the successor is head.next
+ * - null, meaning there is no successor (this is the last node)
+ */
+ Node<E> next;
+
+ Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
- private volatile int count = 0;
+ private final AtomicInteger count = new AtomicInteger(0);
- /** Head of linked list */
- private transient Node head;
+ /**
+ * Head of linked list.
+ * Invariant: head.item == null
+ */
+ private transient Node<E> head;
- /** Tail of linked list */
- private transient Node last;
+ /**
+ * Tail of linked list.
+ * Invariant: last.next == null
+ */
+ private transient Node<E> last;
/** Lock held by take, poll, etc */
- private final Object takeLock = new SerializableLock();
+ private final ReentrantLock takeLock = new ReentrantLock();
+
+ /** Wait queue for waiting takes */
+ private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
- private final Object putLock = new SerializableLock();
+ private final ReentrantLock putLock = new ReentrantLock();
+
+ /** Wait queue for waiting puts */
+ private final Condition notFull = putLock.newCondition();
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
- synchronized (takeLock) {
- takeLock.notify();
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
}
}
@@ -103,34 +150,69 @@ public class LinkedBlockingQueue extends AbstractQueue
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
- synchronized (putLock) {
- putLock.notify();
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock();
+ try {
+ notFull.signal();
+ } finally {
+ putLock.unlock();
}
}
/**
* Creates a node and links it at end of queue.
+ *
* @param x the item
*/
- private void insert(Object x) {
- last = last.next = new Node(x);
+ private void enqueue(E x) {
+ // assert putLock.isHeldByCurrentThread();
+ // assert last.next == null;
+ last = last.next = new Node<E>(x);
}
/**
- * Removes a node from head of queue,
+ * Removes a node from head of queue.
+ *
* @return the node
*/
- private Object extract() {
- Node first = head.next;
+ private E dequeue() {
+ // assert takeLock.isHeldByCurrentThread();
+ // assert head.item == null;
+ Node<E> h = head;
+ Node<E> first = h.next;
+ h.next = h; // help GC
head = first;
- Object x = first.item;
+ E x = first.item;
first.item = null;
return x;
}
+ /**
+ * Lock to prevent both puts and takes.
+ */
+ void fullyLock() {
+ putLock.lock();
+ takeLock.lock();
+ }
/**
- * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+ * Unlock to allow both puts and takes.
+ */
+ void fullyUnlock() {
+ takeLock.unlock();
+ putLock.unlock();
+ }
+
+// /**
+// * Tells whether both locks are held by current thread.
+// */
+// boolean isFullyLocked() {
+// return (putLock.isHeldByCurrentThread() &&
+// takeLock.isHeldByCurrentThread());
+// }
+
+ /**
+ * Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
@@ -138,20 +220,20 @@ public class LinkedBlockingQueue extends AbstractQueue
}
/**
- * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
+ * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
- * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
+ * @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
- last = head = new Node(null);
+ last = head = new Node<E>(null);
}
/**
- * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+ * Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
@@ -160,11 +242,23 @@ public class LinkedBlockingQueue extends AbstractQueue
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
- public LinkedBlockingQueue(Collection c) {
+ public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
- for (Iterator itr = c.iterator(); itr.hasNext();) {
- Object e = itr.next();
- add(e);
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock(); // Never contended, but necessary for visibility
+ try {
+ int n = 0;
+ for (E e : c) {
+ if (e == null)
+ throw new NullPointerException();
+ if (n == capacity)
+ throw new IllegalStateException("Queue full");
+ enqueue(e);
+ ++n;
+ }
+ count.set(n);
+ } finally {
+ putLock.unlock();
}
}
@@ -177,7 +271,7 @@ public class LinkedBlockingQueue extends AbstractQueue
* @return the number of elements in this queue
*/
public int size() {
- return count;
+ return count.get();
}
// this doc comment is a modified copy of the inherited doc comment,
@@ -186,15 +280,15 @@ public class LinkedBlockingQueue extends AbstractQueue
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
- * less the current <tt>size</tt> of this queue.
+ * less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
- * an element will succeed by inspecting <tt>remainingCapacity</tt>
+ * an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
public int remainingCapacity() {
- return capacity - count;
+ return capacity - count.get();
}
/**
@@ -204,34 +298,33 @@ public class LinkedBlockingQueue extends AbstractQueue
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
- public void put(Object e) throws InterruptedException {
+ public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
- // Note: convention in all put/take/etc is to preset
- // local var holding count negative to indicate failure unless set.
+ // Note: convention in all put/take/etc is to preset local var
+ // holding count negative to indicate failure unless set.
int c = -1;
- synchronized (putLock) {
+ final ReentrantLock putLock = this.putLock;
+ final AtomicInteger count = this.count;
+ putLock.lockInterruptibly();
+ try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
- * signalled if it ever changes from
- * capacity. Similarly for all other uses of count in
- * other wait guards.
+ * signalled if it ever changes from capacity. Similarly
+ * for all other uses of count in other wait guards.
*/
- try {
- while (count == capacity)
- putLock.wait();
- } catch (InterruptedException ie) {
- putLock.notify(); // propagate to a non-interrupted thread
- throw ie;
+ while (count.get() == capacity) {
+ notFull.await();
}
- insert(e);
- synchronized (this) { c = count++; }
+ enqueue(e);
+ c = count.getAndIncrement();
if (c + 1 < capacity)
- putLock.notify();
+ notFull.signal();
+ } finally {
+ putLock.unlock();
}
-
if (c == 0)
signalNotEmpty();
}
@@ -240,37 +333,32 @@ public class LinkedBlockingQueue extends AbstractQueue
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
- * @return <tt>true</tt> if successful, or <tt>false</tt> if
+ * @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available.
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
- public boolean offer(Object e, long timeout, TimeUnit unit)
+ public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
- synchronized (putLock) {
- long deadline = Utils.nanoTime() + nanos;
- for (;;) {
- if (count < capacity) {
- insert(e);
- synchronized (this) { c = count++; }
- if (c + 1 < capacity)
- putLock.notify();
- break;
- }
+ final ReentrantLock putLock = this.putLock;
+ final AtomicInteger count = this.count;
+ putLock.lockInterruptibly();
+ try {
+ while (count.get() == capacity) {
if (nanos <= 0)
return false;
- try {
- TimeUnit.NANOSECONDS.timedWait(putLock, nanos);
- nanos = deadline - Utils.nanoTime();
- } catch (InterruptedException ie) {
- putLock.notify(); // propagate to a non-interrupted thread
- throw ie;
- }
+ nanos = notFull.awaitNanos(nanos);
}
+ enqueue(e);
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
+ } finally {
+ putLock.unlock();
}
if (c == 0)
signalNotEmpty();
@@ -280,7 +368,7 @@ public class LinkedBlockingQueue extends AbstractQueue
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
- * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
+ * returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
@@ -288,18 +376,23 @@ public class LinkedBlockingQueue extends AbstractQueue
*
* @throws NullPointerException if the specified element is null
*/
- public boolean offer(Object e) {
+ public boolean offer(E e) {
if (e == null) throw new NullPointerException();
- if (count == capacity)
+ final AtomicInteger count = this.count;
+ if (count.get() == capacity)
return false;
int c = -1;
- synchronized (putLock) {
- if (count < capacity) {
- insert(e);
- synchronized (this) { c = count++; }
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock();
+ try {
+ if (count.get() < capacity) {
+ enqueue(e);
+ c = count.getAndIncrement();
if (c + 1 < capacity)
- putLock.notify();
+ notFull.signal();
}
+ } finally {
+ putLock.unlock();
}
if (c == 0)
signalNotEmpty();
@@ -307,128 +400,134 @@ public class LinkedBlockingQueue extends AbstractQueue
}
- public Object take() throws InterruptedException {
- Object x;
+ public E take() throws InterruptedException {
+ E x;
int c = -1;
- synchronized (takeLock) {
- try {
- while (count == 0)
- takeLock.wait();
- } catch (InterruptedException ie) {
- takeLock.notify(); // propagate to a non-interrupted thread
- throw ie;
+ final AtomicInteger count = this.count;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lockInterruptibly();
+ try {
+ while (count.get() == 0) {
+ notEmpty.await();
}
-
- x = extract();
- synchronized (this) { c = count--; }
+ x = dequeue();
+ c = count.getAndDecrement();
if (c > 1)
- takeLock.notify();
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
- public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
- Object x = null;
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
- synchronized (takeLock) {
- long deadline = Utils.nanoTime() + nanos;
- for (;;) {
- if (count > 0) {
- x = extract();
- synchronized (this) { c = count--; }
- if (c > 1)
- takeLock.notify();
- break;
- }
+ final AtomicInteger count = this.count;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lockInterruptibly();
+ try {
+ while (count.get() == 0) {
if (nanos <= 0)
return null;
- try {
- TimeUnit.NANOSECONDS.timedWait(takeLock, nanos);
- nanos = deadline - Utils.nanoTime();
- } catch (InterruptedException ie) {
- takeLock.notify(); // propagate to a non-interrupted thread
- throw ie;
- }
+ nanos = notEmpty.awaitNanos(nanos);
}
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
- public Object poll() {
- if (count == 0)
+ public E poll() {
+ final AtomicInteger count = this.count;
+ if (count.get() == 0)
return null;
- Object x = null;
+ E x = null;
int c = -1;
- synchronized (takeLock) {
- if (count > 0) {
- x = extract();
- synchronized (this) { c = count--; }
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ if (count.get() > 0) {
+ x = dequeue();
+ c = count.getAndDecrement();
if (c > 1)
- takeLock.notify();
+ notEmpty.signal();
}
+ } finally {
+ takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
-
- public Object peek() {
- if (count == 0)
+ public E peek() {
+ if (count.get() == 0)
return null;
- synchronized (takeLock) {
- Node first = head.next;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
+ } finally {
+ takeLock.unlock();
}
}
/**
+ * Unlinks interior Node p with predecessor trail.
+ */
+ void unlink(Node<E> p, Node<E> trail) {
+ // assert isFullyLocked();
+ // p.next is not changed, to allow iterators that are
+ // traversing p to maintain their weak-consistency guarantee.
+ p.item = null;
+ trail.next = p.next;
+ if (last == p)
+ last = trail;
+ if (count.getAndDecrement() == capacity)
+ notFull.signal();
+ }
+
+ /**
* Removes a single instance of the specified element from this queue,
- * if it is present. More formally, removes an element <tt>e</tt> such
- * that <tt>o.equals(e)</tt>, if this queue contains one or more such
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
* elements.
- * Returns <tt>true</tt> if this queue contained the specified element
+ * Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
- * @return <tt>true</tt> if this queue changed as a result of the call
+ * @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
- boolean removed = false;
- synchronized (putLock) {
- synchronized (takeLock) {
- Node trail = head;
- Node p = head.next;
- while (p != null) {
- if (o.equals(p.item)) {
- removed = true;
- break;
- }
- trail = p;
- p = p.next;
- }
- if (removed) {
- p.item = null;
- trail.next = p.next;
- if (last == p)
- last = trail;
- synchronized (this) {
- if (count-- == capacity)
- putLock.notifyAll();
- }
+ fullyLock();
+ try {
+ for (Node<E> trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
+ if (o.equals(p.item)) {
+ unlink(p, trail);
+ return true;
}
}
+ return false;
+ } finally {
+ fullyUnlock();
}
- return removed;
}
/**
@@ -445,15 +544,16 @@ public class LinkedBlockingQueue extends AbstractQueue
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
- synchronized (putLock) {
- synchronized (takeLock) {
- int size = count;
- Object[] a = new Object[size];
- int k = 0;
- for (Node p = head.next; p != null; p = p.next)
- a[k++] = p.item;
- return a;
- }
+ fullyLock();
+ try {
+ int size = count.get();
+ Object[] a = new Object[size];
+ int k = 0;
+ for (Node<E> p = head.next; p != null; p = p.next)
+ a[k++] = p.item;
+ return a;
+ } finally {
+ fullyUnlock();
}
}
@@ -467,22 +567,22 @@ public class LinkedBlockingQueue extends AbstractQueue
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
- * <tt>null</tt>.
+ * {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
- * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
+ * <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
- * allocated array of <tt>String</tt>:
+ * allocated array of {@code String}:
*
* <pre>
* String[] y = x.toArray(new String[0]);</pre>
*
- * Note that <tt>toArray(new Object[0])</tt> is identical in function to
- * <tt>toArray()</tt>.
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
@@ -493,29 +593,32 @@ public class LinkedBlockingQueue extends AbstractQueue
* this queue
* @throws NullPointerException if the specified array is null
*/
- public Object[] toArray(Object[] a) {
- synchronized (putLock) {
- synchronized (takeLock) {
- int size = count;
- if (a.length < size)
- a = (Object[])java.lang.reflect.Array.newInstance
- (a.getClass().getComponentType(), size);
-
- int k = 0;
- for (Node p = head.next; p != null; p = p.next)
- a[k++] = (Object)p.item;
- if (a.length > k)
- a[k] = null;
- return a;
- }
+ @SuppressWarnings("unchecked")
+ public <T> T[] toArray(T[] a) {
+ fullyLock();
+ try {
+ int size = count.get();
+ if (a.length < size)
+ a = (T[])java.lang.reflect.Array.newInstance
+ (a.getClass().getComponentType(), size);
+
+ int k = 0;
+ for (Node<E> p = head.next; p != null; p = p.next)
+ a[k++] = (T)p.item;
+ if (a.length > k)
+ a[k] = null;
+ return a;
+ } finally {
+ fullyUnlock();
}
}
public String toString() {
- synchronized (putLock) {
- synchronized (takeLock) {
- return super.toString();
- }
+ fullyLock();
+ try {
+ return super.toString();
+ } finally {
+ fullyUnlock();
}
}
@@ -524,19 +627,18 @@ public class LinkedBlockingQueue extends AbstractQueue
* The queue will be empty after this call returns.
*/
public void clear() {
- synchronized (putLock) {
- synchronized (takeLock) {
- head.next = null;
- assert head.item == null;
- last = head;
- int c;
- synchronized (this) {
- c = count;
- count = 0;
- }
- if (c == capacity)
- putLock.notifyAll();
+ fullyLock();
+ try {
+ for (Node<E> p, h = head; (p = h.next) != null; h = p) {
+ h.next = h;
+ p.item = null;
}
+ head = last;
+ // assert head.item == null && head.next == null;
+ if (count.getAndSet(0) == capacity)
+ notFull.signal();
+ } finally {
+ fullyUnlock();
}
}
@@ -546,35 +648,8 @@ public class LinkedBlockingQueue extends AbstractQueue
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
- public int drainTo(Collection c) {
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- Node first;
- synchronized (putLock) {
- synchronized (takeLock) {
- first = head.next;
- head.next = null;
- assert head.item == null;
- last = head;
- int cold;
- synchronized (this) {
- cold = count;
- count = 0;
- }
- if (cold == capacity)
- putLock.notifyAll();
- }
- }
- // Transfer the elements outside of locks
- int n = 0;
- for (Node p = first; p != null; p = p.next) {
- c.add(p.item);
- p.item = null;
- ++n;
- }
- return n;
+ public int drainTo(Collection<? super E> c) {
+ return drainTo(c, Integer.MAX_VALUE);
}
/**
@@ -583,70 +658,77 @@ public class LinkedBlockingQueue extends AbstractQueue
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
- public int drainTo(Collection c, int maxElements) {
+ public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
- synchronized (putLock) {
- synchronized (takeLock) {
- int n = 0;
- Node p = head.next;
- while (p != null && n < maxElements) {
+ boolean signalNotFull = false;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ int n = Math.min(maxElements, count.get());
+ // count.get provides visibility to first n Nodes
+ Node<E> h = head;
+ int i = 0;
+ try {
+ while (i < n) {
+ Node<E> p = h.next;
c.add(p.item);
p.item = null;
- p = p.next;
- ++n;
- }
- if (n != 0) {
- head.next = p;
- assert head.item == null;
- if (p == null)
- last = head;
- int cold;
- synchronized (this) {
- cold = count;
- count -= n;
- }
- if (cold == capacity)
- putLock.notifyAll();
+ h.next = h;
+ h = p;
+ ++i;
}
return n;
+ } finally {
+ // Restore invariants even if c.add() threw
+ if (i > 0) {
+ // assert h.item == null;
+ head = h;
+ signalNotFull = (count.getAndAdd(-i) == capacity);
+ }
}
+ } finally {
+ takeLock.unlock();
+ if (signalNotFull)
+ signalNotFull();
}
}
/**
* Returns an iterator over the elements in this queue in proper sequence.
- * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
- * will never throw {@link java.util.ConcurrentModificationException},
+ * The returned {@code Iterator} is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
- public Iterator iterator() {
+ public Iterator<E> iterator() {
return new Itr();
}
- private class Itr implements Iterator {
+ private class Itr implements Iterator<E> {
/*
- * Basic weak-consistent iterator. At all times hold the next
+ * Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/
- private Node current;
- private Node lastRet;
- private Object currentElement;
+ private Node<E> current;
+ private Node<E> lastRet;
+ private E currentElement;
Itr() {
- synchronized (putLock) {
- synchronized (takeLock) {
- current = head.next;
- if (current != null)
- currentElement = current.item;
- }
+ fullyLock();
+ try {
+ current = head.next;
+ if (current != null)
+ currentElement = current.item;
+ } finally {
+ fullyUnlock();
}
}
@@ -654,45 +736,56 @@ public class LinkedBlockingQueue extends AbstractQueue
return current != null;
}
- public Object next() {
- synchronized (putLock) {
- synchronized (takeLock) {
- if (current == null)
- throw new NoSuchElementException();
- Object x = currentElement;
- lastRet = current;
- current = current.next;
- if (current != null)
- currentElement = current.item;
- return x;
- }
+ /**
+ * Returns the next live successor of p, or null if no such.
+ *
+ * Unlike other traversal methods, iterators need to handle both:
+ * - dequeued nodes (p.next == p)
+ * - (possibly multiple) interior removed nodes (p.item == null)
+ */
+ private Node<E> nextNode(Node<E> p) {
+ for (;;) {
+ Node<E> s = p.next;
+ if (s == p)
+ return head.next;
+ if (s == null || s.item != null)
+ return s;
+ p = s;
+ }
+ }
+
+ public E next() {
+ fullyLock();
+ try {
+ if (current == null)
+ throw new NoSuchElementException();
+ E x = currentElement;
+ lastRet = current;
+ current = nextNode(current);
+ currentElement = (current == null) ? null : current.item;
+ return x;
+ } finally {
+ fullyUnlock();
}
}
public void remove() {
if (lastRet == null)
throw new IllegalStateException();
- synchronized (putLock) {
- synchronized (takeLock) {
- Node node = lastRet;
- lastRet = null;
- Node trail = head;
- Node p = head.next;
- while (p != null && p != node) {
- trail = p;
- p = p.next;
- }
+ fullyLock();
+ try {
+ Node<E> node = lastRet;
+ lastRet = null;
+ for (Node<E> trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
if (p == node) {
- p.item = null;
- trail.next = p.next;
- if (last == p)
- last = trail;
- int c;
- synchronized (this) { c = count--; }
- if (c == capacity)
- putLock.notifyAll();
+ unlink(p, trail);
+ break;
}
}
+ } finally {
+ fullyUnlock();
}
}
}
@@ -701,31 +794,33 @@ public class LinkedBlockingQueue extends AbstractQueue
* Save the state to a stream (that is, serialize it).
*
* @serialData The capacity is emitted (int), followed by all of
- * its elements (each an <tt>Object</tt>) in the proper order,
+ * its elements (each an {@code Object}) in the proper order,
* followed by a null
* @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
- synchronized (putLock) {
- synchronized (takeLock) {
- // Write out any hidden stuff, plus capacity
- s.defaultWriteObject();
+ fullyLock();
+ try {
+ // Write out any hidden stuff, plus capacity
+ s.defaultWriteObject();
- // Write out all elements in the proper order.
- for (Node p = head.next; p != null; p = p.next)
- s.writeObject(p.item);
+ // Write out all elements in the proper order.
+ for (Node<E> p = head.next; p != null; p = p.next)
+ s.writeObject(p.item);
- // Use trailing null as sentinel
- s.writeObject(null);
- }
+ // Use trailing null as sentinel
+ s.writeObject(null);
+ } finally {
+ fullyUnlock();
}
}
/**
* Reconstitute this queue instance from a stream (that is,
* deserialize it).
+ *
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
@@ -733,19 +828,16 @@ public class LinkedBlockingQueue extends AbstractQueue
// Read in capacity, and any hidden stuff
s.defaultReadObject();
- synchronized (this) { count = 0; }
- last = head = new Node(null);
+ count.set(0);
+ last = head = new Node<E>(null);
// Read in all elements and place in queue
for (;;) {
- Object item = (Object)s.readObject();
+ @SuppressWarnings("unchecked")
+ E item = (E)s.readObject();
if (item == null)
break;
add(item);
}
}
-
- private static class SerializableLock implements java.io.Serializable {
- private final static long serialVersionUID = -8856990691138858668L;
- }
}
diff --git a/test/files/jvm/actor-executor2.scala b/test/files/jvm/actor-executor2.scala
index da64a7fc43..f8fcaef69f 100644
--- a/test/files/jvm/actor-executor2.scala
+++ b/test/files/jvm/actor-executor2.scala
@@ -1,6 +1,6 @@
import scala.actors.{Actor, SchedulerAdapter, Exit}
import Actor._
-import java.util.concurrent.Executors
+import java.util.concurrent.{Executors, RejectedExecutionException}
object One extends AdaptedActor {
def act() {
@@ -57,9 +57,15 @@ object Test {
val scheduler =
new SchedulerAdapter {
def execute(block: => Unit) {
- executor.execute(new Runnable {
+ val task = new Runnable {
def run() { block }
- })
+ }
+ try {
+ executor.execute(task)
+ } catch {
+ case ree: RejectedExecutionException =>
+ task.run() // run task on current thread
+ }
}
}
diff --git a/test/files/run/t3645.scala b/test/files/run/t3645.scala
new file mode 100644
index 0000000000..af2543377b
--- /dev/null
+++ b/test/files/run/t3645.scala
@@ -0,0 +1,6 @@
+object Test {
+ def main(args: Array[String]) {
+ val s = Stream.tabulate(5)(x => x+2)
+ assert( s.toList == List(2,3,4,5,6) )
+ }
+}