From e1424d97d5386275003b40c60a7b36c5f5fd87d7 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 28 May 2009 19:08:24 +0000 Subject: Improved SimpleExecutorScheduler with non-leaki... Improved SimpleExecutorScheduler with non-leaking termination monitor. --- src/actors/scala/actors/ExecutorScheduler.scala | 5 +- src/actors/scala/actors/Scheduler.scala | 24 ++++---- .../scala/actors/SimpleExecutorScheduler.scala | 62 +++++-------------- src/actors/scala/actors/TerminationMonitor.scala | 59 ++++++++++++++++++ src/actors/scala/actors/TerminationService.scala | 70 ++++++++++++++++++++++ 5 files changed, 160 insertions(+), 60 deletions(-) create mode 100644 src/actors/scala/actors/TerminationMonitor.scala create mode 100644 src/actors/scala/actors/TerminationService.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala index 72d38d1b2f..c414d80eba 100644 --- a/src/actors/scala/actors/ExecutorScheduler.scala +++ b/src/actors/scala/actors/ExecutorScheduler.scala @@ -37,7 +37,8 @@ class ExecutorScheduler(protected var executor: ExecutorService) extends Schedul executor execute task } catch { case ree: RejectedExecutionException => - Debug.info("caught "+ree) + // run task on current thread + task.run() } } @@ -48,5 +49,5 @@ class ExecutorScheduler(protected var executor: ExecutorService) extends Schedul executor.shutdown() def isActive = - !executor.isShutdown + (executor ne null) && !executor.isShutdown } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 50b8ecbbea..a7935edabf 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -10,8 +10,8 @@ package scala.actors -import compat.Platform import java.lang.Runnable +import java.util.concurrent._ /** * The Scheduler object is used by Actor to @@ -25,9 +25,15 @@ object Scheduler extends DelegatingScheduler { Debug.info("initializing "+this+"...") def makeNewScheduler: IScheduler = { - val sched = new DefaultExecutorScheduler - sched.start() - sched + val workQueue = new LinkedBlockingQueue[Runnable](100000) + val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, + ThreadPoolConfig.maxPoolSize, + 50L, + TimeUnit.MILLISECONDS, + workQueue) + val s = new SimpleExecutorScheduler(threadPool, true) + s.start() + s } private var tasks: LinkedQueue = null @@ -56,8 +62,9 @@ object Scheduler extends DelegatingScheduler { */ def restart(): Unit = synchronized { // 1. shut down current scheduler - if (sched ne null) + if (sched ne null) { sched.shutdown() + } // 2. create and start new scheduler if ((sched ne null) && sched.isInstanceOf[FJTaskScheduler2]) { @@ -73,11 +80,8 @@ object Scheduler extends DelegatingScheduler { tasks = null } } else { - sched = { - val s = new DefaultExecutorScheduler - s.start() - s - } + // will trigger creation of new delegate scheduler + sched = null } } diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala index e83cb5cfb9..dfa1bdaf73 100644 --- a/src/actors/scala/actors/SimpleExecutorScheduler.scala +++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala @@ -16,27 +16,34 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException} /** * The SimpleExecutorScheduler class uses an * ExecutorService to execute Actors. It - * does not start an additional thread. Also, the underlying - * ExecutorService is not shut down automatically; - * instead, the ExecutorService must be shut down either + * does not start an additional thread. + * + * A SimpleExecutorScheduler attempts to shut down + * the underlying ExecutorService only if + * terminate is set to true. + * + * Otherwise, the ExecutorService must be shut down either * directly or by shutting down the * SimpleExecutorScheduler instance. * * @author Philipp Haller */ -class SimpleExecutorScheduler(protected var executor: ExecutorService) extends IScheduler { +class SimpleExecutorScheduler(protected var executor: ExecutorService, + protected var terminate: Boolean) extends TerminationService(terminate) { - /* Maintains at most one closure per actor that is executed + /* Maintains per actor one closure that is executed * when the actor terminates. */ protected val termHandlers = new HashMap[OutputChannelActor, () => Unit] + private var pendingReactions = 0 + /* This constructor (and the var above) is currently only used to work * around a bug in scaladoc, which cannot deal with early initializers * (to be used in subclasses such as DefaultExecutorScheduler) properly. */ def this() { - this(null) + this(null, true) } /** Submits a Runnable for execution. @@ -53,20 +60,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I } } - /** Submits a closure for execution. - * - * @param block the closure to be executed - */ - def execute(block: => Unit) { - val task = new Runnable { - def run() { block } - } - execute(task) - } - - /** Shuts down the scheduler. - */ - def shutdown() { + def onShutdown() { executor.shutdown() } @@ -75,32 +69,4 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I */ def isActive = (executor ne null) && !executor.isShutdown() - - def newActor(a: OutputChannelActor) {} - - def terminated(a: OutputChannelActor) { - // obtain termination handler (if any) - val todo = synchronized { - termHandlers.get(a) match { - case Some(handler) => - termHandlers -= a - () => handler - case None => - () => { /* do nothing */ } - } - } - - // invoke termination handler (if any) - todo() - } - - /** Registers a closure to be executed when the specified - * actor terminates. - * - * @param a the actor - * @param f the closure to be registered - */ - def onTerminate(a: OutputChannelActor)(block: => Unit) = synchronized { - termHandlers += (a -> (() => block)) - } } diff --git a/src/actors/scala/actors/TerminationMonitor.scala b/src/actors/scala/actors/TerminationMonitor.scala new file mode 100644 index 0000000000..b7cb748bf0 --- /dev/null +++ b/src/actors/scala/actors/TerminationMonitor.scala @@ -0,0 +1,59 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id:$ + +package scala.actors + +import scala.collection.mutable.HashMap + +trait TerminationMonitor extends IScheduler { + + private var pendingReactions = 0 + private val termHandlers = new HashMap[OutputChannelActor, () => Unit] + + /** newActor is invoked whenever a new actor is started. */ + def newActor(a: OutputChannelActor) = synchronized { + pendingReactions += 1 + } + + /** Registers a closure to be executed when the specified + * actor terminates. + * + * @param a the actor + * @param f the closure to be registered + */ + def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized { + termHandlers += (a -> (() => f)) + } + + def terminated(a: OutputChannelActor) = synchronized { + // obtain termination handler (if any) + val todo = synchronized { + termHandlers.get(a) match { + case Some(handler) => + termHandlers -= a + () => handler + case None => + () => { /* do nothing */ } + } + } + + // invoke termination handler (if any) + todo() + + synchronized { + pendingReactions -= 1 + } + } + + protected def allTerminated: Boolean = synchronized { + pendingReactions <= 0 + } + +} diff --git a/src/actors/scala/actors/TerminationService.scala b/src/actors/scala/actors/TerminationService.scala new file mode 100644 index 0000000000..c983d2d558 --- /dev/null +++ b/src/actors/scala/actors/TerminationService.scala @@ -0,0 +1,70 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: $ + +package scala.actors + +import java.lang.{Runnable, Thread, InterruptedException} + +/** + * The TerminationService class starts a new thread + * that is used to check regularly if the scheduler can be + * shut down, because all started actors are known to + * have terminated. + * + * @author Philipp Haller + */ +abstract class TerminationService(terminate: Boolean) extends Thread with TerminationMonitor { + + private var terminating = false + + protected val CHECK_FREQ = 50 + + def onShutdown(): Unit + + override def run() { + try { + while (true) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + } + if (terminating) + throw new QuitException + + if (terminate && allTerminated) + throw new QuitException + } + } + } catch { + case _: QuitException => + Debug.info(this+": initiating shutdown...") + // invoke shutdown hook + onShutdown() + // allow thread to exit + } + } + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + /** Shuts down the scheduler. + */ + def shutdown(): Unit = synchronized { + terminating = true + } +} -- cgit v1.2.3