diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-11-27 10:56:51 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-11-27 10:56:51 +0000 |
commit | 59a0cce0c00c3cde6c3bb6753c47a6ae9266891f (patch) | |
tree | 91f9d326510ab30e917064d8d96b3070cdd3ad78 /src | |
parent | 090482dae2e111084a1ac2d2d58113388dbbbffe (diff) | |
download | scala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.tar.gz scala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.tar.bz2 scala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.zip |
Fixed issue with task scheduler creating too ma...
Fixed issue with task scheduler creating too many threads. Improved
actor termination code. Added size query to MessageQueue.
Diffstat (limited to 'src')
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 3 | ||||
-rw-r--r-- | src/actors/scala/actors/MessageQueue.scala | 18 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 13 |
3 files changed, 28 insertions, 6 deletions
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 0600f4d7b2..8b99b83f7c 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -114,7 +114,8 @@ class FJTaskScheduler2 extends Thread with IScheduler { if (Platform.currentTime - lastActivity >= TICK_FREQ && coreSize < maxSize && executor.checkPoolSize()) { - // do nothing + coreSize += 1 + lastActivity = Platform.currentTime } else { if (pendingReactions <= 0) { diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala index 4beacfe70b..8f2f6c72cc 100644 --- a/src/actors/scala/actors/MessageQueue.scala +++ b/src/actors/scala/actors/MessageQueue.scala @@ -41,7 +41,16 @@ class MessageQueue { def isEmpty = null eq last + private var _size = 0 + def size = _size + + protected def changeSize(diff: Int) = { + _size += diff + } + def append(msg: Any, session: OutputChannel[Any]) = { + changeSize(1) // size always increases by 1 + if (null eq last) { // list empty val el = new MessageQueueElement el.msg = msg @@ -59,7 +68,9 @@ class MessageQueue { } def extractFirst(p: Any => Boolean): MessageQueueElement = { - if (null eq last) null + changeSize(-1) // assume size decreases by 1 + + val msg = if (null eq last) null else { // test first element if (p(first.msg)) { @@ -95,5 +106,10 @@ class MessageQueue { null } } + + if (null eq msg) + changeSize(1) // correct wrong assumption + + msg } } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index ddf6df164b..cb10795899 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -85,11 +85,15 @@ object Scheduler { termHandlers += (a -> (() => f)) } - def unPendReaction(a: Actor) { + def unPendReaction(a: Actor) = synchronized { // execute registered termination handler (if any) termHandlers.get(a) match { - case Some(handler) => handler() - case None => // do nothing + case Some(handler) => + handler() + // remove mapping + termHandlers -= a + case None => + // do nothing } // notify scheduler @@ -159,7 +163,8 @@ class SingleThreadedScheduler extends IScheduler { } def execute(task: Runnable) { - if (Actor.tl.get.isInstanceOf[ActorProxy]) { + val a = Actor.tl.get.asInstanceOf[Actor] + if ((null ne a) && a.isInstanceOf[ActorProxy]) { // execute task immediately on same thread task.run() while (taskQ.length > 0) { |