diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-05-24 19:30:39 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-05-24 19:30:39 +0000 |
commit | 67ab4b8ece5e83575c63a58dd668ff662b5dc7eb (patch) | |
tree | c5c9e21cc3ff019090a154ca2d2212ac986a96af /src | |
parent | a1f098795934a4ccc8a3e72b779e47b911eae0f4 (diff) | |
download | scala-67ab4b8ece5e83575c63a58dd668ff662b5dc7eb.tar.gz scala-67ab4b8ece5e83575c63a58dd668ff662b5dc7eb.tar.bz2 scala-67ab4b8ece5e83575c63a58dd668ff662b5dc7eb.zip |
Implemented #2012.
Diffstat (limited to 'src')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 9 | ||||
-rw-r--r-- | src/actors/scala/actors/ActorGC.scala | 6 | ||||
-rw-r--r-- | src/actors/scala/actors/DefaultExecutorScheduler.scala | 41 | ||||
-rw-r--r-- | src/actors/scala/actors/DelegatingScheduler.scala | 48 | ||||
-rw-r--r-- | src/actors/scala/actors/ExecutorScheduler.scala | 5 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 53 | ||||
-rw-r--r-- | src/actors/scala/actors/IScheduler.scala | 66 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 140 | ||||
-rw-r--r-- | src/actors/scala/actors/SchedulerAdapter.scala | 12 | ||||
-rw-r--r-- | src/actors/scala/actors/SchedulerService.scala | 26 | ||||
-rw-r--r-- | src/actors/scala/actors/SingleThreadedScheduler.scala | 38 | ||||
-rw-r--r-- | src/actors/scala/actors/ThreadPoolConfig.scala | 43 |
12 files changed, 246 insertions, 241 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 63d36eb7f8..b9e36a574e 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -875,7 +875,7 @@ trait Actor extends AbstractActor { shouldExit = false scheduler execute { - scheduler.actorGC.newActor(Actor.this) + scheduler.newActor(Actor.this) (new Reaction(Actor.this)).run() } @@ -1020,11 +1020,14 @@ trait Actor extends AbstractActor { } private[actors] def terminated() { - scheduler.actorGC.terminated(this) + scheduler.terminated(this) } + /* Requires qualified private, because <code>RemoteActor</code> must + * register termination handler. + */ private[actors] def onTerminate(f: => Unit) { - scheduler.actorGC.onTerminate(this) { f } + scheduler.onTerminate(this) { f } } } diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala index 2e36d4a14a..5906b96921 100644 --- a/src/actors/scala/actors/ActorGC.scala +++ b/src/actors/scala/actors/ActorGC.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.{HashMap, HashSet} * (e.g. act method finishes, exit explicitly called, an exception is thrown), * the ActorGC is informed via the <code>terminated</code> method. */ -class ActorGC { +trait ActorGC extends IScheduler { private var pendingReactions = 0 private val termHandlers = new HashMap[Actor, () => Unit] @@ -70,13 +70,13 @@ class ActorGC { pendingReactions <= 0 } - private[actors] def onTerminate(a: Actor)(f: => Unit) = synchronized { + def onTerminate(a: Actor)(f: => Unit) = synchronized { termHandlers += (a -> (() => f)) } /* Called only from <code>Reaction</code>. */ - private[actors] def terminated(a: Actor) = synchronized { + def terminated(a: Actor) = synchronized { // execute registered termination handler (if any) termHandlers.get(a) match { case Some(handler) => diff --git a/src/actors/scala/actors/DefaultExecutorScheduler.scala b/src/actors/scala/actors/DefaultExecutorScheduler.scala index 71e405d975..bfd0a09890 100644 --- a/src/actors/scala/actors/DefaultExecutorScheduler.scala +++ b/src/actors/scala/actors/DefaultExecutorScheduler.scala @@ -25,47 +25,10 @@ import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue} */ class DefaultExecutorScheduler extends ExecutorScheduler { - private val rt = Runtime.getRuntime() - private val minNumThreads = 4 - - /** The value of the actors.corePoolSize JVM property. This property - * determines the initial thread pool size. - */ - private val coreProp = try { - System.getProperty("actors.corePoolSize") - } catch { - case ace: java.security.AccessControlException => - null - } - - private val maxProp = - try { - System.getProperty("actors.maxPoolSize") - } catch { - case ace: java.security.AccessControlException => - null - } - - private val initCoreSize = - if (null ne coreProp) Integer.parseInt(coreProp) - else { - val numCores = rt.availableProcessors() - if (2 * numCores > minNumThreads) - 2 * numCores - else - minNumThreads - } - - private val maxSize = - if (null ne maxProp) Integer.parseInt(maxProp) - else 256 - - private val coreSize = initCoreSize - private val workQueue = new LinkedBlockingQueue[Runnable] - private val threadPool = new ThreadPoolExecutor(coreSize, - maxSize, + private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, + ThreadPoolConfig.maxPoolSize, 50L, TimeUnit.MILLISECONDS, workQueue) diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala new file mode 100644 index 0000000000..dfb55957bb --- /dev/null +++ b/src/actors/scala/actors/DelegatingScheduler.scala @@ -0,0 +1,48 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.actors + +trait DelegatingScheduler extends IScheduler { + protected def makeNewScheduler(): IScheduler + + protected var sched: IScheduler = null + + final def impl = synchronized { + if ((sched eq null) || (!sched.isActive)) + sched = makeNewScheduler() + sched + } + + final def impl_= (scheduler: IScheduler): Unit = synchronized { + //TODO: if there is already a scheduler, should it be shutdown? + sched = scheduler + } + + /** + * Always active because it will just make a new scheduler if required + */ + def isActive: Boolean = true + + def execute(fun: => Unit) = impl.execute(fun) + + def execute(task: Runnable) = impl.execute(task) + + def shutdown(): Unit = synchronized { + if (sched ne null) { + sched.shutdown() + sched = null + } + } + + def newActor(actor: Actor) = impl.newActor(actor) + + def terminated(actor: Actor) = impl.terminated(actor) + + def onTerminate(actor: Actor)(f: => Unit) = impl.onTerminate(actor)(f) +} diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala index 4b4204543d..72d38d1b2f 100644 --- a/src/actors/scala/actors/ExecutorScheduler.scala +++ b/src/actors/scala/actors/ExecutorScheduler.scala @@ -18,7 +18,7 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException} * * @author Philipp Haller */ -class ExecutorScheduler(var executor: ExecutorService) extends SchedulerService { +class ExecutorScheduler(protected var executor: ExecutorService) extends SchedulerService { /* This constructor (and the var above) is currently only used to work * around a bug in scaladoc, which cannot deal with early initializers @@ -46,4 +46,7 @@ class ExecutorScheduler(var executor: ExecutorService) extends SchedulerService */ def onShutdown(): Unit = executor.shutdown() + + def isActive = + !executor.isShutdown } diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 5832b78dd9..3f336c4536 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -24,60 +24,23 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has * @version 0.9.18 * @author Philipp Haller */ -class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler { +class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Thread with ActorGC { setDaemon(daemon) /** Default constructor creates a non-daemon thread. */ def this() = - this(false) + this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, false) - var printStats = false - - val rt = Runtime.getRuntime() - val minNumThreads = 4 - - /** The value of the actors.corePoolSize JVM property. This property - * determines the initial thread pool size. - */ - val coreProp = try { - System.getProperty("actors.corePoolSize") - } catch { - case ace: java.security.AccessControlException => - null - } - val maxProp = - try { - System.getProperty("actors.maxPoolSize") - } catch { - case ace: java.security.AccessControlException => - null - } - - val initCoreSize = - if (null ne coreProp) Integer.parseInt(coreProp) - else { - val numCores = rt.availableProcessors() - if (2 * numCores > minNumThreads) - 2 * numCores - else - minNumThreads - } + def this(daemon: Boolean) = + this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, daemon) - val maxSize = - if (null ne maxProp) Integer.parseInt(maxProp) - else 256 + var printStats = false private var coreSize = initCoreSize private val executor = new FJTaskRunnerGroup(coreSize) - /** The <code>ActorGC</code> instance that keeps track of the - * live actor objects that are managed by <code>this</code> - * scheduler. - */ - val actorGC = new ActorGC - private var terminating = false private var suspending = false @@ -116,7 +79,7 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler { if (!suspending) { - actorGC.gc() + gc() // check if we need more threads if (coreSize < maxSize @@ -126,7 +89,7 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler { coreSize += 1 } else { - if (actorGC.allTerminated) { + if (allTerminated) { // if all worker threads idle terminate if (executor.getActiveCount() == 0) { Debug.info(this+": initiating shutdown...") @@ -172,4 +135,6 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler { executor.snapshot() } + def isActive = !terminating && !suspending + } diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala new file mode 100644 index 0000000000..dc2e6961fa --- /dev/null +++ b/src/actors/scala/actors/IScheduler.scala @@ -0,0 +1,66 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +/** + * The <code>IScheduler</code> trait provides a common interface + * for all schedulers used to execute actor tasks. + * + * Subclasses of <code>Actor</code> that override its + * <code>scheduler</code> member must provide + * an implementation of the <code>IScheduler</code> + * trait. + * + * @author Philipp Haller + */ +trait IScheduler { + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + def execute(fun: => Unit): Unit + + /** Submits a <code>Runnable</code> for execution. + * + * @param task the task to be executed + */ + def execute(task: Runnable): Unit + + /** Shuts down the scheduler. + */ + def shutdown(): Unit + + /** When the scheduler is active, it can execute tasks. + */ + def isActive: Boolean + + /** Registers a newly created actor with this scheduler. + * + * @param a the actor to be registered + */ + def newActor(a: Actor): Unit + + /** Unregisters an actor from this scheduler, because it + * has terminated. + * + * @param a the actor to be registered + */ + def terminated(a: Actor): Unit + + /** Registers a closure to be executed when the specified + * actor terminates. + * + * @param a the actor + * @param f the closure to be registered + */ + def onTerminate(a: Actor)(f: => Unit): Unit +} diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 44371879a3..50b8ecbbea 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -20,35 +20,29 @@ import java.lang.Runnable * @version 0.9.18 * @author Philipp Haller */ -object Scheduler extends IScheduler { +object Scheduler extends DelegatingScheduler { Debug.info("initializing "+this+"...") - private var sched: IScheduler = { - val s = new DefaultExecutorScheduler - s.start() - s - } - - def impl = sched - def impl_= (scheduler: IScheduler) = { - sched = scheduler + def makeNewScheduler: IScheduler = { + val sched = new DefaultExecutorScheduler + sched.start() + sched } private var tasks: LinkedQueue = null - private var pendingCount = 0 /* Assumes <code>sched</code> holds an instance * of <code>FJTaskScheduler2</code>. */ - def snapshot(): Unit = + def snapshot(): Unit = synchronized { if (sched.isInstanceOf[FJTaskScheduler2]) { val fjts = sched.asInstanceOf[FJTaskScheduler2] tasks = fjts.snapshot() - pendingCount = actorGC.getPendingCount fjts.shutdown() } else error("snapshot operation not supported.") + } /** Shuts down the current scheduler and creates and starts a new scheduler. * @@ -62,13 +56,13 @@ object Scheduler extends IScheduler { */ def restart(): Unit = synchronized { // 1. shut down current scheduler - sched.shutdown() + if (sched ne null) + sched.shutdown() // 2. create and start new scheduler - if (sched.isInstanceOf[FJTaskScheduler2]) { + if ((sched ne null) && sched.isInstanceOf[FJTaskScheduler2]) { sched = { val s = new FJTaskScheduler2 - actorGC.setPendingCount(pendingCount) s.start() s } @@ -87,118 +81,4 @@ object Scheduler extends IScheduler { } } - def execute(task: Runnable) { - sched execute task - } - - def execute(fun: => Unit) { - sched execute { fun } - } - - def shutdown() = sched.shutdown() - - /** The <code>ActorGC</code> instance that keeps track of the - * live actor objects that are managed by <code>this</code> - * scheduler. - */ - def actorGC: ActorGC = sched.actorGC - - def onLockup(handler: () => Unit) = sched.onLockup(handler) - def onLockup(millis: Int)(handler: () => Unit) = sched.onLockup(millis)(handler) - def printActorDump = sched.printActorDump -} - - -/** - * The <code>IScheduler</code> trait provides a common interface - * for all schedulers used to execute actor tasks. - * - * Subclasses of <code>Actor</code> that override its - * <code>scheduler</code> member must provide - * an implementation of the <code>IScheduler</code> - * trait. - * - * @version 0.9.18 - * @author Philipp Haller - */ -trait IScheduler { - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit - - /** Submits a <code>Runnable</code> for execution. - * - * @param task the task to be executed - */ - def execute(task: Runnable): Unit - - /** Shuts down the scheduler. - */ - def shutdown(): Unit - - /** The <code>ActorGC</code> instance that keeps track of the - * live actor objects that are managed by <code>this</code> - * <code>IScheduler</code> instance. - */ - def actorGC: ActorGC - - def onLockup(handler: () => Unit): Unit - def onLockup(millis: Int)(handler: () => Unit): Unit - def printActorDump: Unit - - val QUIT_TASK = new Reaction(null) { - override def run(): Unit = {} - override def toString() = "QUIT_TASK" - } -} - - -/** - * This scheduler executes the tasks of an actor on a single - * thread (the current thread). - * - * @version 0.9.18 - * @author Philipp Haller - */ -class SingleThreadedScheduler extends IScheduler { - - def execute(task: Runnable) { - task.run() - } - - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - def shutdown() {} - - /** The <code>ActorGC</code> instance that keeps track of the - * live actor objects that are managed by <code>this</code> - * scheduler. - */ - val actorGC: ActorGC = new ActorGC - - def onLockup(handler: () => Unit) {} - def onLockup(millis: Int)(handler: () => Unit) {} - def printActorDump {} -} - - -/** - * The <code>QuitException</code> class is used to manage control flow - * of certain schedulers and worker threads. - * - * @version 0.9.8 - * @author Philipp Haller - */ -private[actors] class QuitException extends Throwable { - /* - For efficiency reasons we do not fill in - the execution stack trace. - */ - override def fillInStackTrace(): Throwable = this } diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index 76133af6b1..c56f3b0f9c 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -32,16 +32,4 @@ trait SchedulerAdapter extends IScheduler { def shutdown(): Unit = Scheduler.shutdown() - /** The <code>ActorGC</code> instance that keeps track of the - * live actor objects that are managed by <code>this</code> - * scheduler. - */ - val actorGC: ActorGC = new ActorGC - - def onLockup(handler: () => Unit) {} - - def onLockup(millis: Int)(handler: () => Unit) {} - - def printActorDump {} - } diff --git a/src/actors/scala/actors/SchedulerService.scala b/src/actors/scala/actors/SchedulerService.scala index 5c177ba233..1826a153da 100644 --- a/src/actors/scala/actors/SchedulerService.scala +++ b/src/actors/scala/actors/SchedulerService.scala @@ -21,18 +21,12 @@ import java.lang.{Runnable, Thread, InterruptedException} * @version 0.9.18 * @author Philipp Haller */ -abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler { +abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC { setDaemon(daemon) def this() = this(false) - /** The <code>ActorGC</code> instance that keeps track of the - * live actor objects that are managed by <code>this</code> - * scheduler. - */ - val actorGC = new ActorGC - private var terminating = false def printActorDump {} @@ -63,9 +57,9 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler if (terminating) throw new QuitException - actorGC.gc() + gc() - if (actorGC.allTerminated) + if (allTerminated) throw new QuitException } } @@ -92,5 +86,19 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler def shutdown(): Unit = synchronized { terminating = true } +} +/** + * The <code>QuitException</code> class is used to manage control flow + * of certain schedulers and worker threads. + * + * @version 0.9.8 + * @author Philipp Haller + */ +private[actors] class QuitException extends Throwable { + /* + For efficiency reasons we do not fill in + the execution stack trace. + */ + override def fillInStackTrace(): Throwable = this } diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala new file mode 100644 index 0000000000..fa41d02736 --- /dev/null +++ b/src/actors/scala/actors/SingleThreadedScheduler.scala @@ -0,0 +1,38 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +/** + * This scheduler executes the tasks of an actor on a single + * thread (the current thread). + * + * @version 0.9.18 + * @author Philipp Haller + */ +class SingleThreadedScheduler extends IScheduler { + + def execute(task: Runnable) { + task.run() + } + + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) + + def shutdown() {} + + def newActor(actor: Actor) {} + def terminated(actor: Actor) {} + def onTerminate(actor: Actor)(f: => Unit) {} + + def isActive = true +} diff --git a/src/actors/scala/actors/ThreadPoolConfig.scala b/src/actors/scala/actors/ThreadPoolConfig.scala new file mode 100644 index 0000000000..502ad4e13b --- /dev/null +++ b/src/actors/scala/actors/ThreadPoolConfig.scala @@ -0,0 +1,43 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +/** + * @author Erik Engbrecht + */ +object ThreadPoolConfig { + private val rt = Runtime.getRuntime() + private val minNumThreads = 4 + + private def getIntegerProp(propName: String): Option[Int] = { + try { + val prop = System.getProperty(propName) + Some(Integer.parseInt(prop)) + } catch { + case ace: java.security.AccessControlException => None + case nfe: NumberFormatException => None + } + } + + val corePoolSize = getIntegerProp("actors.corePoolSize") match { + case Some(i) if i > 0 => i + case _ => { + val byCores = rt.availableProcessors() * 2 + if (byCores > minNumThreads) byCores else minNumThreads + } + } + + val maxPoolSize = getIntegerProp("actors.maxPoolSize") match { + case Some(i) if (i >= corePoolSize) => i + case Some(i) if (i < corePoolSize) => corePoolSize + case _ => 256 + } +} |