summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-12-16 11:11:09 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-12-16 11:11:09 +0000
commitee740145d84970cb1942801fbac2afba9a590a3e (patch)
treee1c3a25d02862e5ebbcf799494a244f8e04bfbba /src/actors
parentc70776c00670f7b9ab5aff4a518d16f7f07c365d (diff)
downloadscala-ee740145d84970cb1942801fbac2afba9a590a3e.tar.gz
scala-ee740145d84970cb1942801fbac2afba9a590a3e.tar.bz2
scala-ee740145d84970cb1942801fbac2afba9a590a3e.zip
Use Thread.getState() instead of timestamps to ...
Use Thread.getState() instead of timestamps to detect blocked worker threads.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala8
-rw-r--r--src/actors/scala/actors/FJTaskRunnerGroup.java2
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala22
-rw-r--r--src/actors/scala/actors/Scheduler.scala120
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala8
5 files changed, 10 insertions, 150 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index af94a8a4e7..ad0927195e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -389,7 +389,6 @@ trait Actor extends AbstractActor {
* @param replyTo the reply destination
*/
def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
- tick()
if (waitingFor(msg)) {
received = Some(msg)
@@ -424,7 +423,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "receive from channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
- tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
waitingFor = f.isDefinedAt
@@ -454,7 +452,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "receive from channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
- tick()
// first, remove spurious TIMEOUT message from mailbox if any
val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
@@ -506,7 +503,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
- tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
waitingFor = f.isDefinedAt
@@ -534,7 +530,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
- tick()
// first, remove spurious TIMEOUT message from mailbox if any
val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
@@ -723,9 +718,6 @@ trait Actor extends AbstractActor {
scheduler execute task
}
- private def tick(): Unit =
- scheduler tick this
-
private[actors] var kill: () => Unit = () => {}
private def suspendActor() {
diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java
index 5485817614..d1f864c54a 100644
--- a/src/actors/scala/actors/FJTaskRunnerGroup.java
+++ b/src/actors/scala/actors/FJTaskRunnerGroup.java
@@ -122,7 +122,7 @@ package scala.actors;
public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
/** The threads in this group **/
- protected /*final*/ FJTaskRunner[] threads;
+ /*protected*/ /*final*/ FJTaskRunner[] threads;
/** Group-wide queue for tasks entered via execute() **/
/*protected*/ final LinkedQueue entryQueue = new LinkedQueue();
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 54c0f9dc28..e1cde0f2b1 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -13,6 +13,7 @@ package scala.actors
import compat.Platform
import java.lang.{Runnable, Thread, InterruptedException, System, Runtime}
+import java.lang.Thread.State
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
@@ -68,13 +69,10 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var terminating = false
private var suspending = false
- private var lastActivity = Platform.currentTime
-
private var submittedTasks = 0
def printActorDump {}
- private val TICK_FREQ = 50
private val CHECK_FREQ = 100
def onLockup(handler: () => Unit) =
@@ -87,6 +85,12 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var lockupHandler: () => Unit = null
+ private def allWorkersBlocked: Boolean =
+ executor.threads.forall(t => {
+ val s = t.getState()
+ s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING
+ })
+
override def run() {
try {
while (!terminating) {
@@ -103,12 +107,11 @@ class FJTaskScheduler2 extends Thread with IScheduler {
ActorGC.gc()
// check if we need more threads
- if (Platform.currentTime - lastActivity >= TICK_FREQ
- && coreSize < maxSize
+ if (coreSize < maxSize
+ && allWorkersBlocked
&& executor.checkPoolSize()) {
//Debug.info(this+": increasing thread pool size")
coreSize += 1
- lastActivity = Platform.currentTime
}
else {
if (ActorGC.allTerminated) {
@@ -149,13 +152,6 @@ class FJTaskScheduler2 extends Thread with IScheduler {
def run() { fun }
})
- /**
- * @param a the actor
- */
- def tick(a: Actor) {
- lastActivity = Platform.currentTime
- }
-
/** Shuts down all idle worker threads.
*/
def shutdown(): Unit = synchronized {
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 764236f9d8..cdc2644adf 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -91,11 +91,6 @@ object Scheduler extends IScheduler {
sched execute { fun }
}
- /* This method is used to notify the scheduler
- * of library activity by the argument Actor.
- */
- def tick(a: Actor) = sched tick a
-
def shutdown() = sched.shutdown()
def onLockup(handler: () => Unit) = sched.onLockup(handler)
@@ -130,13 +125,6 @@ trait IScheduler {
*/
def execute(task: Runnable): Unit
- /** Notifies the scheduler about activity of the
- * executing actor.
- *
- * @param a the active actor
- */
- def tick(a: Actor): Unit
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit
@@ -152,15 +140,6 @@ trait IScheduler {
}
-trait WorkerThreadScheduler extends IScheduler {
- /**
- * @param worker the worker thread executing tasks
- * @return the task to be executed
- */
- def getTask(worker: WorkerThread): Runnable
-}
-
-
/**
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
@@ -179,8 +158,6 @@ class SingleThreadedScheduler extends IScheduler {
def run() { fun }
})
- def tick(a: Actor) {}
-
def shutdown() {}
def onLockup(handler: () => Unit) {}
@@ -203,100 +180,3 @@ private[actors] class QuitException extends Throwable {
*/
override def fillInStackTrace(): Throwable = this
}
-
-
-/**
- * <p>
- * The class <code>WorkerThread</code> is used by schedulers to execute
- * actor tasks on multiple threads.
- * </p>
- * <p>
- * !!ACHTUNG: If you change this, make sure you understand the following
- * proof of deadlock-freedom!!
- * </p>
- * <p>
- * We proof that there is no deadlock between the scheduler and
- * any worker thread possible. For this, note that the scheduler
- * only acquires the lock of a worker thread by calling
- * <code>execute</code>. This method is only called when the worker thread
- * is in the idle queue of the scheduler. On the other hand, a
- * worker thread only acquires the lock of the scheduler when it
- * calls <code>getTask</code>. At the only callsite of <code>getTask</code>,
- * the worker thread holds its own lock.
- * </p>
- * <p>
- * Thus, deadlock can only occur when a worker thread calls
- * <code>getTask</code> while it is in the idle queue of the scheduler,
- * because then the scheduler might call (at any time!) <code>execute</code>
- * which tries to acquire the lock of the worker thread. In such
- * a situation the worker thread would be waiting to acquire the
- * lock of the scheduler and vice versa.
- * </p>
- * <p>
- * Therefore, to prove deadlock-freedom, it suffices to ensure
- * that a worker thread will never call <code>getTask</code> when
- * it is in the idle queue of the scheduler.
- * </p>
- * <p>
- * A worker thread enters the idle queue of the scheduler when
- * <code>getTask</code> returns <code>null</code>. Then it will also stay
- * in the while-loop W (<code>while (task eq null)</code>) until
- * <code>task</code> becomes non-null. The only way this can happen is
- * through a call of <code>execute</code> by the scheduler. Before every
- * call of <code>execute</code> the worker thread is removed from the idle
- * queue of the scheduler. Only then--after executing its task--
- * the worker thread may call <code>getTask</code>. However, the scheduler
- * is unable to call <code>execute</code> as the worker thread is not in
- * the idle queue any more. In fact, the scheduler made sure
- * that this is the case even _before_ calling <code>execute</code> and
- * thus releasing the worker thread from the while-loop W. Thus,
- * the property holds for every possible interleaving of thread
- * execution. QED
- * </p>
- *
- * @version 0.9.18
- * @author Philipp Haller
- */
-class WorkerThread(sched: WorkerThreadScheduler) extends Thread {
- private var task: Runnable = null
- private[actors] var running = true
-
- def execute(r: Runnable) = synchronized {
- task = r
- notify()
- }
-
- override def run(): Unit =
- try {
- while (running) {
- if (task ne null) {
- try {
- task.run()
- } catch {
- case consumed: InterruptedException =>
- if (!running) throw new QuitException
- }
- }
- this.synchronized {
- task = sched getTask this
-
- while (task eq null) {
- try {
- wait()
- } catch {
- case consumed: InterruptedException =>
- if (!running) throw new QuitException
- }
- }
-
- if (task == sched.QUIT_TASK) {
- running = false
- }
- }
- }
- } catch {
- case consumed: QuitException =>
- // allow thread to quit
- }
-
-}
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index dc1e442f9a..cadc3b5e26 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -27,14 +27,6 @@ trait SchedulerAdapter extends IScheduler {
def execute(task: Runnable): Unit =
execute { task.run() }
- /** Notifies the scheduler about activity of the
- * executing actor.
- *
- * @param a the active actor
- */
- def tick(a: Actor): Unit =
- Scheduler tick a
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit =