From f046863f53a08bfde42733a27d2bc483a73765d1 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 4 Aug 2009 15:45:20 +0000 Subject: Fixed #2028 and #2100. --- src/actors/scala/actors/ActorTask.scala | 2 +- src/actors/scala/actors/LightReaction.scala | 2 +- src/actors/scala/actors/MessageQueue.scala | 2 + src/actors/scala/actors/Reaction.scala | 2 +- src/actors/scala/actors/ReactorTask.scala | 2 +- src/actors/scala/actors/ReplyableActor.scala | 2 +- .../SingleThreadedEventLoopScheduler.scala | 69 ++++++++++++++++++++++ 7 files changed, 76 insertions(+), 5 deletions(-) create mode 100644 src/actors/scala/actors/scheduler/SingleThreadedEventLoopScheduler.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index 10bc270b61..331c84a3ee 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -19,7 +19,7 @@ import java.lang.Runnable * * @author Philipp Haller */ -class ActorTask extends Runnable { +private[actors] class ActorTask extends Runnable { private var a: Actor = null private var fun: () => Unit = null diff --git a/src/actors/scala/actors/LightReaction.scala b/src/actors/scala/actors/LightReaction.scala index 820f666c52..b6deec3b7b 100644 --- a/src/actors/scala/actors/LightReaction.scala +++ b/src/actors/scala/actors/LightReaction.scala @@ -22,7 +22,7 @@ import java.lang.Runnable * * @author Philipp Haller */ -class LightReaction(a: Reactor, f: PartialFunction[Any, Unit], msg: Any) extends ReactorTask(a, () => { +private[actors] class LightReaction(a: Reactor, f: PartialFunction[Any, Unit], msg: Any) extends ReactorTask(a, () => { if (f == null) a.act() else diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala index 19ab904e80..d83b6e3eeb 100644 --- a/src/actors/scala/actors/MessageQueue.scala +++ b/src/actors/scala/actors/MessageQueue.scala @@ -143,6 +143,7 @@ class MessageQueue { if (tmp eq last) { last = null } + changeSize(-1) Some((tmp.msg, tmp.session)) } else { var curr = first @@ -157,6 +158,7 @@ class MessageQueue { if (curr eq last) { last = prev } + changeSize(-1) found = Some((curr.msg, curr.session)) } } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index c8e19400eb..2c8fa391ed 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -32,7 +32,7 @@ private[actors] class KillActorException extends Throwable with ControlException * @version 0.9.10 * @author Philipp Haller */ -class Reaction(a: Actor, f: PartialFunction[Any, Unit], msg: Any) extends ActorTask(a, () => { +private[actors] class Reaction(a: Actor, f: PartialFunction[Any, Unit], msg: Any) extends ActorTask(a, () => { if (f == null) a.act() else diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala index d8d23d4e11..30a0fc988b 100644 --- a/src/actors/scala/actors/ReactorTask.scala +++ b/src/actors/scala/actors/ReactorTask.scala @@ -19,7 +19,7 @@ import java.lang.Runnable * * @author Philipp Haller */ -class ReactorTask extends Runnable { +private[actors] class ReactorTask extends Runnable { private var reactor: Reactor = null private var fun: () => Unit = null diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala index 8ba4f13842..a7fb44dd3c 100644 --- a/src/actors/scala/actors/ReplyableActor.scala +++ b/src/actors/scala/actors/ReplyableActor.scala @@ -19,7 +19,7 @@ import java.util.concurrent.ExecutionException * * @author Philipp Haller */ -trait ReplyableActor extends ReplyableReactor { +private[actors] trait ReplyableActor extends ReplyableReactor { thiz: AbstractActor with ReplyReactor => /** diff --git a/src/actors/scala/actors/scheduler/SingleThreadedEventLoopScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedEventLoopScheduler.scala new file mode 100644 index 0000000000..539916590e --- /dev/null +++ b/src/actors/scala/actors/scheduler/SingleThreadedEventLoopScheduler.scala @@ -0,0 +1,69 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors +package scheduler + +import scala.collection.mutable.Queue + +/** + * This scheduler executes actor tasks on the current thread. + * + * @author Philipp Haller + */ +class SingleThreadedEventLoopScheduler extends IScheduler { + + private val tasks = new Queue[Runnable] + + /** The maximum number of nested tasks that are run + * without unwinding the call stack. + */ + protected var maxNesting = 10 + + private var curNest = 0 + private var isShutdown = false + + def execute(task: Runnable) { + if (curNest < maxNesting) { + curNest += 1 + task.run() + } else { + curNest = 0 + tasks += task + } + } + + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + def shutdown() { + isShutdown = false + while (!tasks.isEmpty) { + val task = tasks.dequeue() + task.run() + } + isShutdown = true + } + + def newActor(actor: Reactor) {} + def terminated(actor: Reactor) {} + + // TODO: run termination handlers at end of shutdown. + def onTerminate(actor: Reactor)(f: => Unit) {} + + def isActive = + !isShutdown + + def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + blocker.block() + } +} -- cgit v1.2.3