From 0cb9b256f8d30c3a0c2bdf6d08ba10e26aa73f1c Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 21 Jul 2009 09:31:39 +0000 Subject: Moved scheduler source files into new scheduler... Moved scheduler source files into new scheduler directory. --- src/actors/scala/actors/DaemonScheduler.scala | 20 --- .../scala/actors/DefaultExecutorScheduler.scala | 52 ------- src/actors/scala/actors/DelegatingScheduler.scala | 56 -------- .../scala/actors/DrainableForkJoinPool.scala | 11 -- src/actors/scala/actors/ExecutorScheduler.scala | 53 ------- src/actors/scala/actors/ForkJoinScheduler.scala | 152 --------------------- src/actors/scala/actors/SchedulerService.scala | 105 -------------- .../scala/actors/SimpleExecutorScheduler.scala | 42 ------ .../scala/actors/SingleThreadedScheduler.scala | 41 ------ src/actors/scala/actors/TerminationMonitor.scala | 62 --------- src/actors/scala/actors/TerminationService.scala | 71 ---------- src/actors/scala/actors/ThreadPoolConfig.scala | 43 ------ src/actors/scala/actors/ThreadPoolScheduler.scala | 98 ------------- .../scala/actors/scheduler/DaemonScheduler.scala | 20 +++ .../scheduler/DefaultExecutorScheduler.scala | 52 +++++++ .../actors/scheduler/DelegatingScheduler.scala | 56 ++++++++ .../actors/scheduler/DrainableForkJoinPool.scala | 11 ++ .../scala/actors/scheduler/ExecutorScheduler.scala | 53 +++++++ .../scala/actors/scheduler/ForkJoinScheduler.scala | 152 +++++++++++++++++++++ .../scala/actors/scheduler/SchedulerService.scala | 105 ++++++++++++++ .../actors/scheduler/SimpleExecutorScheduler.scala | 42 ++++++ .../actors/scheduler/SingleThreadedScheduler.scala | 41 ++++++ .../actors/scheduler/TerminationMonitor.scala | 62 +++++++++ .../actors/scheduler/TerminationService.scala | 71 ++++++++++ .../scala/actors/scheduler/ThreadPoolConfig.scala | 43 ++++++ .../actors/scheduler/ThreadPoolScheduler.scala | 98 +++++++++++++ 26 files changed, 806 insertions(+), 806 deletions(-) delete mode 100644 src/actors/scala/actors/DaemonScheduler.scala delete mode 100644 src/actors/scala/actors/DefaultExecutorScheduler.scala delete mode 100644 src/actors/scala/actors/DelegatingScheduler.scala delete mode 100644 src/actors/scala/actors/DrainableForkJoinPool.scala delete mode 100644 src/actors/scala/actors/ExecutorScheduler.scala delete mode 100644 src/actors/scala/actors/ForkJoinScheduler.scala delete mode 100644 src/actors/scala/actors/SchedulerService.scala delete mode 100644 src/actors/scala/actors/SimpleExecutorScheduler.scala delete mode 100644 src/actors/scala/actors/SingleThreadedScheduler.scala delete mode 100644 src/actors/scala/actors/TerminationMonitor.scala delete mode 100644 src/actors/scala/actors/TerminationService.scala delete mode 100644 src/actors/scala/actors/ThreadPoolConfig.scala delete mode 100644 src/actors/scala/actors/ThreadPoolScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/DaemonScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/DelegatingScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala create mode 100644 src/actors/scala/actors/scheduler/ExecutorScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/ForkJoinScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/SchedulerService.scala create mode 100644 src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala create mode 100644 src/actors/scala/actors/scheduler/TerminationMonitor.scala create mode 100644 src/actors/scala/actors/scheduler/TerminationService.scala create mode 100644 src/actors/scala/actors/scheduler/ThreadPoolConfig.scala create mode 100644 src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/DaemonScheduler.scala b/src/actors/scala/actors/DaemonScheduler.scala deleted file mode 100644 index ee4df16b47..0000000000 --- a/src/actors/scala/actors/DaemonScheduler.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors.scheduler - -/** - * Default scheduler for actors with daemon semantics, such as those backing futures. - * - * @author Erik Engbrecht - */ -object DaemonScheduler extends DelegatingScheduler { - - def makeNewScheduler(): IScheduler = new DefaultExecutorScheduler(true) - -} diff --git a/src/actors/scala/actors/DefaultExecutorScheduler.scala b/src/actors/scala/actors/DefaultExecutorScheduler.scala deleted file mode 100644 index afda4a91c0..0000000000 --- a/src/actors/scala/actors/DefaultExecutorScheduler.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.scheduler - -import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, ThreadFactory} - -/** - * The DefaultExecutorScheduler class uses a default - * ThreadPoolExecutor for executing Actors. - * - * It can be configured using the two JVM properties - * actors.corePoolSize and - * actors.maxPoolSize that control the initial and - * maximum size of the thread pool, respectively. - * - * @author Philipp Haller - */ -class DefaultExecutorScheduler(daemon: Boolean) - extends SchedulerService(daemon) with ExecutorScheduler { - - def this() = - this(false) - - private val workQueue = new LinkedBlockingQueue[Runnable] - - private val threadFactory = new ThreadFactory { - def newThread(r: Runnable): Thread = { - val result = new Thread(r) - result.setDaemon(daemon) - result - } - } - - private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, - ThreadPoolConfig.maxPoolSize, - 50L, - TimeUnit.MILLISECONDS, - workQueue, - threadFactory) - - val executor = threadPool - - override val CHECK_FREQ = 50 -} diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala deleted file mode 100644 index 21785ff2e4..0000000000 --- a/src/actors/scala/actors/DelegatingScheduler.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors.scheduler - -/** - * @author Erik Engbrecht - */ -trait DelegatingScheduler extends IScheduler { - protected def makeNewScheduler(): IScheduler - - protected var sched: IScheduler = null - - final def impl = synchronized { - if ((sched eq null) || (!sched.isActive)) - sched = makeNewScheduler() - sched - } - - final def impl_= (scheduler: IScheduler): Unit = synchronized { - //TODO: if there is already a scheduler, should it be shutdown? - sched = scheduler - } - - /** - * Always active because it will just make a new scheduler if required - */ - def isActive: Boolean = true - - def execute(fun: => Unit) = impl.execute(fun) - - def execute(task: Runnable) = impl.execute(task) - - def executeFromActor(task: Runnable) = impl.executeFromActor(task) - - def shutdown(): Unit = synchronized { - if (sched ne null) { - sched.shutdown() - sched = null - } - } - - def newActor(actor: Reactor) = impl.newActor(actor) - - def terminated(actor: Reactor) = impl.terminated(actor) - - def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f) - - override def managedBlock(blocker: ManagedBlocker): Unit = - impl.managedBlock(blocker) -} diff --git a/src/actors/scala/actors/DrainableForkJoinPool.scala b/src/actors/scala/actors/DrainableForkJoinPool.scala deleted file mode 100644 index 994bac3dc2..0000000000 --- a/src/actors/scala/actors/DrainableForkJoinPool.scala +++ /dev/null @@ -1,11 +0,0 @@ -package scala.actors.scheduler - -import java.util.Collection -import forkjoin.{ForkJoinPool, ForkJoinTask} - -private class DrainableForkJoinPool extends ForkJoinPool { - - override def drainTasksTo(c: Collection[ForkJoinTask[_]]): Int = - super.drainTasksTo(c) - -} diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala deleted file mode 100644 index 2684b32228..0000000000 --- a/src/actors/scala/actors/ExecutorScheduler.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.scheduler - -import java.util.concurrent.{ExecutorService, RejectedExecutionException} - -/** - * The ExecutorScheduler class uses an - * ExecutorService to execute Actors. - * - * @author Philipp Haller - */ -trait ExecutorScheduler extends IScheduler { - - protected def executor: ExecutorService - - /** Submits a Runnable for execution. - * - * @param task the task to be executed - */ - def execute(task: Runnable) { - try { - executor execute task - } catch { - case ree: RejectedExecutionException => - // run task on current thread - task.run() - } - } - - def executeFromActor(task: Runnable) = - execute(task) - - /** This method is called when the SchedulerService - * shuts down. - */ - def onShutdown(): Unit = - executor.shutdown() - - /** The scheduler is active if the underlying ExecutorService - * has not been shut down. - */ - def isActive = - (executor ne null) && !executor.isShutdown -} diff --git a/src/actors/scala/actors/ForkJoinScheduler.scala b/src/actors/scala/actors/ForkJoinScheduler.scala deleted file mode 100644 index 089a7639f2..0000000000 --- a/src/actors/scala/actors/ForkJoinScheduler.scala +++ /dev/null @@ -1,152 +0,0 @@ -package scala.actors.scheduler - -import java.lang.Thread.State -import java.util.{Collection, ArrayList} -import forkjoin._ - -/** The ForkJoinScheduler is backed by a lightweight - * fork-join task execution framework. - * - * @author Philipp Haller - */ -class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor { - - private var pool = makeNewPool() - private var terminating = false - private var snapshoting = false - private var drainedTasks: Collection[ForkJoinTask[_]] = null - - private val CHECK_FREQ = 10 - - private def makeNewPool(): DrainableForkJoinPool = { - val p = new DrainableForkJoinPool() - // enable locally FIFO scheduling mode - p.setAsyncMode(true) - Debug.info(this+": parallelism "+p.getParallelism()) - Debug.info(this+": max pool size "+p.getMaximumPoolSize()) - p - } - - /** Starts this scheduler. - */ - def start() { - (new Thread(this)).start() - } - - private def allWorkersBlocked: Boolean = - (pool.workers != null) && - pool.workers.forall(t => { - (t == null) || { - val s = t.getState() - s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING - } - }) - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ) - } catch { - case _: InterruptedException => - } - - if (terminating) - throw new QuitException - - if (allTerminated) { - //Debug.info(this+": all actors terminated") - throw new QuitException - } - - if (!snapshoting) { - val poolSize = pool.getPoolSize() - if (allWorkersBlocked && (poolSize < ThreadPoolConfig.maxPoolSize)) { - pool.setParallelism(poolSize + 1) - } - } else if (pool.isQuiescent()) { - val list = new ArrayList[ForkJoinTask[_]] - val num = pool.drainTasksTo(list) - Debug.info(this+": drained "+num+" tasks") - drainedTasks = list - throw new QuitException - } - } - } - } catch { - case _: QuitException => - Debug.info(this+": initiating shutdown...") - while (!pool.isQuiescent()) { - try { - Thread.sleep(10) - } catch { - case ignore: InterruptedException => - } - } - pool.shutdown() - // allow thread to exit - } - } - - def execute(task: Runnable) { - pool.execute(task) - } - - def executeFromActor(task: Runnable) { - val recAction = new RecursiveAction { - def compute() = task.run() - } - recAction.fork() - } - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - override def managedBlock(blocker: ManagedBlocker) { - ForkJoinPool.managedBlock(blocker, true) - } - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - - def isActive = - (pool ne null) && !pool.isShutdown() - - /** Suspends the scheduler. All threads that were in use by the - * scheduler and its internal thread pool are terminated. - */ - def snapshot() = synchronized { - snapshoting = true - } - - /** Resumes the execution of the scheduler if it was previously - * suspended using ForkJoinScheduler.snapshot. - */ - def restart() { - synchronized { - if (!snapshoting) - error("snapshot has not been invoked") - else if (isActive) - error("scheduler is still active") - else - snapshoting = false - } - pool = makeNewPool() - val iter = drainedTasks.iterator() - while (iter.hasNext()) { - pool.execute(iter.next()) - } - start() - } - -} diff --git a/src/actors/scala/actors/SchedulerService.scala b/src/actors/scala/actors/SchedulerService.scala deleted file mode 100644 index ac8c621074..0000000000 --- a/src/actors/scala/actors/SchedulerService.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id: $ - -package scala.actors.scheduler - -import scala.util.control.ControlException -import java.lang.{Runnable, Thread, InterruptedException} - -/** - * The abstract SchedulerService class allows subclasses - * to implement a custom onShutdown method, which is - * invoked when the runtime system has detected that all actors have - * been terminated. - * - * @version 0.9.18 - * @author Philipp Haller - */ -abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC { - setDaemon(daemon) - - def this() = - this(false) - - private var terminating = false - - def printActorDump {} - - protected val CHECK_FREQ = 100 - - def onLockup(handler: () => Unit) = - lockupHandler = handler - - def onLockup(millis: Int)(handler: () => Unit) = { - //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ - lockupHandler = handler - } - - private var lockupHandler: () => Unit = null - - def onShutdown(): Unit - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ) - } catch { - case _: InterruptedException => - } - if (terminating) - throw new QuitException - - gc() - - if (allTerminated) - throw new QuitException - } - } - } catch { - case _: QuitException => - Debug.info(this+": initiating shutdown...") - // invoke shutdown hook - onShutdown() - // allow thread to exit - } - } - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } -} - -/** - * The QuitException class is used to manage control flow - * of certain schedulers and worker threads. - * - * @version 0.9.8 - * @author Philipp Haller - */ -private[actors] class QuitException extends Throwable with ControlException { - /* - For efficiency reasons we do not fill in - the execution stack trace. - */ - override def fillInStackTrace(): Throwable = this -} diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala deleted file mode 100644 index fdcc2cbd4d..0000000000 --- a/src/actors/scala/actors/SimpleExecutorScheduler.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.scheduler - -import java.util.concurrent.ExecutorService - -/** - * The SimpleExecutorScheduler class uses an - * ExecutorService to execute Actors. It - * does not start an additional thread. - * - * A SimpleExecutorScheduler attempts to shut down - * the underlying ExecutorService only if - * terminate is set to true. - * - * Otherwise, the ExecutorService must be shut down either - * directly or by shutting down the - * SimpleExecutorScheduler instance. - * - * @author Philipp Haller - */ -class SimpleExecutorScheduler(protected var executor: ExecutorService, - protected var terminate: Boolean) - extends TerminationService(terminate) with ExecutorScheduler { - - /* This constructor (and the var above) is currently only used to work - * around a bug in scaladoc, which cannot deal with early initializers - * (to be used in subclasses such as DefaultExecutorScheduler) properly. - */ - def this() { - this(null, true) - } - -} diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala deleted file mode 100644 index b6de36b069..0000000000 --- a/src/actors/scala/actors/SingleThreadedScheduler.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.scheduler - -/** - * This scheduler executes the tasks of an actor on a single - * thread (the current thread). - * - * @version 0.9.18 - * @author Philipp Haller - */ -class SingleThreadedScheduler extends IScheduler { - - def execute(task: Runnable) { - task.run() - } - - def executeFromActor(task: Runnable) = - execute(task) - - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - def shutdown() {} - - def newActor(actor: Reactor) {} - def terminated(actor: Reactor) {} - def onTerminate(actor: Reactor)(f: => Unit) {} - - def isActive = true -} diff --git a/src/actors/scala/actors/TerminationMonitor.scala b/src/actors/scala/actors/TerminationMonitor.scala deleted file mode 100644 index 9437a9a168..0000000000 --- a/src/actors/scala/actors/TerminationMonitor.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id:$ - -package scala.actors.scheduler - -import scala.collection.mutable.HashMap - -trait TerminationMonitor { - - private var pendingReactions = 0 - private val termHandlers = new HashMap[Reactor, () => Unit] - private var started = false - - /** newActor is invoked whenever a new actor is started. */ - def newActor(a: Reactor) = synchronized { - pendingReactions += 1 - if (!started) - started = true - } - - /** Registers a closure to be executed when the specified - * actor terminates. - * - * @param a the actor - * @param f the closure to be registered - */ - def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized { - termHandlers += (a -> (() => f)) - } - - def terminated(a: Reactor) = synchronized { - // obtain termination handler (if any) - val todo = synchronized { - termHandlers.get(a) match { - case Some(handler) => - termHandlers -= a - () => handler - case None => - () => { /* do nothing */ } - } - } - - // invoke termination handler (if any) - todo() - - synchronized { - pendingReactions -= 1 - } - } - - protected def allTerminated: Boolean = synchronized { - started && pendingReactions <= 0 - } - -} diff --git a/src/actors/scala/actors/TerminationService.scala b/src/actors/scala/actors/TerminationService.scala deleted file mode 100644 index e3ee8971fc..0000000000 --- a/src/actors/scala/actors/TerminationService.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id: $ - -package scala.actors.scheduler - -import java.lang.{Runnable, Thread, InterruptedException} - -/** - * The TerminationService class starts a new thread - * that is used to check regularly if the scheduler can be - * shut down, because all started actors are known to - * have terminated. - * - * @author Philipp Haller - */ -abstract class TerminationService(terminate: Boolean) - extends Thread with IScheduler with TerminationMonitor { - - private var terminating = false - - protected val CHECK_FREQ = 50 - - def onShutdown(): Unit - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ) - } catch { - case _: InterruptedException => - } - if (terminating) - throw new QuitException - - if (terminate && allTerminated) - throw new QuitException - } - } - } catch { - case _: QuitException => - Debug.info(this+": initiating shutdown...") - // invoke shutdown hook - onShutdown() - // allow thread to exit - } - } - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } -} diff --git a/src/actors/scala/actors/ThreadPoolConfig.scala b/src/actors/scala/actors/ThreadPoolConfig.scala deleted file mode 100644 index 8a1075a652..0000000000 --- a/src/actors/scala/actors/ThreadPoolConfig.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.scheduler - -/** - * @author Erik Engbrecht - */ -object ThreadPoolConfig { - private val rt = Runtime.getRuntime() - private val minNumThreads = 4 - - private def getIntegerProp(propName: String): Option[Int] = { - try { - val prop = System.getProperty(propName) - Some(Integer.parseInt(prop)) - } catch { - case ace: java.security.AccessControlException => None - case nfe: NumberFormatException => None - } - } - - val corePoolSize = getIntegerProp("actors.corePoolSize") match { - case Some(i) if i > 0 => i - case _ => { - val byCores = rt.availableProcessors() * 2 - if (byCores > minNumThreads) byCores else minNumThreads - } - } - - val maxPoolSize = getIntegerProp("actors.maxPoolSize") match { - case Some(i) if (i >= corePoolSize) => i - case Some(i) if (i < corePoolSize) => corePoolSize - case _ => 256 - } -} diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala deleted file mode 100644 index 568a045631..0000000000 --- a/src/actors/scala/actors/ThreadPoolScheduler.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.scheduler - -import java.util.concurrent.ThreadPoolExecutor - -/** - * The ThreadPoolScheduler class uses an - * ThreadPoolExecutor to execute Actors. - * - * A ThreadPoolScheduler attempts to shut down - * the underlying ExecutorService only if - * terminate is set to true. - * - * Otherwise, the ExecutorService must be shut down either - * directly or by shutting down the - * ThreadPoolScheduler instance. - * - * @author Philipp Haller - */ -class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, - protected var terminate: Boolean) - extends Thread with TerminationMonitor with ExecutorScheduler { - - private var terminating = false - protected val CHECK_FREQ = 10 - - /* This constructor (and the var above) is currently only used to work - * around a bug in scaladoc, which cannot deal with early initializers - * (to be used in subclasses such as DefaultExecutorScheduler) properly. - */ - def this() { - this(null, true) - } - - override def managedBlock(blocker: ManagedBlocker) { - val coreSize = executor.getCorePoolSize() - if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { - executor.setCorePoolSize(coreSize + 1) - } - blocker.block() - } - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ) - } catch { - case _: InterruptedException => - } - - if (terminating) - throw new QuitException - - if (terminate && allTerminated) - throw new QuitException - - val coreSize = executor.getCorePoolSize() - if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { - executor.setCorePoolSize(coreSize + 1) - } - } - } - } catch { - case _: QuitException => - Debug.info(this+": initiating shutdown...") - // invoke shutdown hook - onShutdown() - // allow thread to exit - } - } - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - -} diff --git a/src/actors/scala/actors/scheduler/DaemonScheduler.scala b/src/actors/scala/actors/scheduler/DaemonScheduler.scala new file mode 100644 index 0000000000..ee4df16b47 --- /dev/null +++ b/src/actors/scala/actors/scheduler/DaemonScheduler.scala @@ -0,0 +1,20 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.actors.scheduler + +/** + * Default scheduler for actors with daemon semantics, such as those backing futures. + * + * @author Erik Engbrecht + */ +object DaemonScheduler extends DelegatingScheduler { + + def makeNewScheduler(): IScheduler = new DefaultExecutorScheduler(true) + +} diff --git a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala new file mode 100644 index 0000000000..afda4a91c0 --- /dev/null +++ b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala @@ -0,0 +1,52 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors.scheduler + +import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, ThreadFactory} + +/** + * The DefaultExecutorScheduler class uses a default + * ThreadPoolExecutor for executing Actors. + * + * It can be configured using the two JVM properties + * actors.corePoolSize and + * actors.maxPoolSize that control the initial and + * maximum size of the thread pool, respectively. + * + * @author Philipp Haller + */ +class DefaultExecutorScheduler(daemon: Boolean) + extends SchedulerService(daemon) with ExecutorScheduler { + + def this() = + this(false) + + private val workQueue = new LinkedBlockingQueue[Runnable] + + private val threadFactory = new ThreadFactory { + def newThread(r: Runnable): Thread = { + val result = new Thread(r) + result.setDaemon(daemon) + result + } + } + + private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, + ThreadPoolConfig.maxPoolSize, + 50L, + TimeUnit.MILLISECONDS, + workQueue, + threadFactory) + + val executor = threadPool + + override val CHECK_FREQ = 50 +} diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala new file mode 100644 index 0000000000..21785ff2e4 --- /dev/null +++ b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala @@ -0,0 +1,56 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.actors.scheduler + +/** + * @author Erik Engbrecht + */ +trait DelegatingScheduler extends IScheduler { + protected def makeNewScheduler(): IScheduler + + protected var sched: IScheduler = null + + final def impl = synchronized { + if ((sched eq null) || (!sched.isActive)) + sched = makeNewScheduler() + sched + } + + final def impl_= (scheduler: IScheduler): Unit = synchronized { + //TODO: if there is already a scheduler, should it be shutdown? + sched = scheduler + } + + /** + * Always active because it will just make a new scheduler if required + */ + def isActive: Boolean = true + + def execute(fun: => Unit) = impl.execute(fun) + + def execute(task: Runnable) = impl.execute(task) + + def executeFromActor(task: Runnable) = impl.executeFromActor(task) + + def shutdown(): Unit = synchronized { + if (sched ne null) { + sched.shutdown() + sched = null + } + } + + def newActor(actor: Reactor) = impl.newActor(actor) + + def terminated(actor: Reactor) = impl.terminated(actor) + + def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f) + + override def managedBlock(blocker: ManagedBlocker): Unit = + impl.managedBlock(blocker) +} diff --git a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala new file mode 100644 index 0000000000..994bac3dc2 --- /dev/null +++ b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala @@ -0,0 +1,11 @@ +package scala.actors.scheduler + +import java.util.Collection +import forkjoin.{ForkJoinPool, ForkJoinTask} + +private class DrainableForkJoinPool extends ForkJoinPool { + + override def drainTasksTo(c: Collection[ForkJoinTask[_]]): Int = + super.drainTasksTo(c) + +} diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala new file mode 100644 index 0000000000..2684b32228 --- /dev/null +++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala @@ -0,0 +1,53 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors.scheduler + +import java.util.concurrent.{ExecutorService, RejectedExecutionException} + +/** + * The ExecutorScheduler class uses an + * ExecutorService to execute Actors. + * + * @author Philipp Haller + */ +trait ExecutorScheduler extends IScheduler { + + protected def executor: ExecutorService + + /** Submits a Runnable for execution. + * + * @param task the task to be executed + */ + def execute(task: Runnable) { + try { + executor execute task + } catch { + case ree: RejectedExecutionException => + // run task on current thread + task.run() + } + } + + def executeFromActor(task: Runnable) = + execute(task) + + /** This method is called when the SchedulerService + * shuts down. + */ + def onShutdown(): Unit = + executor.shutdown() + + /** The scheduler is active if the underlying ExecutorService + * has not been shut down. + */ + def isActive = + (executor ne null) && !executor.isShutdown +} diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala new file mode 100644 index 0000000000..089a7639f2 --- /dev/null +++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala @@ -0,0 +1,152 @@ +package scala.actors.scheduler + +import java.lang.Thread.State +import java.util.{Collection, ArrayList} +import forkjoin._ + +/** The ForkJoinScheduler is backed by a lightweight + * fork-join task execution framework. + * + * @author Philipp Haller + */ +class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor { + + private var pool = makeNewPool() + private var terminating = false + private var snapshoting = false + private var drainedTasks: Collection[ForkJoinTask[_]] = null + + private val CHECK_FREQ = 10 + + private def makeNewPool(): DrainableForkJoinPool = { + val p = new DrainableForkJoinPool() + // enable locally FIFO scheduling mode + p.setAsyncMode(true) + Debug.info(this+": parallelism "+p.getParallelism()) + Debug.info(this+": max pool size "+p.getMaximumPoolSize()) + p + } + + /** Starts this scheduler. + */ + def start() { + (new Thread(this)).start() + } + + private def allWorkersBlocked: Boolean = + (pool.workers != null) && + pool.workers.forall(t => { + (t == null) || { + val s = t.getState() + s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING + } + }) + + override def run() { + try { + while (true) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + } + + if (terminating) + throw new QuitException + + if (allTerminated) { + //Debug.info(this+": all actors terminated") + throw new QuitException + } + + if (!snapshoting) { + val poolSize = pool.getPoolSize() + if (allWorkersBlocked && (poolSize < ThreadPoolConfig.maxPoolSize)) { + pool.setParallelism(poolSize + 1) + } + } else if (pool.isQuiescent()) { + val list = new ArrayList[ForkJoinTask[_]] + val num = pool.drainTasksTo(list) + Debug.info(this+": drained "+num+" tasks") + drainedTasks = list + throw new QuitException + } + } + } + } catch { + case _: QuitException => + Debug.info(this+": initiating shutdown...") + while (!pool.isQuiescent()) { + try { + Thread.sleep(10) + } catch { + case ignore: InterruptedException => + } + } + pool.shutdown() + // allow thread to exit + } + } + + def execute(task: Runnable) { + pool.execute(task) + } + + def executeFromActor(task: Runnable) { + val recAction = new RecursiveAction { + def compute() = task.run() + } + recAction.fork() + } + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + override def managedBlock(blocker: ManagedBlocker) { + ForkJoinPool.managedBlock(blocker, true) + } + + /** Shuts down the scheduler. + */ + def shutdown(): Unit = synchronized { + terminating = true + } + + def isActive = + (pool ne null) && !pool.isShutdown() + + /** Suspends the scheduler. All threads that were in use by the + * scheduler and its internal thread pool are terminated. + */ + def snapshot() = synchronized { + snapshoting = true + } + + /** Resumes the execution of the scheduler if it was previously + * suspended using ForkJoinScheduler.snapshot. + */ + def restart() { + synchronized { + if (!snapshoting) + error("snapshot has not been invoked") + else if (isActive) + error("scheduler is still active") + else + snapshoting = false + } + pool = makeNewPool() + val iter = drainedTasks.iterator() + while (iter.hasNext()) { + pool.execute(iter.next()) + } + start() + } + +} diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala new file mode 100644 index 0000000000..ac8c621074 --- /dev/null +++ b/src/actors/scala/actors/scheduler/SchedulerService.scala @@ -0,0 +1,105 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: $ + +package scala.actors.scheduler + +import scala.util.control.ControlException +import java.lang.{Runnable, Thread, InterruptedException} + +/** + * The abstract SchedulerService class allows subclasses + * to implement a custom onShutdown method, which is + * invoked when the runtime system has detected that all actors have + * been terminated. + * + * @version 0.9.18 + * @author Philipp Haller + */ +abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC { + setDaemon(daemon) + + def this() = + this(false) + + private var terminating = false + + def printActorDump {} + + protected val CHECK_FREQ = 100 + + def onLockup(handler: () => Unit) = + lockupHandler = handler + + def onLockup(millis: Int)(handler: () => Unit) = { + //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ + lockupHandler = handler + } + + private var lockupHandler: () => Unit = null + + def onShutdown(): Unit + + override def run() { + try { + while (true) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + } + if (terminating) + throw new QuitException + + gc() + + if (allTerminated) + throw new QuitException + } + } + } catch { + case _: QuitException => + Debug.info(this+": initiating shutdown...") + // invoke shutdown hook + onShutdown() + // allow thread to exit + } + } + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + /** Shuts down the scheduler. + */ + def shutdown(): Unit = synchronized { + terminating = true + } +} + +/** + * The QuitException class is used to manage control flow + * of certain schedulers and worker threads. + * + * @version 0.9.8 + * @author Philipp Haller + */ +private[actors] class QuitException extends Throwable with ControlException { + /* + For efficiency reasons we do not fill in + the execution stack trace. + */ + override def fillInStackTrace(): Throwable = this +} diff --git a/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala b/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala new file mode 100644 index 0000000000..fdcc2cbd4d --- /dev/null +++ b/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala @@ -0,0 +1,42 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors.scheduler + +import java.util.concurrent.ExecutorService + +/** + * The SimpleExecutorScheduler class uses an + * ExecutorService to execute Actors. It + * does not start an additional thread. + * + * A SimpleExecutorScheduler attempts to shut down + * the underlying ExecutorService only if + * terminate is set to true. + * + * Otherwise, the ExecutorService must be shut down either + * directly or by shutting down the + * SimpleExecutorScheduler instance. + * + * @author Philipp Haller + */ +class SimpleExecutorScheduler(protected var executor: ExecutorService, + protected var terminate: Boolean) + extends TerminationService(terminate) with ExecutorScheduler { + + /* This constructor (and the var above) is currently only used to work + * around a bug in scaladoc, which cannot deal with early initializers + * (to be used in subclasses such as DefaultExecutorScheduler) properly. + */ + def this() { + this(null, true) + } + +} diff --git a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala new file mode 100644 index 0000000000..b6de36b069 --- /dev/null +++ b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala @@ -0,0 +1,41 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors.scheduler + +/** + * This scheduler executes the tasks of an actor on a single + * thread (the current thread). + * + * @version 0.9.18 + * @author Philipp Haller + */ +class SingleThreadedScheduler extends IScheduler { + + def execute(task: Runnable) { + task.run() + } + + def executeFromActor(task: Runnable) = + execute(task) + + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + def shutdown() {} + + def newActor(actor: Reactor) {} + def terminated(actor: Reactor) {} + def onTerminate(actor: Reactor)(f: => Unit) {} + + def isActive = true +} diff --git a/src/actors/scala/actors/scheduler/TerminationMonitor.scala b/src/actors/scala/actors/scheduler/TerminationMonitor.scala new file mode 100644 index 0000000000..9437a9a168 --- /dev/null +++ b/src/actors/scala/actors/scheduler/TerminationMonitor.scala @@ -0,0 +1,62 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id:$ + +package scala.actors.scheduler + +import scala.collection.mutable.HashMap + +trait TerminationMonitor { + + private var pendingReactions = 0 + private val termHandlers = new HashMap[Reactor, () => Unit] + private var started = false + + /** newActor is invoked whenever a new actor is started. */ + def newActor(a: Reactor) = synchronized { + pendingReactions += 1 + if (!started) + started = true + } + + /** Registers a closure to be executed when the specified + * actor terminates. + * + * @param a the actor + * @param f the closure to be registered + */ + def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized { + termHandlers += (a -> (() => f)) + } + + def terminated(a: Reactor) = synchronized { + // obtain termination handler (if any) + val todo = synchronized { + termHandlers.get(a) match { + case Some(handler) => + termHandlers -= a + () => handler + case None => + () => { /* do nothing */ } + } + } + + // invoke termination handler (if any) + todo() + + synchronized { + pendingReactions -= 1 + } + } + + protected def allTerminated: Boolean = synchronized { + started && pendingReactions <= 0 + } + +} diff --git a/src/actors/scala/actors/scheduler/TerminationService.scala b/src/actors/scala/actors/scheduler/TerminationService.scala new file mode 100644 index 0000000000..e3ee8971fc --- /dev/null +++ b/src/actors/scala/actors/scheduler/TerminationService.scala @@ -0,0 +1,71 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: $ + +package scala.actors.scheduler + +import java.lang.{Runnable, Thread, InterruptedException} + +/** + * The TerminationService class starts a new thread + * that is used to check regularly if the scheduler can be + * shut down, because all started actors are known to + * have terminated. + * + * @author Philipp Haller + */ +abstract class TerminationService(terminate: Boolean) + extends Thread with IScheduler with TerminationMonitor { + + private var terminating = false + + protected val CHECK_FREQ = 50 + + def onShutdown(): Unit + + override def run() { + try { + while (true) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + } + if (terminating) + throw new QuitException + + if (terminate && allTerminated) + throw new QuitException + } + } + } catch { + case _: QuitException => + Debug.info(this+": initiating shutdown...") + // invoke shutdown hook + onShutdown() + // allow thread to exit + } + } + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + /** Shuts down the scheduler. + */ + def shutdown(): Unit = synchronized { + terminating = true + } +} diff --git a/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala b/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala new file mode 100644 index 0000000000..8a1075a652 --- /dev/null +++ b/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala @@ -0,0 +1,43 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors.scheduler + +/** + * @author Erik Engbrecht + */ +object ThreadPoolConfig { + private val rt = Runtime.getRuntime() + private val minNumThreads = 4 + + private def getIntegerProp(propName: String): Option[Int] = { + try { + val prop = System.getProperty(propName) + Some(Integer.parseInt(prop)) + } catch { + case ace: java.security.AccessControlException => None + case nfe: NumberFormatException => None + } + } + + val corePoolSize = getIntegerProp("actors.corePoolSize") match { + case Some(i) if i > 0 => i + case _ => { + val byCores = rt.availableProcessors() * 2 + if (byCores > minNumThreads) byCores else minNumThreads + } + } + + val maxPoolSize = getIntegerProp("actors.maxPoolSize") match { + case Some(i) if (i >= corePoolSize) => i + case Some(i) if (i < corePoolSize) => corePoolSize + case _ => 256 + } +} diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala new file mode 100644 index 0000000000..568a045631 --- /dev/null +++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala @@ -0,0 +1,98 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors.scheduler + +import java.util.concurrent.ThreadPoolExecutor + +/** + * The ThreadPoolScheduler class uses an + * ThreadPoolExecutor to execute Actors. + * + * A ThreadPoolScheduler attempts to shut down + * the underlying ExecutorService only if + * terminate is set to true. + * + * Otherwise, the ExecutorService must be shut down either + * directly or by shutting down the + * ThreadPoolScheduler instance. + * + * @author Philipp Haller + */ +class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, + protected var terminate: Boolean) + extends Thread with TerminationMonitor with ExecutorScheduler { + + private var terminating = false + protected val CHECK_FREQ = 10 + + /* This constructor (and the var above) is currently only used to work + * around a bug in scaladoc, which cannot deal with early initializers + * (to be used in subclasses such as DefaultExecutorScheduler) properly. + */ + def this() { + this(null, true) + } + + override def managedBlock(blocker: ManagedBlocker) { + val coreSize = executor.getCorePoolSize() + if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { + executor.setCorePoolSize(coreSize + 1) + } + blocker.block() + } + + override def run() { + try { + while (true) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + } + + if (terminating) + throw new QuitException + + if (terminate && allTerminated) + throw new QuitException + + val coreSize = executor.getCorePoolSize() + if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { + executor.setCorePoolSize(coreSize + 1) + } + } + } + } catch { + case _: QuitException => + Debug.info(this+": initiating shutdown...") + // invoke shutdown hook + onShutdown() + // allow thread to exit + } + } + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + /** Shuts down the scheduler. + */ + def shutdown(): Unit = synchronized { + terminating = true + } + +} -- cgit v1.2.3