From 59a0cce0c00c3cde6c3bb6753c47a6ae9266891f Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 27 Nov 2007 10:56:51 +0000 Subject: Fixed issue with task scheduler creating too ma... Fixed issue with task scheduler creating too many threads. Improved actor termination code. Added size query to MessageQueue. --- src/actors/scala/actors/FJTaskScheduler2.scala | 3 ++- src/actors/scala/actors/MessageQueue.scala | 18 +++++++++++++++++- src/actors/scala/actors/Scheduler.scala | 13 +++++++++---- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 0600f4d7b2..8b99b83f7c 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -114,7 +114,8 @@ class FJTaskScheduler2 extends Thread with IScheduler { if (Platform.currentTime - lastActivity >= TICK_FREQ && coreSize < maxSize && executor.checkPoolSize()) { - // do nothing + coreSize += 1 + lastActivity = Platform.currentTime } else { if (pendingReactions <= 0) { diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala index 4beacfe70b..8f2f6c72cc 100644 --- a/src/actors/scala/actors/MessageQueue.scala +++ b/src/actors/scala/actors/MessageQueue.scala @@ -41,7 +41,16 @@ class MessageQueue { def isEmpty = null eq last + private var _size = 0 + def size = _size + + protected def changeSize(diff: Int) = { + _size += diff + } + def append(msg: Any, session: OutputChannel[Any]) = { + changeSize(1) // size always increases by 1 + if (null eq last) { // list empty val el = new MessageQueueElement el.msg = msg @@ -59,7 +68,9 @@ class MessageQueue { } def extractFirst(p: Any => Boolean): MessageQueueElement = { - if (null eq last) null + changeSize(-1) // assume size decreases by 1 + + val msg = if (null eq last) null else { // test first element if (p(first.msg)) { @@ -95,5 +106,10 @@ class MessageQueue { null } } + + if (null eq msg) + changeSize(1) // correct wrong assumption + + msg } } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index ddf6df164b..cb10795899 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -85,11 +85,15 @@ object Scheduler { termHandlers += (a -> (() => f)) } - def unPendReaction(a: Actor) { + def unPendReaction(a: Actor) = synchronized { // execute registered termination handler (if any) termHandlers.get(a) match { - case Some(handler) => handler() - case None => // do nothing + case Some(handler) => + handler() + // remove mapping + termHandlers -= a + case None => + // do nothing } // notify scheduler @@ -159,7 +163,8 @@ class SingleThreadedScheduler extends IScheduler { } def execute(task: Runnable) { - if (Actor.tl.get.isInstanceOf[ActorProxy]) { + val a = Actor.tl.get.asInstanceOf[Actor] + if ((null ne a) && a.isInstanceOf[ActorProxy]) { // execute task immediately on same thread task.run() while (taskQ.length > 0) { -- cgit v1.2.3