From fa3010ed335f5a8647c5fdcecbfbdb01b26fec02 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Sun, 24 May 2009 20:14:52 +0000 Subject: Added SimpleExecutorScheduler which does not te... Added SimpleExecutorScheduler which does not terminate automatically. --- src/actors/scala/actors/Actor.scala | 2 +- src/actors/scala/actors/ActorGC.scala | 6 +- src/actors/scala/actors/DelegatingScheduler.scala | 3 + .../scala/actors/SimpleExecutorScheduler.scala | 105 +++++++++++++++++++++ 4 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 src/actors/scala/actors/SimpleExecutorScheduler.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index b9e36a574e..47df95057b 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -1024,7 +1024,7 @@ trait Actor extends AbstractActor { } /* Requires qualified private, because RemoteActor must - * register termination handler. + * register a termination handler. */ private[actors] def onTerminate(f: => Unit) { scheduler.onTerminate(this) { f } diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala index 5906b96921..a5160d14e2 100644 --- a/src/actors/scala/actors/ActorGC.scala +++ b/src/actors/scala/actors/ActorGC.scala @@ -48,7 +48,7 @@ trait ActorGC extends IScheduler { } /** Removes unreachable actors from refSet. */ - def gc() = synchronized { + protected def gc() = synchronized { // check for unreachable actors def drainRefQ() { val wr = refQ.poll @@ -62,11 +62,11 @@ trait ActorGC extends IScheduler { drainRefQ() } - def status() { + protected def status() { println(this+": size of refSet: "+refSet.size) } - def allTerminated: Boolean = synchronized { + protected def allTerminated: Boolean = synchronized { pendingReactions <= 0 } diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala index dfb55957bb..782ef0da42 100644 --- a/src/actors/scala/actors/DelegatingScheduler.scala +++ b/src/actors/scala/actors/DelegatingScheduler.scala @@ -8,6 +8,9 @@ package scala.actors +/** + * @author Erik Engbrecht + */ trait DelegatingScheduler extends IScheduler { protected def makeNewScheduler(): IScheduler diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala new file mode 100644 index 0000000000..d6b2104939 --- /dev/null +++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala @@ -0,0 +1,105 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +import scala.collection.mutable.HashMap +import java.util.concurrent.{ExecutorService, RejectedExecutionException} + +/** + * The SimpleExecutorScheduler class uses an + * ExecutorService to execute Actors. It + * does not start an additional thread. Also, the underlying + * ExecutorService is not shut down automatically; + * instead, the ExecutorService must be shut down either + * directly or by shutting down the + * SimpleExecutorScheduler instance. + * + * @author Philipp Haller + */ +class SimpleExecutorScheduler(protected var executor: ExecutorService) extends IScheduler { + + /* Maintains at most one closure per actor that is executed + * when the actor terminates. + */ + protected val termHandlers = new HashMap[Actor, () => Unit] + + /* This constructor (and the var above) is currently only used to work + * around a bug in scaladoc, which cannot deal with early initializers + * (to be used in subclasses such as DefaultExecutorScheduler) properly. + */ + def this() { + this(null) + } + + /** Submits a Runnable for execution. + * + * @param task the task to be executed + */ + def execute(task: Runnable) { + try { + executor execute task + } catch { + case ree: RejectedExecutionException => + Debug.info("caught "+ree) + } + } + + /** Submits a closure for execution. + * + * @param block the closure to be executed + */ + def execute(block: => Unit) { + val task = new Runnable { + def run() { block } + } + execute(task) + } + + /** Shuts down the scheduler. + */ + def shutdown() { + executor.shutdown() + } + + /** The scheduler is active if the underlying ExecutorService + * has not been shut down. + */ + def isActive = + !executor.isShutdown() + + def newActor(a: Actor) {} + + def terminated(a: Actor) { + // obtain termination handler (if any) + val todo = synchronized { + termHandlers.get(a) match { + case Some(handler) => + termHandlers -= a + () => handler + case None => + () => { /* do nothing */ } + } + } + + // invoke termination handler (if any) + todo() + } + + /** Registers a closure to be executed when the specified + * actor terminates. + * + * @param a the actor + * @param f the closure to be registered + */ + def onTerminate(a: Actor)(block: => Unit) = synchronized { + termHandlers += (a -> (() => block)) + } +} -- cgit v1.2.3