diff options
author | Philipp Haller <hallerp@gmail.com> | 2008-12-17 16:05:17 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2008-12-17 16:05:17 +0000 |
commit | ba2043b7edcebe59ddf8db8da15114d3b9476521 (patch) | |
tree | 137eac832a9f766b0f3d94d989c0cbdd1c4610d3 | |
parent | 825c2e5522c8ccf5fa4823903637ee1228615ec8 (diff) | |
download | scala-ba2043b7edcebe59ddf8db8da15114d3b9476521.tar.gz scala-ba2043b7edcebe59ddf8db8da15114d3b9476521.tar.bz2 scala-ba2043b7edcebe59ddf8db8da15114d3b9476521.zip |
Revert Thread.getState optimization.
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 27 |
2 files changed, 24 insertions, 11 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index ad0927195e..af94a8a4e7 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -389,6 +389,7 @@ trait Actor extends AbstractActor { * @param replyTo the reply destination */ def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { + tick() if (waitingFor(msg)) { received = Some(msg) @@ -423,6 +424,7 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { + tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt @@ -452,6 +454,7 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "receive from channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { + tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) @@ -503,6 +506,7 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { + tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt @@ -530,6 +534,7 @@ trait Actor extends AbstractActor { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links this.synchronized { + tick() // first, remove spurious TIMEOUT message from mailbox if any val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) @@ -718,6 +723,9 @@ trait Actor extends AbstractActor { scheduler execute task } + private def tick(): Unit = + scheduler tick this + private[actors] var kill: () => Unit = () => {} private def suspendActor() { diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index d983b7a00f..24a2f119d5 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -13,7 +13,6 @@ package scala.actors import compat.Platform import java.lang.{Runnable, Thread, InterruptedException, System, Runtime} -import java.lang.Thread.State import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} @@ -21,7 +20,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has /** * FJTaskScheduler2 * - * @version 0.9.20 + * @version 0.9.18 * @author Philipp Haller */ class FJTaskScheduler2 extends Thread with IScheduler { @@ -69,10 +68,13 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var terminating = false private var suspending = false + private var lastActivity = Platform.currentTime + private var submittedTasks = 0 def printActorDump {} + private val TICK_FREQ = 50 private val CHECK_FREQ = 100 def onLockup(handler: () => Unit) = @@ -85,12 +87,6 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var lockupHandler: () => Unit = null - private def allWorkersBlocked: Boolean = - executor.threads.forall(t => { - val s = t.getState() - s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING - }) - override def run() { try { while (!terminating) { @@ -107,11 +103,12 @@ class FJTaskScheduler2 extends Thread with IScheduler { ActorGC.gc() // check if we need more threads - if (coreSize < maxSize - && allWorkersBlocked + if (Platform.currentTime - lastActivity >= TICK_FREQ + && coreSize < maxSize && executor.checkPoolSize()) { //Debug.info(this+": increasing thread pool size") coreSize += 1 + lastActivity = Platform.currentTime } else { if (ActorGC.allTerminated) { @@ -152,10 +149,18 @@ class FJTaskScheduler2 extends Thread with IScheduler { def run() { fun } }) + private var tickCnt = 0 + /** * @param a the actor */ - def tick(a: Actor) {} + def tick(a: Actor) = synchronized { + if (tickCnt == 100) { + tickCnt = 0 + lastActivity = Platform.currentTime + } else + tickCnt += 1 + } /** Shuts down all idle worker threads. */ |