From acf89aafe568f76a44ddb0e44ca91f8180d1d825 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 4 Mar 2010 11:40:16 +0000 Subject: Clean-ups in scheduler hierarchy. --- src/actors/scala/actors/package.scala | 5 ++- .../scheduler/DefaultThreadPoolScheduler.scala | 7 +++- .../scala/actors/scheduler/ExecutorScheduler.scala | 45 +++++++++++++++++++++- .../scala/actors/scheduler/QuitControl.scala | 19 +++++++++ .../scala/actors/scheduler/SchedulerService.scala | 9 ----- .../actors/scheduler/SimpleExecutorScheduler.scala | 6 +-- .../actors/scheduler/TerminationMonitor.scala | 3 +- .../actors/scheduler/TerminationService.scala | 18 ++++++--- .../actors/scheduler/ThreadPoolScheduler.scala | 6 +-- 9 files changed, 91 insertions(+), 27 deletions(-) create mode 100644 src/actors/scala/actors/scheduler/QuitControl.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/package.scala b/src/actors/scala/actors/package.scala index 7a7ef5899e..eadc75c433 100644 --- a/src/actors/scala/actors/package.scala +++ b/src/actors/scala/actors/package.scala @@ -14,7 +14,10 @@ package object actors { @deprecated("this class is going to be removed in a future release") type WorkerThread = java.lang.Thread + @deprecated("use scala.actors.scheduler.SingleThreadedScheduler instead") + type SingleThreadedScheduler = scala.actors.scheduler.SingleThreadedScheduler + @deprecated("this value is going to be removed in a future release") - val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ThreadPoolScheduler] + val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ActorGC] } diff --git a/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala index 4fed00ba26..42942f9344 100644 --- a/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala @@ -24,9 +24,12 @@ import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, * * @author Philipp Haller */ -private[actors] class DefaultThreadPoolScheduler(daemon: Boolean) extends ThreadPoolScheduler(daemon) { +private[actors] class DefaultThreadPoolScheduler(daemon: Boolean) + extends ExecutorScheduler { - executor = { + setDaemon(daemon) + + def executor = { val workQueue = new LinkedBlockingQueue[Runnable] val threadFactory = new ThreadFactory { diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala index bcd524f345..4b5eec21d9 100644 --- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala @@ -11,16 +11,57 @@ package scala.actors package scheduler -import java.util.concurrent.Callable +import java.util.concurrent.{Callable, ExecutorService} import scala.concurrent.ThreadPoolRunner +/** + * The ExecutorScheduler object is used to create + * ExecutorScheduler instances. + * + * @author Philipp Haller + */ +object ExecutorScheduler { + + private def start(sched: ExecutorScheduler): ExecutorScheduler = { + sched.start() + sched + } + + /** Creates an ExecutorScheduler using the provided + * ExecutorService. + * + * @param exec the executor to use + * @return the scheduler + */ + def apply(exec: ExecutorService): ExecutorScheduler = + start(new ExecutorScheduler { + def executor: ExecutorService = exec + }) + + /** Creates an ExecutorScheduler using the provided + * ExecutorService. + * + * @param exec the executor to use + * @param term whether the scheduler should automatically terminate + * @return the scheduler + */ + def apply(exec: ExecutorService, term: Boolean): ExecutorScheduler = + start(new ExecutorScheduler { + def executor: ExecutorService = exec + override val terminate = term + }) + +} + /** * The ExecutorScheduler class uses an * ExecutorService to execute Actors. * * @author Philipp Haller */ -private[scheduler] trait ExecutorScheduler extends IScheduler with ThreadPoolRunner { +trait ExecutorScheduler extends Thread + with IScheduler with TerminationService + with ThreadPoolRunner { def execute(task: Runnable) { super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]]) diff --git a/src/actors/scala/actors/scheduler/QuitControl.scala b/src/actors/scala/actors/scheduler/QuitControl.scala new file mode 100644 index 0000000000..b217094c1e --- /dev/null +++ b/src/actors/scala/actors/scheduler/QuitControl.scala @@ -0,0 +1,19 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2010, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.actors.scheduler + +import scala.util.control.ControlThrowable + +/** + * The QuitControl class is used to manage control flow + * of certain schedulers. + * + * @author Philipp Haller + */ +private[scheduler] class QuitControl extends ControlThrowable diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala index 1f886dbae9..9fbee3d0b1 100644 --- a/src/actors/scala/actors/scheduler/SchedulerService.scala +++ b/src/actors/scala/actors/scheduler/SchedulerService.scala @@ -11,7 +11,6 @@ package scala.actors package scheduler -import scala.util.control.ControlThrowable import java.lang.{Runnable, Thread, InterruptedException} /** @@ -70,11 +69,3 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler } } -/** - * The QuitControl class is used to manage control flow - * of certain schedulers and worker threads. - * - * @version 0.9.8 - * @author Philipp Haller - */ -private[actors] class QuitControl extends ControlThrowable diff --git a/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala b/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala index 6c8814e90c..7f1c14eacd 100644 --- a/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala @@ -28,9 +28,9 @@ import java.util.concurrent.ExecutorService * * @author Philipp Haller */ -class SimpleExecutorScheduler(protected var executor: ExecutorService, - protected var terminate: Boolean) - extends TerminationService(terminate) with ExecutorScheduler { +class SimpleExecutorScheduler(protected override val executor: ExecutorService, + protected override val terminate: Boolean) + extends ExecutorScheduler { /* This constructor (and the var above) is currently only used to work * around a bug in scaladoc, which cannot deal with early initializers diff --git a/src/actors/scala/actors/scheduler/TerminationMonitor.scala b/src/actors/scala/actors/scheduler/TerminationMonitor.scala index 5510ebb095..67f72371a3 100644 --- a/src/actors/scala/actors/scheduler/TerminationMonitor.scala +++ b/src/actors/scala/actors/scheduler/TerminationMonitor.scala @@ -13,7 +13,8 @@ package scheduler import scala.collection.mutable.HashMap -trait TerminationMonitor { +private[scheduler] trait TerminationMonitor { + _: IScheduler => protected var activeActors = 0 protected val terminationHandlers = new HashMap[Reactor, () => Unit] diff --git a/src/actors/scala/actors/scheduler/TerminationService.scala b/src/actors/scala/actors/scheduler/TerminationService.scala index da71b6ac2b..aa047eedfa 100644 --- a/src/actors/scala/actors/scheduler/TerminationService.scala +++ b/src/actors/scala/actors/scheduler/TerminationService.scala @@ -11,7 +11,7 @@ package scala.actors package scheduler -import java.lang.{Runnable, Thread, InterruptedException} +import java.lang.{Thread, InterruptedException} /** * The TerminationService class starts a new thread @@ -21,11 +21,16 @@ import java.lang.{Runnable, Thread, InterruptedException} * * @author Philipp Haller */ -abstract class TerminationService(terminate: Boolean) - extends Thread with IScheduler with TerminationMonitor { +private[scheduler] trait TerminationService extends TerminationMonitor { + _: Thread with IScheduler => private var terminating = false + /** Indicates whether the scheduler should terminate when all + * actors have terminated. + */ + protected val terminate = true + protected val CHECK_FREQ = 50 def onShutdown(): Unit @@ -39,11 +44,11 @@ abstract class TerminationService(terminate: Boolean) } catch { case _: InterruptedException => } - if (terminating) - throw new QuitControl - if (terminate && allActorsTerminated) + if (terminating || (terminate && allActorsTerminated)) throw new QuitControl + + gc() } } } catch { @@ -60,4 +65,5 @@ abstract class TerminationService(terminate: Boolean) def shutdown(): Unit = synchronized { terminating = true } + } diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala index 2061207ee1..41e7b0c451 100644 --- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala @@ -29,14 +29,14 @@ import scala.concurrent.ManagedBlocker * @author Philipp Haller */ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, - protected val terminate: Boolean, + protected override val terminate: Boolean, protected val daemon: Boolean) extends Thread with ExecutorScheduler with TerminationMonitor { setDaemon(daemon) private var terminating = false // guarded by this - protected val CHECK_FREQ = 10 + protected override val CHECK_FREQ = 10 /* This constructor (and the var above) is currently only used to work * around a bug in scaladoc, which cannot deal with early initializers @@ -74,7 +74,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, /** Shuts down the scheduler. */ - def shutdown(): Unit = synchronized { + override def shutdown(): Unit = synchronized { terminating = true } -- cgit v1.2.3