summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-24 19:30:39 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-24 19:30:39 +0000
commit67ab4b8ece5e83575c63a58dd668ff662b5dc7eb (patch)
treec5c9e21cc3ff019090a154ca2d2212ac986a96af /src
parenta1f098795934a4ccc8a3e72b779e47b911eae0f4 (diff)
downloadscala-67ab4b8ece5e83575c63a58dd668ff662b5dc7eb.tar.gz
scala-67ab4b8ece5e83575c63a58dd668ff662b5dc7eb.tar.bz2
scala-67ab4b8ece5e83575c63a58dd668ff662b5dc7eb.zip
Implemented #2012.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Actor.scala9
-rw-r--r--src/actors/scala/actors/ActorGC.scala6
-rw-r--r--src/actors/scala/actors/DefaultExecutorScheduler.scala41
-rw-r--r--src/actors/scala/actors/DelegatingScheduler.scala48
-rw-r--r--src/actors/scala/actors/ExecutorScheduler.scala5
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala53
-rw-r--r--src/actors/scala/actors/IScheduler.scala66
-rw-r--r--src/actors/scala/actors/Scheduler.scala140
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala12
-rw-r--r--src/actors/scala/actors/SchedulerService.scala26
-rw-r--r--src/actors/scala/actors/SingleThreadedScheduler.scala38
-rw-r--r--src/actors/scala/actors/ThreadPoolConfig.scala43
12 files changed, 246 insertions, 241 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 63d36eb7f8..b9e36a574e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -875,7 +875,7 @@ trait Actor extends AbstractActor {
shouldExit = false
scheduler execute {
- scheduler.actorGC.newActor(Actor.this)
+ scheduler.newActor(Actor.this)
(new Reaction(Actor.this)).run()
}
@@ -1020,11 +1020,14 @@ trait Actor extends AbstractActor {
}
private[actors] def terminated() {
- scheduler.actorGC.terminated(this)
+ scheduler.terminated(this)
}
+ /* Requires qualified private, because <code>RemoteActor</code> must
+ * register termination handler.
+ */
private[actors] def onTerminate(f: => Unit) {
- scheduler.actorGC.onTerminate(this) { f }
+ scheduler.onTerminate(this) { f }
}
}
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index 2e36d4a14a..5906b96921 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.{HashMap, HashSet}
* (e.g. act method finishes, exit explicitly called, an exception is thrown),
* the ActorGC is informed via the <code>terminated</code> method.
*/
-class ActorGC {
+trait ActorGC extends IScheduler {
private var pendingReactions = 0
private val termHandlers = new HashMap[Actor, () => Unit]
@@ -70,13 +70,13 @@ class ActorGC {
pendingReactions <= 0
}
- private[actors] def onTerminate(a: Actor)(f: => Unit) = synchronized {
+ def onTerminate(a: Actor)(f: => Unit) = synchronized {
termHandlers += (a -> (() => f))
}
/* Called only from <code>Reaction</code>.
*/
- private[actors] def terminated(a: Actor) = synchronized {
+ def terminated(a: Actor) = synchronized {
// execute registered termination handler (if any)
termHandlers.get(a) match {
case Some(handler) =>
diff --git a/src/actors/scala/actors/DefaultExecutorScheduler.scala b/src/actors/scala/actors/DefaultExecutorScheduler.scala
index 71e405d975..bfd0a09890 100644
--- a/src/actors/scala/actors/DefaultExecutorScheduler.scala
+++ b/src/actors/scala/actors/DefaultExecutorScheduler.scala
@@ -25,47 +25,10 @@ import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue}
*/
class DefaultExecutorScheduler extends ExecutorScheduler {
- private val rt = Runtime.getRuntime()
- private val minNumThreads = 4
-
- /** The value of the actors.corePoolSize JVM property. This property
- * determines the initial thread pool size.
- */
- private val coreProp = try {
- System.getProperty("actors.corePoolSize")
- } catch {
- case ace: java.security.AccessControlException =>
- null
- }
-
- private val maxProp =
- try {
- System.getProperty("actors.maxPoolSize")
- } catch {
- case ace: java.security.AccessControlException =>
- null
- }
-
- private val initCoreSize =
- if (null ne coreProp) Integer.parseInt(coreProp)
- else {
- val numCores = rt.availableProcessors()
- if (2 * numCores > minNumThreads)
- 2 * numCores
- else
- minNumThreads
- }
-
- private val maxSize =
- if (null ne maxProp) Integer.parseInt(maxProp)
- else 256
-
- private val coreSize = initCoreSize
-
private val workQueue = new LinkedBlockingQueue[Runnable]
- private val threadPool = new ThreadPoolExecutor(coreSize,
- maxSize,
+ private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
+ ThreadPoolConfig.maxPoolSize,
50L,
TimeUnit.MILLISECONDS,
workQueue)
diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala
new file mode 100644
index 0000000000..dfb55957bb
--- /dev/null
+++ b/src/actors/scala/actors/DelegatingScheduler.scala
@@ -0,0 +1,48 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.actors
+
+trait DelegatingScheduler extends IScheduler {
+ protected def makeNewScheduler(): IScheduler
+
+ protected var sched: IScheduler = null
+
+ final def impl = synchronized {
+ if ((sched eq null) || (!sched.isActive))
+ sched = makeNewScheduler()
+ sched
+ }
+
+ final def impl_= (scheduler: IScheduler): Unit = synchronized {
+ //TODO: if there is already a scheduler, should it be shutdown?
+ sched = scheduler
+ }
+
+ /**
+ * Always active because it will just make a new scheduler if required
+ */
+ def isActive: Boolean = true
+
+ def execute(fun: => Unit) = impl.execute(fun)
+
+ def execute(task: Runnable) = impl.execute(task)
+
+ def shutdown(): Unit = synchronized {
+ if (sched ne null) {
+ sched.shutdown()
+ sched = null
+ }
+ }
+
+ def newActor(actor: Actor) = impl.newActor(actor)
+
+ def terminated(actor: Actor) = impl.terminated(actor)
+
+ def onTerminate(actor: Actor)(f: => Unit) = impl.onTerminate(actor)(f)
+}
diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala
index 4b4204543d..72d38d1b2f 100644
--- a/src/actors/scala/actors/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/ExecutorScheduler.scala
@@ -18,7 +18,7 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException}
*
* @author Philipp Haller
*/
-class ExecutorScheduler(var executor: ExecutorService) extends SchedulerService {
+class ExecutorScheduler(protected var executor: ExecutorService) extends SchedulerService {
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers
@@ -46,4 +46,7 @@ class ExecutorScheduler(var executor: ExecutorService) extends SchedulerService
*/
def onShutdown(): Unit =
executor.shutdown()
+
+ def isActive =
+ !executor.isShutdown
}
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 5832b78dd9..3f336c4536 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -24,60 +24,23 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* @version 0.9.18
* @author Philipp Haller
*/
-class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
+class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Thread with ActorGC {
setDaemon(daemon)
/** Default constructor creates a non-daemon thread. */
def this() =
- this(false)
+ this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, false)
- var printStats = false
-
- val rt = Runtime.getRuntime()
- val minNumThreads = 4
-
- /** The value of the actors.corePoolSize JVM property. This property
- * determines the initial thread pool size.
- */
- val coreProp = try {
- System.getProperty("actors.corePoolSize")
- } catch {
- case ace: java.security.AccessControlException =>
- null
- }
- val maxProp =
- try {
- System.getProperty("actors.maxPoolSize")
- } catch {
- case ace: java.security.AccessControlException =>
- null
- }
-
- val initCoreSize =
- if (null ne coreProp) Integer.parseInt(coreProp)
- else {
- val numCores = rt.availableProcessors()
- if (2 * numCores > minNumThreads)
- 2 * numCores
- else
- minNumThreads
- }
+ def this(daemon: Boolean) =
+ this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, daemon)
- val maxSize =
- if (null ne maxProp) Integer.parseInt(maxProp)
- else 256
+ var printStats = false
private var coreSize = initCoreSize
private val executor =
new FJTaskRunnerGroup(coreSize)
- /** The <code>ActorGC</code> instance that keeps track of the
- * live actor objects that are managed by <code>this</code>
- * scheduler.
- */
- val actorGC = new ActorGC
-
private var terminating = false
private var suspending = false
@@ -116,7 +79,7 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
if (!suspending) {
- actorGC.gc()
+ gc()
// check if we need more threads
if (coreSize < maxSize
@@ -126,7 +89,7 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
coreSize += 1
}
else {
- if (actorGC.allTerminated) {
+ if (allTerminated) {
// if all worker threads idle terminate
if (executor.getActiveCount() == 0) {
Debug.info(this+": initiating shutdown...")
@@ -172,4 +135,6 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
executor.snapshot()
}
+ def isActive = !terminating && !suspending
+
}
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
new file mode 100644
index 0000000000..dc2e6961fa
--- /dev/null
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -0,0 +1,66 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+/**
+ * The <code>IScheduler</code> trait provides a common interface
+ * for all schedulers used to execute actor tasks.
+ *
+ * Subclasses of <code>Actor</code> that override its
+ * <code>scheduler</code> member must provide
+ * an implementation of the <code>IScheduler</code>
+ * trait.
+ *
+ * @author Philipp Haller
+ */
+trait IScheduler {
+
+ /** Submits a closure for execution.
+ *
+ * @param fun the closure to be executed
+ */
+ def execute(fun: => Unit): Unit
+
+ /** Submits a <code>Runnable</code> for execution.
+ *
+ * @param task the task to be executed
+ */
+ def execute(task: Runnable): Unit
+
+ /** Shuts down the scheduler.
+ */
+ def shutdown(): Unit
+
+ /** When the scheduler is active, it can execute tasks.
+ */
+ def isActive: Boolean
+
+ /** Registers a newly created actor with this scheduler.
+ *
+ * @param a the actor to be registered
+ */
+ def newActor(a: Actor): Unit
+
+ /** Unregisters an actor from this scheduler, because it
+ * has terminated.
+ *
+ * @param a the actor to be registered
+ */
+ def terminated(a: Actor): Unit
+
+ /** Registers a closure to be executed when the specified
+ * actor terminates.
+ *
+ * @param a the actor
+ * @param f the closure to be registered
+ */
+ def onTerminate(a: Actor)(f: => Unit): Unit
+}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 44371879a3..50b8ecbbea 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -20,35 +20,29 @@ import java.lang.Runnable
* @version 0.9.18
* @author Philipp Haller
*/
-object Scheduler extends IScheduler {
+object Scheduler extends DelegatingScheduler {
Debug.info("initializing "+this+"...")
- private var sched: IScheduler = {
- val s = new DefaultExecutorScheduler
- s.start()
- s
- }
-
- def impl = sched
- def impl_= (scheduler: IScheduler) = {
- sched = scheduler
+ def makeNewScheduler: IScheduler = {
+ val sched = new DefaultExecutorScheduler
+ sched.start()
+ sched
}
private var tasks: LinkedQueue = null
- private var pendingCount = 0
/* Assumes <code>sched</code> holds an instance
* of <code>FJTaskScheduler2</code>.
*/
- def snapshot(): Unit =
+ def snapshot(): Unit = synchronized {
if (sched.isInstanceOf[FJTaskScheduler2]) {
val fjts = sched.asInstanceOf[FJTaskScheduler2]
tasks = fjts.snapshot()
- pendingCount = actorGC.getPendingCount
fjts.shutdown()
} else
error("snapshot operation not supported.")
+ }
/** Shuts down the current scheduler and creates and starts a new scheduler.
*
@@ -62,13 +56,13 @@ object Scheduler extends IScheduler {
*/
def restart(): Unit = synchronized {
// 1. shut down current scheduler
- sched.shutdown()
+ if (sched ne null)
+ sched.shutdown()
// 2. create and start new scheduler
- if (sched.isInstanceOf[FJTaskScheduler2]) {
+ if ((sched ne null) && sched.isInstanceOf[FJTaskScheduler2]) {
sched = {
val s = new FJTaskScheduler2
- actorGC.setPendingCount(pendingCount)
s.start()
s
}
@@ -87,118 +81,4 @@ object Scheduler extends IScheduler {
}
}
- def execute(task: Runnable) {
- sched execute task
- }
-
- def execute(fun: => Unit) {
- sched execute { fun }
- }
-
- def shutdown() = sched.shutdown()
-
- /** The <code>ActorGC</code> instance that keeps track of the
- * live actor objects that are managed by <code>this</code>
- * scheduler.
- */
- def actorGC: ActorGC = sched.actorGC
-
- def onLockup(handler: () => Unit) = sched.onLockup(handler)
- def onLockup(millis: Int)(handler: () => Unit) = sched.onLockup(millis)(handler)
- def printActorDump = sched.printActorDump
-}
-
-
-/**
- * The <code>IScheduler</code> trait provides a common interface
- * for all schedulers used to execute actor tasks.
- *
- * Subclasses of <code>Actor</code> that override its
- * <code>scheduler</code> member must provide
- * an implementation of the <code>IScheduler</code>
- * trait.
- *
- * @version 0.9.18
- * @author Philipp Haller
- */
-trait IScheduler {
-
- /** Submits a closure for execution.
- *
- * @param fun the closure to be executed
- */
- def execute(fun: => Unit): Unit
-
- /** Submits a <code>Runnable</code> for execution.
- *
- * @param task the task to be executed
- */
- def execute(task: Runnable): Unit
-
- /** Shuts down the scheduler.
- */
- def shutdown(): Unit
-
- /** The <code>ActorGC</code> instance that keeps track of the
- * live actor objects that are managed by <code>this</code>
- * <code>IScheduler</code> instance.
- */
- def actorGC: ActorGC
-
- def onLockup(handler: () => Unit): Unit
- def onLockup(millis: Int)(handler: () => Unit): Unit
- def printActorDump: Unit
-
- val QUIT_TASK = new Reaction(null) {
- override def run(): Unit = {}
- override def toString() = "QUIT_TASK"
- }
-}
-
-
-/**
- * This scheduler executes the tasks of an actor on a single
- * thread (the current thread).
- *
- * @version 0.9.18
- * @author Philipp Haller
- */
-class SingleThreadedScheduler extends IScheduler {
-
- def execute(task: Runnable) {
- task.run()
- }
-
- def execute(fun: => Unit): Unit =
- execute(new Runnable {
- def run() { fun }
- })
-
- def shutdown() {}
-
- /** The <code>ActorGC</code> instance that keeps track of the
- * live actor objects that are managed by <code>this</code>
- * scheduler.
- */
- val actorGC: ActorGC = new ActorGC
-
- def onLockup(handler: () => Unit) {}
- def onLockup(millis: Int)(handler: () => Unit) {}
- def printActorDump {}
-}
-
-
-/**
- * The <code>QuitException</code> class is used to manage control flow
- * of certain schedulers and worker threads.
- *
- * @version 0.9.8
- * @author Philipp Haller
- */
-private[actors] class QuitException extends Throwable {
- /*
- For efficiency reasons we do not fill in
- the execution stack trace.
- */
- override def fillInStackTrace(): Throwable = this
}
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index 76133af6b1..c56f3b0f9c 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -32,16 +32,4 @@ trait SchedulerAdapter extends IScheduler {
def shutdown(): Unit =
Scheduler.shutdown()
- /** The <code>ActorGC</code> instance that keeps track of the
- * live actor objects that are managed by <code>this</code>
- * scheduler.
- */
- val actorGC: ActorGC = new ActorGC
-
- def onLockup(handler: () => Unit) {}
-
- def onLockup(millis: Int)(handler: () => Unit) {}
-
- def printActorDump {}
-
}
diff --git a/src/actors/scala/actors/SchedulerService.scala b/src/actors/scala/actors/SchedulerService.scala
index 5c177ba233..1826a153da 100644
--- a/src/actors/scala/actors/SchedulerService.scala
+++ b/src/actors/scala/actors/SchedulerService.scala
@@ -21,18 +21,12 @@ import java.lang.{Runnable, Thread, InterruptedException}
* @version 0.9.18
* @author Philipp Haller
*/
-abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler {
+abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC {
setDaemon(daemon)
def this() =
this(false)
- /** The <code>ActorGC</code> instance that keeps track of the
- * live actor objects that are managed by <code>this</code>
- * scheduler.
- */
- val actorGC = new ActorGC
-
private var terminating = false
def printActorDump {}
@@ -63,9 +57,9 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler
if (terminating)
throw new QuitException
- actorGC.gc()
+ gc()
- if (actorGC.allTerminated)
+ if (allTerminated)
throw new QuitException
}
}
@@ -92,5 +86,19 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler
def shutdown(): Unit = synchronized {
terminating = true
}
+}
+/**
+ * The <code>QuitException</code> class is used to manage control flow
+ * of certain schedulers and worker threads.
+ *
+ * @version 0.9.8
+ * @author Philipp Haller
+ */
+private[actors] class QuitException extends Throwable {
+ /*
+ For efficiency reasons we do not fill in
+ the execution stack trace.
+ */
+ override def fillInStackTrace(): Throwable = this
}
diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala
new file mode 100644
index 0000000000..fa41d02736
--- /dev/null
+++ b/src/actors/scala/actors/SingleThreadedScheduler.scala
@@ -0,0 +1,38 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+/**
+ * This scheduler executes the tasks of an actor on a single
+ * thread (the current thread).
+ *
+ * @version 0.9.18
+ * @author Philipp Haller
+ */
+class SingleThreadedScheduler extends IScheduler {
+
+ def execute(task: Runnable) {
+ task.run()
+ }
+
+ def execute(fun: => Unit): Unit =
+ execute(new Runnable {
+ def run() { fun }
+ })
+
+ def shutdown() {}
+
+ def newActor(actor: Actor) {}
+ def terminated(actor: Actor) {}
+ def onTerminate(actor: Actor)(f: => Unit) {}
+
+ def isActive = true
+}
diff --git a/src/actors/scala/actors/ThreadPoolConfig.scala b/src/actors/scala/actors/ThreadPoolConfig.scala
new file mode 100644
index 0000000000..502ad4e13b
--- /dev/null
+++ b/src/actors/scala/actors/ThreadPoolConfig.scala
@@ -0,0 +1,43 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+/**
+ * @author Erik Engbrecht
+ */
+object ThreadPoolConfig {
+ private val rt = Runtime.getRuntime()
+ private val minNumThreads = 4
+
+ private def getIntegerProp(propName: String): Option[Int] = {
+ try {
+ val prop = System.getProperty(propName)
+ Some(Integer.parseInt(prop))
+ } catch {
+ case ace: java.security.AccessControlException => None
+ case nfe: NumberFormatException => None
+ }
+ }
+
+ val corePoolSize = getIntegerProp("actors.corePoolSize") match {
+ case Some(i) if i > 0 => i
+ case _ => {
+ val byCores = rt.availableProcessors() * 2
+ if (byCores > minNumThreads) byCores else minNumThreads
+ }
+ }
+
+ val maxPoolSize = getIntegerProp("actors.maxPoolSize") match {
+ case Some(i) if (i >= corePoolSize) => i
+ case Some(i) if (i < corePoolSize) => corePoolSize
+ case _ => 256
+ }
+}