summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-07-20 15:28:34 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-07-20 15:28:34 +0000
commit9fcf6dc3c6b32e03c64d340e285e1d7886593d20 (patch)
tree39b1fa5e251d62fba686e691e85316b9f11608eb /src/actors
parent670edfe22a90dc08928b95e947b1a72c9ad463bf (diff)
downloadscala-9fcf6dc3c6b32e03c64d340e285e1d7886593d20.tar.gz
scala-9fcf6dc3c6b32e03c64d340e285e1d7886593d20.tar.bz2
scala-9fcf6dc3c6b32e03c64d340e285e1d7886593d20.zip
Re-enabled snapshot for new ForkJoinPool.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala2
-rw-r--r--src/actors/scala/actors/DefaultExecutorScheduler.scala5
-rw-r--r--src/actors/scala/actors/DrainableForkJoinPool.scala11
-rw-r--r--src/actors/scala/actors/ExecutorScheduler.scala13
-rw-r--r--src/actors/scala/actors/ForkJoinScheduler.scala85
-rw-r--r--src/actors/scala/actors/Scheduler.scala55
-rw-r--r--src/actors/scala/actors/SimpleExecutorScheduler.scala32
-rw-r--r--src/actors/scala/actors/TerminationMonitor.scala4
-rw-r--r--src/actors/scala/actors/TerminationService.scala3
-rw-r--r--src/actors/scala/actors/ThreadPoolScheduler.scala79
-rw-r--r--src/actors/scala/actors/forkjoin/ForkJoinPool.java2
-rw-r--r--src/actors/scala/actors/forkjoin/Phaser.java960
12 files changed, 165 insertions, 1086 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 0f0a5f6390..f9e79ce98f 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -643,7 +643,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
scheduler executeFromActor task
}
- class ActorBlocker(timeout: Long) extends ManagedBlocker {
+ private class ActorBlocker(timeout: Long) extends ManagedBlocker {
def block() = {
if (timeout > 0)
Actor.this.suspendActorFor(timeout)
diff --git a/src/actors/scala/actors/DefaultExecutorScheduler.scala b/src/actors/scala/actors/DefaultExecutorScheduler.scala
index ec23262c6a..641425e0e0 100644
--- a/src/actors/scala/actors/DefaultExecutorScheduler.scala
+++ b/src/actors/scala/actors/DefaultExecutorScheduler.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue,
*
* @author Philipp Haller
*/
-class DefaultExecutorScheduler(daemon: Boolean) extends ExecutorScheduler {
+class DefaultExecutorScheduler(daemon: Boolean)
+ extends SchedulerService(daemon) with ExecutorScheduler {
def this() =
this(false)
@@ -45,7 +46,7 @@ class DefaultExecutorScheduler(daemon: Boolean) extends ExecutorScheduler {
workQueue,
threadFactory)
- executor = threadPool
+ val executor = threadPool
override val CHECK_FREQ = 50
}
diff --git a/src/actors/scala/actors/DrainableForkJoinPool.scala b/src/actors/scala/actors/DrainableForkJoinPool.scala
new file mode 100644
index 0000000000..c1b25f1de4
--- /dev/null
+++ b/src/actors/scala/actors/DrainableForkJoinPool.scala
@@ -0,0 +1,11 @@
+package scala.actors
+
+import java.util.Collection
+import forkjoin.{ForkJoinPool, ForkJoinTask}
+
+private class DrainableForkJoinPool extends ForkJoinPool {
+
+ override def drainTasksTo(c: Collection[ForkJoinTask[_]]): Int =
+ super.drainTasksTo(c)
+
+}
diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala
index 16a88c1539..389b4507f0 100644
--- a/src/actors/scala/actors/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/ExecutorScheduler.scala
@@ -18,15 +18,9 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException}
*
* @author Philipp Haller
*/
-class ExecutorScheduler(protected var executor: ExecutorService) extends SchedulerService {
+trait ExecutorScheduler extends IScheduler {
- /* This constructor (and the var above) is currently only used to work
- * around a bug in scaladoc, which cannot deal with early initializers
- * (to be used in subclasses such as DefaultExecutorScheduler) properly.
- */
- def this() {
- this(null)
- }
+ protected def executor: ExecutorService
/** Submits a <code>Runnable</code> for execution.
*
@@ -51,6 +45,9 @@ class ExecutorScheduler(protected var executor: ExecutorService) extends Schedul
def onShutdown(): Unit =
executor.shutdown()
+ /** The scheduler is active if the underlying <code>ExecutorService</code>
+ * has not been shut down.
+ */
def isActive =
(executor ne null) && !executor.isShutdown
}
diff --git a/src/actors/scala/actors/ForkJoinScheduler.scala b/src/actors/scala/actors/ForkJoinScheduler.scala
index 66122654a4..2c4a77bdd4 100644
--- a/src/actors/scala/actors/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/ForkJoinScheduler.scala
@@ -1,12 +1,25 @@
package scala.actors
import java.lang.Thread.State
+import java.util.{Collection, ArrayList}
import forkjoin._
-class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
+/** The <code>ForkJoinScheduler</code> is backed by a lightweight
+ * fork-join task execution framework.
+ *
+ * @author Philipp Haller
+ */
+class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor {
- private val pool = {
- val p = new ForkJoinPool()
+ private var pool = makeNewPool()
+ private var terminating = false
+ private var snapshoting = false
+ private var drainedTasks: Collection[ForkJoinTask[_]] = null
+
+ private val CHECK_FREQ = 10
+
+ private def makeNewPool(): DrainableForkJoinPool = {
+ val p = new DrainableForkJoinPool()
// enable locally FIFO scheduling mode
p.setAsyncMode(true)
Debug.info(this+": parallelism "+p.getParallelism())
@@ -14,14 +27,21 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
p
}
- private var terminating = false
-
- private val CHECK_FREQ = 50
-
- override def managedBlock(blocker: ManagedBlocker) {
- ForkJoinPool.managedBlock(blocker, true)
+ /** Starts this scheduler.
+ */
+ def start() {
+ (new Thread(this)).start()
}
+ private def allWorkersBlocked: Boolean =
+ (pool.workers != null) &&
+ pool.workers.forall(t => {
+ (t == null) || {
+ val s = t.getState()
+ s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING
+ }
+ })
+
override def run() {
try {
while (true) {
@@ -31,6 +51,7 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
} catch {
case _: InterruptedException =>
}
+
if (terminating)
throw new QuitException
@@ -38,6 +59,19 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
//Debug.info(this+": all actors terminated")
throw new QuitException
}
+
+ if (!snapshoting) {
+ val poolSize = pool.getPoolSize()
+ if (allWorkersBlocked && (poolSize < ThreadPoolConfig.maxPoolSize)) {
+ pool.setParallelism(poolSize + 1)
+ }
+ } else if (pool.isQuiescent()) {
+ val list = new ArrayList[ForkJoinTask[_]]
+ val num = pool.drainTasksTo(list)
+ Debug.info(this+": drained "+num+" tasks")
+ drainedTasks = list
+ throw new QuitException
+ }
}
}
} catch {
@@ -75,6 +109,10 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
def run() { fun }
})
+ override def managedBlock(blocker: ManagedBlocker) {
+ ForkJoinPool.managedBlock(blocker, true)
+ }
+
/** Shuts down the scheduler.
*/
def shutdown(): Unit = synchronized {
@@ -82,6 +120,33 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
}
def isActive =
- !pool.isShutdown()
+ (pool ne null) && !pool.isShutdown()
+
+ /** Suspends the scheduler. All threads that were in use by the
+ * scheduler and its internal thread pool are terminated.
+ */
+ def snapshot() = synchronized {
+ snapshoting = true
+ }
+
+ /** Resumes the execution of the scheduler if it was previously
+ * suspended using <code>ForkJoinScheduler.snapshot</code>.
+ */
+ def restart() {
+ synchronized {
+ if (!snapshoting)
+ error("snapshot has not been invoked")
+ else if (isActive)
+ error("scheduler is still active")
+ else
+ snapshoting = false
+ }
+ pool = makeNewPool()
+ val iter = drainedTasks.iterator()
+ while (iter.hasNext()) {
+ pool.execute(iter.next())
+ }
+ start()
+ }
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index d81a9d1bc5..d3dd27678b 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -10,14 +10,12 @@
package scala.actors
-import java.lang.Runnable
import java.util.concurrent._
/**
* The <code>Scheduler</code> object is used by <code>Actor</code> to
* execute tasks of an execution of an actor.
*
- * @version 0.9.18
* @author Philipp Haller
*/
object Scheduler extends DelegatingScheduler {
@@ -38,53 +36,22 @@ object Scheduler extends DelegatingScheduler {
s
}
- private var tasks: LinkedQueue = null
-
- /* Assumes <code>sched</code> holds an instance
- * of <code>FJTaskScheduler2</code>.
+ /* Only <code>ForkJoinScheduler</code> implements this method.
*/
- @deprecated def snapshot(): Unit = synchronized {
- if (sched.isInstanceOf[FJTaskScheduler2]) {
- val fjts = sched.asInstanceOf[FJTaskScheduler2]
- tasks = fjts.snapshot()
- fjts.shutdown()
+ @deprecated def snapshot() {
+ if (sched.isInstanceOf[ForkJoinScheduler]) {
+ sched.asInstanceOf[ForkJoinScheduler].snapshot()
} else
- error("snapshot operation not supported.")
+ error("scheduler does not implement snapshot")
}
- /** Shuts down the current scheduler and creates and starts a new scheduler.
- *
- * If the current scheduler is an <code>FJTaskScheduler2</code>
- * a new scheduler of the same class is created. In that case,
- * tasks resulting from a <code>snapshot</code> are
- * submitted for execution.
- *
- * If the current scheduler is not an <code>FJTaskScheduler2</code>,
- * a <code>DefaultExecutorScheduler</code> is created.
+ /* Only <code>ForkJoinScheduler</code> implements this method.
*/
- def restart(): Unit = synchronized {
- // 1. shut down current scheduler
- if (sched ne null) {
- sched.shutdown()
- }
-
- // 2. create and start new scheduler
- if ((sched ne null) && sched.isInstanceOf[FJTaskScheduler2]) {
- sched = {
- val s = new FJTaskScheduler2
- s.start()
- s
- }
- if (tasks != null) {
- while (!tasks.isEmpty()) {
- sched.execute(tasks.take().asInstanceOf[FJTask])
- }
- tasks = null
- }
- } else {
- // will trigger creation of new delegate scheduler
- sched = null
- }
+ @deprecated def restart() {
+ if (sched.isInstanceOf[ForkJoinScheduler]) {
+ sched.asInstanceOf[ForkJoinScheduler].restart()
+ } else
+ error("scheduler does not implement restart")
}
}
diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala
index ffca87f87d..49f037971c 100644
--- a/src/actors/scala/actors/SimpleExecutorScheduler.scala
+++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala
@@ -10,8 +10,7 @@
package scala.actors
-import scala.collection.mutable.HashMap
-import java.util.concurrent.{ExecutorService, RejectedExecutionException}
+import java.util.concurrent.ExecutorService
/**
* The <code>SimpleExecutorScheduler</code> class uses an
@@ -29,7 +28,8 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException}
* @author Philipp Haller
*/
class SimpleExecutorScheduler(protected var executor: ExecutorService,
- protected var terminate: Boolean) extends TerminationService(terminate) {
+ protected var terminate: Boolean)
+ extends TerminationService(terminate) with ExecutorScheduler {
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers
@@ -39,30 +39,4 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService,
this(null, true)
}
- /** Submits a <code>Runnable</code> for execution.
- *
- * @param task the task to be executed
- */
- def execute(task: Runnable) {
- try {
- executor execute task
- } catch {
- case ree: RejectedExecutionException =>
- // run task on current thread
- task.run()
- }
- }
-
- def executeFromActor(task: Runnable) =
- execute(task)
-
- def onShutdown() {
- executor.shutdown()
- }
-
- /** The scheduler is active if the underlying <code>ExecutorService</code>
- * has not been shut down.
- */
- def isActive =
- (executor ne null) && !executor.isShutdown()
}
diff --git a/src/actors/scala/actors/TerminationMonitor.scala b/src/actors/scala/actors/TerminationMonitor.scala
index ce0adc2e59..15fab26c3f 100644
--- a/src/actors/scala/actors/TerminationMonitor.scala
+++ b/src/actors/scala/actors/TerminationMonitor.scala
@@ -12,7 +12,7 @@ package scala.actors
import scala.collection.mutable.HashMap
-trait TerminationMonitor extends IScheduler {
+trait TerminationMonitor {
private var pendingReactions = 0
private val termHandlers = new HashMap[Reactor, () => Unit]
@@ -31,7 +31,7 @@ trait TerminationMonitor extends IScheduler {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: Reactor)(f: => Unit) = synchronized {
+ def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized {
termHandlers += (a -> (() => f))
}
diff --git a/src/actors/scala/actors/TerminationService.scala b/src/actors/scala/actors/TerminationService.scala
index c983d2d558..930967a67b 100644
--- a/src/actors/scala/actors/TerminationService.scala
+++ b/src/actors/scala/actors/TerminationService.scala
@@ -20,7 +20,8 @@ import java.lang.{Runnable, Thread, InterruptedException}
*
* @author Philipp Haller
*/
-abstract class TerminationService(terminate: Boolean) extends Thread with TerminationMonitor {
+abstract class TerminationService(terminate: Boolean)
+ extends Thread with IScheduler with TerminationMonitor {
private var terminating = false
diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala
index 42cedf5f54..dd694f24ff 100644
--- a/src/actors/scala/actors/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/ThreadPoolScheduler.scala
@@ -15,8 +15,7 @@ import java.util.concurrent.{ThreadPoolExecutor, RejectedExecutionException}
/**
* The <code>ThreadPoolScheduler</code> class uses an
- * <code>ExecutorService</code> to execute <code>Actor</code>s. It
- * does not start an additional thread.
+ * <code>ThreadPoolExecutor</code> to execute <code>Actor</code>s.
*
* A <code>ThreadPoolScheduler</code> attempts to shut down
* the underlying <code>ExecutorService</code> only if
@@ -29,7 +28,11 @@ import java.util.concurrent.{ThreadPoolExecutor, RejectedExecutionException}
* @author Philipp Haller
*/
class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
- protected var terminate: Boolean) extends TerminationService(terminate) {
+ protected var terminate: Boolean)
+ extends Thread with TerminationMonitor with ExecutorScheduler {
+
+ private var terminating = false
+ protected val CHECK_FREQ = 10
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers
@@ -39,38 +42,58 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
this(null, true)
}
- /** Submits a <code>Runnable</code> for execution.
- *
- * @param task the task to be executed
- */
- def execute(task: Runnable) {
- try {
- executor execute task
- } catch {
- case ree: RejectedExecutionException =>
- // run task on current thread
- task.run()
+ override def managedBlock(blocker: ManagedBlocker) {
+ val coreSize = executor.getCorePoolSize()
+ if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
+ executor.setCorePoolSize(coreSize + 1)
}
+ blocker.block()
}
- def executeFromActor(task: Runnable) =
- execute(task)
+ override def run() {
+ try {
+ while (true) {
+ this.synchronized {
+ try {
+ wait(CHECK_FREQ)
+ } catch {
+ case _: InterruptedException =>
+ }
+
+ if (terminating)
+ throw new QuitException
- def onShutdown() {
- executor.shutdown()
+ if (terminate && allTerminated)
+ throw new QuitException
+
+ val coreSize = executor.getCorePoolSize()
+ if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
+ executor.setCorePoolSize(coreSize + 1)
+ }
+ }
+ }
+ } catch {
+ case _: QuitException =>
+ Debug.info(this+": initiating shutdown...")
+ // invoke shutdown hook
+ onShutdown()
+ // allow thread to exit
+ }
}
- /** The scheduler is active if the underlying <code>ExecutorService</code>
- * has not been shut down.
+ /** Submits a closure for execution.
+ *
+ * @param fun the closure to be executed
*/
- def isActive =
- (executor ne null) && !executor.isShutdown()
+ def execute(fun: => Unit): Unit =
+ execute(new Runnable {
+ def run() { fun }
+ })
- override def managedBlock(blocker: ManagedBlocker) {
- val coreSize = executor.getCorePoolSize()
- if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
- executor.setCorePoolSize(coreSize + 1)
- }
- blocker.block()
+ /** Shuts down the scheduler.
+ */
+ def shutdown(): Unit = synchronized {
+ terminating = true
}
+
}
diff --git a/src/actors/scala/actors/forkjoin/ForkJoinPool.java b/src/actors/scala/actors/forkjoin/ForkJoinPool.java
index e633d4bfed..cd4444ea97 100644
--- a/src/actors/scala/actors/forkjoin/ForkJoinPool.java
+++ b/src/actors/scala/actors/forkjoin/ForkJoinPool.java
@@ -143,7 +143,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
* in a consistent enough state to be randomly accessed without
* locking by workers performing work-stealing.
*/
- volatile ForkJoinWorkerThread[] workers;
+ public volatile ForkJoinWorkerThread[] workers;
/**
* Lock protecting access to workers.
diff --git a/src/actors/scala/actors/forkjoin/Phaser.java b/src/actors/scala/actors/forkjoin/Phaser.java
deleted file mode 100644
index f87531a6b9..0000000000
--- a/src/actors/scala/actors/forkjoin/Phaser.java
+++ /dev/null
@@ -1,960 +0,0 @@
-/*
- * Written by Doug Lea with assistance from members of JCP JSR-166
- * Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/licenses/publicdomain
- */
-
-package scala.actors.forkjoin;
-
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.LockSupport;
-import sun.misc.Unsafe;
-import java.lang.reflect.*;
-
-/**
- * A reusable synchronization barrier, similar in functionality to a
- * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
- * {@link java.util.concurrent.CountDownLatch CountDownLatch}
- * but supporting more flexible usage.
- *
- * <ul>
- *
- * <li> The number of parties synchronizing on a phaser may vary over
- * time. A task may register to be a party at any time, and may
- * deregister upon arriving at the barrier. As is the case with most
- * basic synchronization constructs, registration and deregistration
- * affect only internal counts; they do not establish any further
- * internal bookkeeping, so tasks cannot query whether they are
- * registered. (However, you can introduce such bookkeeping by
- * subclassing this class.)
- *
- * <li> Each generation has an associated phase value, starting at
- * zero, and advancing when all parties reach the barrier (wrapping
- * around to zero after reaching {@code Integer.MAX_VALUE}).
- *
- * <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited.
- * Method {@code arriveAndAwaitAdvance} has effect analogous to
- * {@code CyclicBarrier.await}. However, Phasers separate two
- * aspects of coordination, that may also be invoked independently:
- *
- * <ul>
- *
- * <li> Arriving at a barrier. Methods {@code arrive} and
- * {@code arriveAndDeregister} do not block, but return
- * the phase value current upon entry to the method.
- *
- * <li> Awaiting others. Method {@code awaitAdvance} requires an
- * argument indicating the entry phase, and returns when the
- * barrier advances to a new phase.
- * </ul>
- *
- *
- * <li> Barrier actions, performed by the task triggering a phase
- * advance while others may be waiting, are arranged by overriding
- * method {@code onAdvance}, that also controls termination.
- * Overriding this method may be used to similar but more flexible
- * effect as providing a barrier action to a CyclicBarrier.
- *
- * <li> Phasers may enter a <em>termination</em> state in which all
- * actions immediately return without updating phaser state or waiting
- * for advance, and indicating (via a negative phase value) that
- * execution is complete. Termination is triggered by executing the
- * overridable {@code onAdvance} method that is invoked each time the
- * barrier is about to be tripped. When a Phaser is controlling an
- * action with a fixed number of iterations, it is often convenient to
- * override this method to cause termination when the current phase
- * number reaches a threshold. Method {@code forceTermination} is also
- * available to abruptly release waiting threads and allow them to
- * terminate.
- *
- * <li> Phasers may be tiered to reduce contention. Phasers with large
- * numbers of parties that would otherwise experience heavy
- * synchronization contention costs may instead be arranged in trees.
- * This will typically greatly increase throughput even though it
- * incurs somewhat greater per-operation overhead.
- *
- * <li> By default, {@code awaitAdvance} continues to wait even if
- * the waiting thread is interrupted. And unlike the case in
- * CyclicBarriers, exceptions encountered while tasks wait
- * interruptibly or with timeout do not change the state of the
- * barrier. If necessary, you can perform any associated recovery
- * within handlers of those exceptions, often after invoking
- * {@code forceTermination}.
- *
- * <li>Phasers ensure lack of starvation when used by ForkJoinTasks.
- *
- * </ul>
- *
- * <p><b>Sample usages:</b>
- *
- * <p>A Phaser may be used instead of a {@code CountDownLatch} to control
- * a one-shot action serving a variable number of parties. The typical
- * idiom is for the method setting this up to first register, then
- * start the actions, then deregister, as in:
- *
- * <pre>
- * void runTasks(List&lt;Runnable&gt; list) {
- * final Phaser phaser = new Phaser(1); // "1" to register self
- * for (Runnable r : list) {
- * phaser.register();
- * new Thread() {
- * public void run() {
- * phaser.arriveAndAwaitAdvance(); // await all creation
- * r.run();
- * phaser.arriveAndDeregister(); // signal completion
- * }
- * }.start();
- * }
- *
- * doSomethingOnBehalfOfWorkers();
- * phaser.arrive(); // allow threads to start
- * int p = phaser.arriveAndDeregister(); // deregister self ...
- * p = phaser.awaitAdvance(p); // ... and await arrival
- * otherActions(); // do other things while tasks execute
- * phaser.awaitAdvance(p); // await final completion
- * }
- * </pre>
- *
- * <p>One way to cause a set of threads to repeatedly perform actions
- * for a given number of iterations is to override {@code onAdvance}:
- *
- * <pre>
- * void startTasks(List&lt;Runnable&gt; list, final int iterations) {
- * final Phaser phaser = new Phaser() {
- * public boolean onAdvance(int phase, int registeredParties) {
- * return phase &gt;= iterations || registeredParties == 0;
- * }
- * };
- * phaser.register();
- * for (Runnable r : list) {
- * phaser.register();
- * new Thread() {
- * public void run() {
- * do {
- * r.run();
- * phaser.arriveAndAwaitAdvance();
- * } while(!phaser.isTerminated();
- * }
- * }.start();
- * }
- * phaser.arriveAndDeregister(); // deregister self, don't wait
- * }
- * </pre>
- *
- * <p> To create a set of tasks using a tree of Phasers,
- * you could use code of the following form, assuming a
- * Task class with a constructor accepting a Phaser that
- * it registers for upon construction:
- * <pre>
- * void build(Task[] actions, int lo, int hi, Phaser b) {
- * int step = (hi - lo) / TASKS_PER_PHASER;
- * if (step &gt; 1) {
- * int i = lo;
- * while (i &lt; hi) {
- * int r = Math.min(i + step, hi);
- * build(actions, i, r, new Phaser(b));
- * i = r;
- * }
- * }
- * else {
- * for (int i = lo; i &lt; hi; ++i)
- * actions[i] = new Task(b);
- * // assumes new Task(b) performs b.register()
- * }
- * }
- * // .. initially called, for n tasks via
- * build(new Task[n], 0, n, new Phaser());
- * </pre>
- *
- * The best value of {@code TASKS_PER_PHASER} depends mainly on
- * expected barrier synchronization rates. A value as low as four may
- * be appropriate for extremely small per-barrier task bodies (thus
- * high rates), or up to hundreds for extremely large ones.
- *
- * </pre>
- *
- * <p><b>Implementation notes</b>: This implementation restricts the
- * maximum number of parties to 65535. Attempts to register additional
- * parties result in IllegalStateExceptions. However, you can and
- * should create tiered phasers to accommodate arbitrarily large sets
- * of participants.
- */
-public class Phaser {
- /*
- * This class implements an extension of X10 "clocks". Thanks to
- * Vijay Saraswat for the idea, and to Vivek Sarkar for
- * enhancements to extend functionality.
- */
-
- /**
- * Barrier state representation. Conceptually, a barrier contains
- * four values:
- *
- * * parties -- the number of parties to wait (16 bits)
- * * unarrived -- the number of parties yet to hit barrier (16 bits)
- * * phase -- the generation of the barrier (31 bits)
- * * terminated -- set if barrier is terminated (1 bit)
- *
- * However, to efficiently maintain atomicity, these values are
- * packed into a single (atomic) long. Termination uses the sign
- * bit of 32 bit representation of phase, so phase is set to -1 on
- * termination. Good performance relies on keeping state decoding
- * and encoding simple, and keeping race windows short.
- *
- * Note: there are some cheats in arrive() that rely on unarrived
- * count being lowest 16 bits.
- */
- private volatile long state;
-
- private static final int ushortBits = 16;
- private static final int ushortMask = 0xffff;
- private static final int phaseMask = 0x7fffffff;
-
- private static int unarrivedOf(long s) {
- return (int)(s & ushortMask);
- }
-
- private static int partiesOf(long s) {
- return ((int)s) >>> 16;
- }
-
- private static int phaseOf(long s) {
- return (int)(s >>> 32);
- }
-
- private static int arrivedOf(long s) {
- return partiesOf(s) - unarrivedOf(s);
- }
-
- private static long stateFor(int phase, int parties, int unarrived) {
- return ((((long)phase) << 32) | (((long)parties) << 16) |
- (long)unarrived);
- }
-
- private static long trippedStateFor(int phase, int parties) {
- long lp = (long)parties;
- return (((long)phase) << 32) | (lp << 16) | lp;
- }
-
- /**
- * Returns message string for bad bounds exceptions
- */
- private static String badBounds(int parties, int unarrived) {
- return ("Attempt to set " + unarrived +
- " unarrived of " + parties + " parties");
- }
-
- /**
- * The parent of this phaser, or null if none
- */
- private final Phaser parent;
-
- /**
- * The root of Phaser tree. Equals this if not in a tree. Used to
- * support faster state push-down.
- */
- private final Phaser root;
-
- // Wait queues
-
- /**
- * Heads of Treiber stacks for waiting threads. To eliminate
- * contention while releasing some threads while adding others, we
- * use two of them, alternating across even and odd phases.
- */
- private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
- private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
-
- private AtomicReference<QNode> queueFor(int phase) {
- return (phase & 1) == 0? evenQ : oddQ;
- }
-
- /**
- * Returns current state, first resolving lagged propagation from
- * root if necessary.
- */
- private long getReconciledState() {
- return parent == null? state : reconcileState();
- }
-
- /**
- * Recursively resolves state.
- */
- private long reconcileState() {
- Phaser p = parent;
- long s = state;
- if (p != null) {
- while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
- long parentState = p.getReconciledState();
- int parentPhase = phaseOf(parentState);
- int phase = phaseOf(s = state);
- if (phase != parentPhase) {
- long next = trippedStateFor(parentPhase, partiesOf(s));
- if (casState(s, next)) {
- releaseWaiters(phase);
- s = next;
- }
- }
- }
- }
- return s;
- }
-
- /**
- * Creates a new Phaser without any initially registered parties,
- * initial phase number 0, and no parent. Any thread using this
- * Phaser will need to first register for it.
- */
- public Phaser() {
- this(null);
- }
-
- /**
- * Creates a new Phaser with the given numbers of registered
- * unarrived parties, initial phase number 0, and no parent.
- * @param parties the number of parties required to trip barrier.
- * @throws IllegalArgumentException if parties less than zero
- * or greater than the maximum number of parties supported.
- */
- public Phaser(int parties) {
- this(null, parties);
- }
-
- /**
- * Creates a new Phaser with the given parent, without any
- * initially registered parties. If parent is non-null this phaser
- * is registered with the parent and its initial phase number is
- * the same as that of parent phaser.
- * @param parent the parent phaser.
- */
- public Phaser(Phaser parent) {
- int phase = 0;
- this.parent = parent;
- if (parent != null) {
- this.root = parent.root;
- phase = parent.register();
- }
- else
- this.root = this;
- this.state = trippedStateFor(phase, 0);
- }
-
- /**
- * Creates a new Phaser with the given parent and numbers of
- * registered unarrived parties. If parent is non-null this phaser
- * is registered with the parent and its initial phase number is
- * the same as that of parent phaser.
- * @param parent the parent phaser.
- * @param parties the number of parties required to trip barrier.
- * @throws IllegalArgumentException if parties less than zero
- * or greater than the maximum number of parties supported.
- */
- public Phaser(Phaser parent, int parties) {
- if (parties < 0 || parties > ushortMask)
- throw new IllegalArgumentException("Illegal number of parties");
- int phase = 0;
- this.parent = parent;
- if (parent != null) {
- this.root = parent.root;
- phase = parent.register();
- }
- else
- this.root = this;
- this.state = trippedStateFor(phase, parties);
- }
-
- /**
- * Adds a new unarrived party to this phaser.
- * @return the current barrier phase number upon registration
- * @throws IllegalStateException if attempting to register more
- * than the maximum supported number of parties.
- */
- public int register() {
- return doRegister(1);
- }
-
- /**
- * Adds the given number of new unarrived parties to this phaser.
- * @param parties the number of parties required to trip barrier.
- * @return the current barrier phase number upon registration
- * @throws IllegalStateException if attempting to register more
- * than the maximum supported number of parties.
- */
- public int bulkRegister(int parties) {
- if (parties < 0)
- throw new IllegalArgumentException();
- if (parties == 0)
- return getPhase();
- return doRegister(parties);
- }
-
- /**
- * Shared code for register, bulkRegister
- */
- private int doRegister(int registrations) {
- int phase;
- for (;;) {
- long s = getReconciledState();
- phase = phaseOf(s);
- int unarrived = unarrivedOf(s) + registrations;
- int parties = partiesOf(s) + registrations;
- if (phase < 0)
- break;
- if (parties > ushortMask || unarrived > ushortMask)
- throw new IllegalStateException(badBounds(parties, unarrived));
- if (phase == phaseOf(root.state) &&
- casState(s, stateFor(phase, parties, unarrived)))
- break;
- }
- return phase;
- }
-
- /**
- * Arrives at the barrier, but does not wait for others. (You can
- * in turn wait for others via {@link #awaitAdvance}).
- *
- * @return the barrier phase number upon entry to this method, or a
- * negative value if terminated;
- * @throws IllegalStateException if not terminated and the number
- * of unarrived parties would become negative.
- */
- public int arrive() {
- int phase;
- for (;;) {
- long s = state;
- phase = phaseOf(s);
- if (phase < 0)
- break;
- int parties = partiesOf(s);
- int unarrived = unarrivedOf(s) - 1;
- if (unarrived > 0) { // Not the last arrival
- if (casState(s, s - 1)) // s-1 adds one arrival
- break;
- }
- else if (unarrived == 0) { // the last arrival
- Phaser par = parent;
- if (par == null) { // directly trip
- if (casState
- (s,
- trippedStateFor(onAdvance(phase, parties)? -1 :
- ((phase + 1) & phaseMask), parties))) {
- releaseWaiters(phase);
- break;
- }
- }
- else { // cascade to parent
- if (casState(s, s - 1)) { // zeroes unarrived
- par.arrive();
- reconcileState();
- break;
- }
- }
- }
- else if (phase != phaseOf(root.state)) // or if unreconciled
- reconcileState();
- else
- throw new IllegalStateException(badBounds(parties, unarrived));
- }
- return phase;
- }
-
- /**
- * Arrives at the barrier, and deregisters from it, without
- * waiting for others. Deregistration reduces number of parties
- * required to trip the barrier in future phases. If this phaser
- * has a parent, and deregistration causes this phaser to have
- * zero parties, this phaser is also deregistered from its parent.
- *
- * @return the current barrier phase number upon entry to
- * this method, or a negative value if terminated;
- * @throws IllegalStateException if not terminated and the number
- * of registered or unarrived parties would become negative.
- */
- public int arriveAndDeregister() {
- // similar code to arrive, but too different to merge
- Phaser par = parent;
- int phase;
- for (;;) {
- long s = state;
- phase = phaseOf(s);
- if (phase < 0)
- break;
- int parties = partiesOf(s) - 1;
- int unarrived = unarrivedOf(s) - 1;
- if (parties >= 0) {
- if (unarrived > 0 || (unarrived == 0 && par != null)) {
- if (casState
- (s,
- stateFor(phase, parties, unarrived))) {
- if (unarrived == 0) {
- par.arriveAndDeregister();
- reconcileState();
- }
- break;
- }
- continue;
- }
- if (unarrived == 0) {
- if (casState
- (s,
- trippedStateFor(onAdvance(phase, parties)? -1 :
- ((phase + 1) & phaseMask), parties))) {
- releaseWaiters(phase);
- break;
- }
- continue;
- }
- if (par != null && phase != phaseOf(root.state)) {
- reconcileState();
- continue;
- }
- }
- throw new IllegalStateException(badBounds(parties, unarrived));
- }
- return phase;
- }
-
- /**
- * Arrives at the barrier and awaits others. Equivalent in effect
- * to {@code awaitAdvance(arrive())}. If you instead need to
- * await with interruption of timeout, and/or deregister upon
- * arrival, you can arrange them using analogous constructions.
- * @return the phase on entry to this method
- * @throws IllegalStateException if not terminated and the number
- * of unarrived parties would become negative.
- */
- public int arriveAndAwaitAdvance() {
- return awaitAdvance(arrive());
- }
-
- /**
- * Awaits the phase of the barrier to advance from the given
- * value, or returns immediately if argument is negative or this
- * barrier is terminated.
- * @param phase the phase on entry to this method
- * @return the phase on exit from this method
- */
- public int awaitAdvance(int phase) {
- if (phase < 0)
- return phase;
- long s = getReconciledState();
- int p = phaseOf(s);
- if (p != phase)
- return p;
- if (unarrivedOf(s) == 0 && parent != null)
- parent.awaitAdvance(phase);
- // Fall here even if parent waited, to reconcile and help release
- return untimedWait(phase);
- }
-
- /**
- * Awaits the phase of the barrier to advance from the given
- * value, or returns immediately if argument is negative or this
- * barrier is terminated, or throws InterruptedException if
- * interrupted while waiting.
- * @param phase the phase on entry to this method
- * @return the phase on exit from this method
- * @throws InterruptedException if thread interrupted while waiting
- */
- public int awaitAdvanceInterruptibly(int phase)
- throws InterruptedException {
- if (phase < 0)
- return phase;
- long s = getReconciledState();
- int p = phaseOf(s);
- if (p != phase)
- return p;
- if (unarrivedOf(s) == 0 && parent != null)
- parent.awaitAdvanceInterruptibly(phase);
- return interruptibleWait(phase);
- }
-
- /**
- * Awaits the phase of the barrier to advance from the given value
- * or the given timeout elapses, or returns immediately if
- * argument is negative or this barrier is terminated.
- * @param phase the phase on entry to this method
- * @return the phase on exit from this method
- * @throws InterruptedException if thread interrupted while waiting
- * @throws TimeoutException if timed out while waiting
- */
- public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException {
- if (phase < 0)
- return phase;
- long s = getReconciledState();
- int p = phaseOf(s);
- if (p != phase)
- return p;
- if (unarrivedOf(s) == 0 && parent != null)
- parent.awaitAdvanceInterruptibly(phase, timeout, unit);
- return timedWait(phase, unit.toNanos(timeout));
- }
-
- /**
- * Forces this barrier to enter termination state. Counts of
- * arrived and registered parties are unaffected. If this phaser
- * has a parent, it too is terminated. This method may be useful
- * for coordinating recovery after one or more tasks encounter
- * unexpected exceptions.
- */
- public void forceTermination() {
- for (;;) {
- long s = getReconciledState();
- int phase = phaseOf(s);
- int parties = partiesOf(s);
- int unarrived = unarrivedOf(s);
- if (phase < 0 ||
- casState(s, stateFor(-1, parties, unarrived))) {
- releaseWaiters(0);
- releaseWaiters(1);
- if (parent != null)
- parent.forceTermination();
- return;
- }
- }
- }
-
- /**
- * Returns the current phase number. The maximum phase number is
- * {@code Integer.MAX_VALUE}, after which it restarts at
- * zero. Upon termination, the phase number is negative.
- * @return the phase number, or a negative value if terminated
- */
- public final int getPhase() {
- return phaseOf(getReconciledState());
- }
-
- /**
- * Returns {@code true} if the current phase number equals the given phase.
- * @param phase the phase
- * @return {@code true} if the current phase number equals the given phase
- */
- public final boolean hasPhase(int phase) {
- return phaseOf(getReconciledState()) == phase;
- }
-
- /**
- * Returns the number of parties registered at this barrier.
- * @return the number of parties
- */
- public int getRegisteredParties() {
- return partiesOf(state);
- }
-
- /**
- * Returns the number of parties that have arrived at the current
- * phase of this barrier.
- * @return the number of arrived parties
- */
- public int getArrivedParties() {
- return arrivedOf(state);
- }
-
- /**
- * Returns the number of registered parties that have not yet
- * arrived at the current phase of this barrier.
- * @return the number of unarrived parties
- */
- public int getUnarrivedParties() {
- return unarrivedOf(state);
- }
-
- /**
- * Returns the parent of this phaser, or null if none.
- * @return the parent of this phaser, or null if none
- */
- public Phaser getParent() {
- return parent;
- }
-
- /**
- * Returns the root ancestor of this phaser, which is the same as
- * this phaser if it has no parent.
- * @return the root ancestor of this phaser
- */
- public Phaser getRoot() {
- return root;
- }
-
- /**
- * Returns {@code true} if this barrier has been terminated.
- * @return {@code true} if this barrier has been terminated
- */
- public boolean isTerminated() {
- return getPhase() < 0;
- }
-
- /**
- * Overridable method to perform an action upon phase advance, and
- * to control termination. This method is invoked whenever the
- * barrier is tripped (and thus all other waiting parties are
- * dormant). If it returns true, then, rather than advance the
- * phase number, this barrier will be set to a final termination
- * state, and subsequent calls to {@code isTerminated} will
- * return true.
- *
- * <p> The default version returns true when the number of
- * registered parties is zero. Normally, overrides that arrange
- * termination for other reasons should also preserve this
- * property.
- *
- * <p> You may override this method to perform an action with side
- * effects visible to participating tasks, but it is in general
- * only sensible to do so in designs where all parties register
- * before any arrive, and all {@code awaitAdvance} at each phase.
- * Otherwise, you cannot ensure lack of interference. In
- * particular, this method may be invoked more than once per
- * transition if other parties successfully register while the
- * invocation of this method is in progress, thus postponing the
- * transition until those parties also arrive, re-triggering this
- * method.
- *
- * @param phase the phase number on entering the barrier
- * @param registeredParties the current number of registered parties
- * @return {@code true} if this barrier should terminate
- */
- protected boolean onAdvance(int phase, int registeredParties) {
- return registeredParties <= 0;
- }
-
- /**
- * Returns a string identifying this phaser, as well as its
- * state. The state, in brackets, includes the String {@code
- * "phase = "} followed by the phase number, {@code "parties = "}
- * followed by the number of registered parties, and {@code
- * "arrived = "} followed by the number of arrived parties.
- *
- * @return a string identifying this barrier, as well as its state
- */
- public String toString() {
- long s = getReconciledState();
- return super.toString() +
- "[phase = " + phaseOf(s) +
- " parties = " + partiesOf(s) +
- " arrived = " + arrivedOf(s) + "]";
- }
-
- // methods for waiting
-
- /**
- * Wait nodes for Treiber stack representing wait queue
- */
- static final class QNode implements ForkJoinPool.ManagedBlocker {
- final Phaser phaser;
- final int phase;
- final long startTime;
- final long nanos;
- final boolean timed;
- final boolean interruptible;
- volatile boolean wasInterrupted = false;
- volatile Thread thread; // nulled to cancel wait
- QNode next;
- QNode(Phaser phaser, int phase, boolean interruptible,
- boolean timed, long startTime, long nanos) {
- this.phaser = phaser;
- this.phase = phase;
- this.timed = timed;
- this.interruptible = interruptible;
- this.startTime = startTime;
- this.nanos = nanos;
- thread = Thread.currentThread();
- }
- public boolean isReleasable() {
- return (thread == null ||
- phaser.getPhase() != phase ||
- (interruptible && wasInterrupted) ||
- (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
- }
- public boolean block() {
- if (Thread.interrupted()) {
- wasInterrupted = true;
- if (interruptible)
- return true;
- }
- if (!timed)
- LockSupport.park();//TR (this);
- else {
- long waitTime = nanos - (System.nanoTime() - startTime);
- if (waitTime <= 0)
- return true;
- LockSupport.parkNanos(waitTime);//TR (this, waitTime);
- }
- return isReleasable();
- }
- void signal() {
- Thread t = thread;
- if (t != null) {
- thread = null;
- LockSupport.unpark(t);
- }
- }
- boolean doWait() {
- if (thread != null) {
- try {
- ForkJoinPool.managedBlock(this, false);
- } catch (InterruptedException ie) {
- }
- }
- return wasInterrupted;
- }
-
- }
-
- /**
- * Removes and signals waiting threads from wait queue
- */
- private void releaseWaiters(int phase) {
- AtomicReference<QNode> head = queueFor(phase);
- QNode q;
- while ((q = head.get()) != null) {
- if (head.compareAndSet(q, q.next))
- q.signal();
- }
- }
-
- /**
- * Tries to enqueue given node in the appropriate wait queue
- * @return true if successful
- */
- private boolean tryEnqueue(QNode node) {
- AtomicReference<QNode> head = queueFor(node.phase);
- return head.compareAndSet(node.next = head.get(), node);
- }
-
- /**
- * Enqueues node and waits unless aborted or signalled.
- * @return current phase
- */
- private int untimedWait(int phase) {
- QNode node = null;
- boolean queued = false;
- boolean interrupted = false;
- int p;
- while ((p = getPhase()) == phase) {
- if (Thread.interrupted())
- interrupted = true;
- else if (node == null)
- node = new QNode(this, phase, false, false, 0, 0);
- else if (!queued)
- queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
- }
- if (node != null)
- node.thread = null;
- releaseWaiters(phase);
- if (interrupted)
- Thread.currentThread().interrupt();
- return p;
- }
-
- /**
- * Interruptible version
- * @return current phase
- */
- private int interruptibleWait(int phase) throws InterruptedException {
- QNode node = null;
- boolean queued = false;
- boolean interrupted = false;
- int p;
- while ((p = getPhase()) == phase && !interrupted) {
- if (Thread.interrupted())
- interrupted = true;
- else if (node == null)
- node = new QNode(this, phase, true, false, 0, 0);
- else if (!queued)
- queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
- }
- if (node != null)
- node.thread = null;
- if (p != phase || (p = getPhase()) != phase)
- releaseWaiters(phase);
- if (interrupted)
- throw new InterruptedException();
- return p;
- }
-
- /**
- * Timeout version.
- * @return current phase
- */
- private int timedWait(int phase, long nanos)
- throws InterruptedException, TimeoutException {
- long startTime = System.nanoTime();
- QNode node = null;
- boolean queued = false;
- boolean interrupted = false;
- int p;
- while ((p = getPhase()) == phase && !interrupted) {
- if (Thread.interrupted())
- interrupted = true;
- else if (nanos - (System.nanoTime() - startTime) <= 0)
- break;
- else if (node == null)
- node = new QNode(this, phase, true, true, startTime, nanos);
- else if (!queued)
- queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
- }
- if (node != null)
- node.thread = null;
- if (p != phase || (p = getPhase()) != phase)
- releaseWaiters(phase);
- if (interrupted)
- throw new InterruptedException();
- if (p == phase)
- throw new TimeoutException();
- return p;
- }
-
- // Temporary Unsafe mechanics for preliminary release
- private static Unsafe getUnsafe() throws Throwable {
- try {
- return Unsafe.getUnsafe();
- } catch (SecurityException se) {
- try {
- return java.security.AccessController.doPrivileged
- (new java.security.PrivilegedExceptionAction<Unsafe>() {
- public Unsafe run() throws Exception {
- return getUnsafePrivileged();
- }});
- } catch (java.security.PrivilegedActionException e) {
- throw e.getCause();
- }
- }
- }
-
- private static Unsafe getUnsafePrivileged()
- throws NoSuchFieldException, IllegalAccessException {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- return (Unsafe) f.get(null);
- }
-
- private static long fieldOffset(String fieldName)
- throws NoSuchFieldException {
- return _unsafe.objectFieldOffset
- (Phaser.class.getDeclaredField(fieldName));
- }
-
- static final Unsafe _unsafe;
- static final long stateOffset;
-
- static {
- try {
- _unsafe = getUnsafe();
- stateOffset = fieldOffset("state");
- } catch (Throwable e) {
- throw new RuntimeException("Could not initialize intrinsics", e);
- }
- }
-
- final boolean casState(long cmp, long val) {
- return _unsafe.compareAndSwapLong(this, stateOffset, cmp, val);
- }
-}