From ee740145d84970cb1942801fbac2afba9a590a3e Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 16 Dec 2008 11:11:09 +0000 Subject: Use Thread.getState() instead of timestamps to ... Use Thread.getState() instead of timestamps to detect blocked worker threads. --- src/actors/scala/actors/Actor.scala | 8 -- src/actors/scala/actors/FJTaskRunnerGroup.java | 2 +- src/actors/scala/actors/FJTaskScheduler2.scala | 22 ++--- src/actors/scala/actors/Scheduler.scala | 120 ------------------------- src/actors/scala/actors/SchedulerAdapter.scala | 8 -- 5 files changed, 10 insertions(+), 150 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index af94a8a4e7..ad0927195e 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -389,7 +389,6 @@ trait Actor extends AbstractActor { * @param replyTo the reply destination */ def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { - tick() if (waitingFor(msg)) { received = Some(msg) @@ -424,7 +423,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt @@ -454,7 +452,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) @@ -506,7 +503,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt @@ -534,7 +530,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) @@ -723,9 +718,6 @@ trait Actor extends AbstractActor { scheduler execute task } - private def tick(): Unit = - scheduler tick this - private[actors] var kill: () => Unit = () => {} private def suspendActor() { diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java index 5485817614..d1f864c54a 100644 --- a/src/actors/scala/actors/FJTaskRunnerGroup.java +++ b/src/actors/scala/actors/FJTaskRunnerGroup.java @@ -122,7 +122,7 @@ package scala.actors; public class FJTaskRunnerGroup implements IFJTaskRunnerGroup { /** The threads in this group **/ - protected /*final*/ FJTaskRunner[] threads; + /*protected*/ /*final*/ FJTaskRunner[] threads; /** Group-wide queue for tasks entered via execute() **/ /*protected*/ final LinkedQueue entryQueue = new LinkedQueue(); diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 54c0f9dc28..e1cde0f2b1 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -13,6 +13,7 @@ package scala.actors import compat.Platform import java.lang.{Runnable, Thread, InterruptedException, System, Runtime} +import java.lang.Thread.State import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} @@ -68,13 +69,10 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var terminating = false private var suspending = false - private var lastActivity = Platform.currentTime - private var submittedTasks = 0 def printActorDump {} - private val TICK_FREQ = 50 private val CHECK_FREQ = 100 def onLockup(handler: () => Unit) = @@ -87,6 +85,12 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var lockupHandler: () => Unit = null + private def allWorkersBlocked: Boolean = + executor.threads.forall(t => { + val s = t.getState() + s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING + }) + override def run() { try { while (!terminating) { @@ -103,12 +107,11 @@ class FJTaskScheduler2 extends Thread with IScheduler { ActorGC.gc() // check if we need more threads - if (Platform.currentTime - lastActivity >= TICK_FREQ - && coreSize < maxSize + if (coreSize < maxSize + && allWorkersBlocked && executor.checkPoolSize()) { //Debug.info(this+": increasing thread pool size") coreSize += 1 - lastActivity = Platform.currentTime } else { if (ActorGC.allTerminated) { @@ -149,13 +152,6 @@ class FJTaskScheduler2 extends Thread with IScheduler { def run() { fun } }) - /** - * @param a the actor - */ - def tick(a: Actor) { - lastActivity = Platform.currentTime - } - /** Shuts down all idle worker threads. */ def shutdown(): Unit = synchronized { diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 764236f9d8..cdc2644adf 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -91,11 +91,6 @@ object Scheduler extends IScheduler { sched execute { fun } } - /* This method is used to notify the scheduler - * of library activity by the argument Actor. - */ - def tick(a: Actor) = sched tick a - def shutdown() = sched.shutdown() def onLockup(handler: () => Unit) = sched.onLockup(handler) @@ -130,13 +125,6 @@ trait IScheduler { */ def execute(task: Runnable): Unit - /** Notifies the scheduler about activity of the - * executing actor. - * - * @param a the active actor - */ - def tick(a: Actor): Unit - /** Shuts down the scheduler. */ def shutdown(): Unit @@ -152,15 +140,6 @@ trait IScheduler { } -trait WorkerThreadScheduler extends IScheduler { - /** - * @param worker the worker thread executing tasks - * @return the task to be executed - */ - def getTask(worker: WorkerThread): Runnable -} - - /** * This scheduler executes the tasks of an actor on a single * thread (the current thread). @@ -179,8 +158,6 @@ class SingleThreadedScheduler extends IScheduler { def run() { fun } }) - def tick(a: Actor) {} - def shutdown() {} def onLockup(handler: () => Unit) {} @@ -203,100 +180,3 @@ private[actors] class QuitException extends Throwable { */ override def fillInStackTrace(): Throwable = this } - - -/** - *

- * The class WorkerThread is used by schedulers to execute - * actor tasks on multiple threads. - *

- *

- * !!ACHTUNG: If you change this, make sure you understand the following - * proof of deadlock-freedom!! - *

- *

- * We proof that there is no deadlock between the scheduler and - * any worker thread possible. For this, note that the scheduler - * only acquires the lock of a worker thread by calling - * execute. This method is only called when the worker thread - * is in the idle queue of the scheduler. On the other hand, a - * worker thread only acquires the lock of the scheduler when it - * calls getTask. At the only callsite of getTask, - * the worker thread holds its own lock. - *

- *

- * Thus, deadlock can only occur when a worker thread calls - * getTask while it is in the idle queue of the scheduler, - * because then the scheduler might call (at any time!) execute - * which tries to acquire the lock of the worker thread. In such - * a situation the worker thread would be waiting to acquire the - * lock of the scheduler and vice versa. - *

- *

- * Therefore, to prove deadlock-freedom, it suffices to ensure - * that a worker thread will never call getTask when - * it is in the idle queue of the scheduler. - *

- *

- * A worker thread enters the idle queue of the scheduler when - * getTask returns null. Then it will also stay - * in the while-loop W (while (task eq null)) until - * task becomes non-null. The only way this can happen is - * through a call of execute by the scheduler. Before every - * call of execute the worker thread is removed from the idle - * queue of the scheduler. Only then--after executing its task-- - * the worker thread may call getTask. However, the scheduler - * is unable to call execute as the worker thread is not in - * the idle queue any more. In fact, the scheduler made sure - * that this is the case even _before_ calling execute and - * thus releasing the worker thread from the while-loop W. Thus, - * the property holds for every possible interleaving of thread - * execution. QED - *

- * - * @version 0.9.18 - * @author Philipp Haller - */ -class WorkerThread(sched: WorkerThreadScheduler) extends Thread { - private var task: Runnable = null - private[actors] var running = true - - def execute(r: Runnable) = synchronized { - task = r - notify() - } - - override def run(): Unit = - try { - while (running) { - if (task ne null) { - try { - task.run() - } catch { - case consumed: InterruptedException => - if (!running) throw new QuitException - } - } - this.synchronized { - task = sched getTask this - - while (task eq null) { - try { - wait() - } catch { - case consumed: InterruptedException => - if (!running) throw new QuitException - } - } - - if (task == sched.QUIT_TASK) { - running = false - } - } - } - } catch { - case consumed: QuitException => - // allow thread to quit - } - -} diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index dc1e442f9a..cadc3b5e26 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -27,14 +27,6 @@ trait SchedulerAdapter extends IScheduler { def execute(task: Runnable): Unit = execute { task.run() } - /** Notifies the scheduler about activity of the - * executing actor. - * - * @param a the active actor - */ - def tick(a: Actor): Unit = - Scheduler tick a - /** Shuts down the scheduler. */ def shutdown(): Unit = -- cgit v1.2.3