From 941d559c7ba17f65d4e07afbbaf8a28db7a71dc5 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 16 Dec 2008 17:13:02 +0000 Subject: Use Thread.getState() instead of timestamps on ... Use Thread.getState() instead of timestamps on JVM 1.5. --- src/actors/scala/actors/Actor.scala | 8 -------- src/actors/scala/actors/FJTaskRunnerGroup.java | 2 +- src/actors/scala/actors/FJTaskScheduler2.scala | 21 +++++++++++---------- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index af94a8a4e7..ad0927195e 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -389,7 +389,6 @@ trait Actor extends AbstractActor { * @param replyTo the reply destination */ def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { - tick() if (waitingFor(msg)) { received = Some(msg) @@ -424,7 +423,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt @@ -454,7 +452,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) @@ -506,7 +503,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt @@ -534,7 +530,6 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { - tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) @@ -723,9 +718,6 @@ trait Actor extends AbstractActor { scheduler execute task } - private def tick(): Unit = - scheduler tick this - private[actors] var kill: () => Unit = () => {} private def suspendActor() { diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java index 5485817614..d1f864c54a 100644 --- a/src/actors/scala/actors/FJTaskRunnerGroup.java +++ b/src/actors/scala/actors/FJTaskRunnerGroup.java @@ -122,7 +122,7 @@ package scala.actors; public class FJTaskRunnerGroup implements IFJTaskRunnerGroup { /** The threads in this group **/ - protected /*final*/ FJTaskRunner[] threads; + /*protected*/ /*final*/ FJTaskRunner[] threads; /** Group-wide queue for tasks entered via execute() **/ /*protected*/ final LinkedQueue entryQueue = new LinkedQueue(); diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 54c0f9dc28..d983b7a00f 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -13,6 +13,7 @@ package scala.actors import compat.Platform import java.lang.{Runnable, Thread, InterruptedException, System, Runtime} +import java.lang.Thread.State import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} @@ -20,7 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has /** * FJTaskScheduler2 * - * @version 0.9.18 + * @version 0.9.20 * @author Philipp Haller */ class FJTaskScheduler2 extends Thread with IScheduler { @@ -68,13 +69,10 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var terminating = false private var suspending = false - private var lastActivity = Platform.currentTime - private var submittedTasks = 0 def printActorDump {} - private val TICK_FREQ = 50 private val CHECK_FREQ = 100 def onLockup(handler: () => Unit) = @@ -87,6 +85,12 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var lockupHandler: () => Unit = null + private def allWorkersBlocked: Boolean = + executor.threads.forall(t => { + val s = t.getState() + s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING + }) + override def run() { try { while (!terminating) { @@ -103,12 +107,11 @@ class FJTaskScheduler2 extends Thread with IScheduler { ActorGC.gc() // check if we need more threads - if (Platform.currentTime - lastActivity >= TICK_FREQ - && coreSize < maxSize + if (coreSize < maxSize + && allWorkersBlocked && executor.checkPoolSize()) { //Debug.info(this+": increasing thread pool size") coreSize += 1 - lastActivity = Platform.currentTime } else { if (ActorGC.allTerminated) { @@ -152,9 +155,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { /** * @param a the actor */ - def tick(a: Actor) { - lastActivity = Platform.currentTime - } + def tick(a: Actor) {} /** Shuts down all idle worker threads. */ -- cgit v1.2.3