summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-24 20:14:52 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-24 20:14:52 +0000
commitfa3010ed335f5a8647c5fdcecbfbdb01b26fec02 (patch)
tree1373a3339fb60253ed85883df0d4ab01ea08b662 /src/actors
parent67ab4b8ece5e83575c63a58dd668ff662b5dc7eb (diff)
downloadscala-fa3010ed335f5a8647c5fdcecbfbdb01b26fec02.tar.gz
scala-fa3010ed335f5a8647c5fdcecbfbdb01b26fec02.tar.bz2
scala-fa3010ed335f5a8647c5fdcecbfbdb01b26fec02.zip
Added SimpleExecutorScheduler which does not te...
Added SimpleExecutorScheduler which does not terminate automatically.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala2
-rw-r--r--src/actors/scala/actors/ActorGC.scala6
-rw-r--r--src/actors/scala/actors/DelegatingScheduler.scala3
-rw-r--r--src/actors/scala/actors/SimpleExecutorScheduler.scala105
4 files changed, 112 insertions, 4 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index b9e36a574e..47df95057b 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -1024,7 +1024,7 @@ trait Actor extends AbstractActor {
}
/* Requires qualified private, because <code>RemoteActor</code> must
- * register termination handler.
+ * register a termination handler.
*/
private[actors] def onTerminate(f: => Unit) {
scheduler.onTerminate(this) { f }
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index 5906b96921..a5160d14e2 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -48,7 +48,7 @@ trait ActorGC extends IScheduler {
}
/** Removes unreachable actors from refSet. */
- def gc() = synchronized {
+ protected def gc() = synchronized {
// check for unreachable actors
def drainRefQ() {
val wr = refQ.poll
@@ -62,11 +62,11 @@ trait ActorGC extends IScheduler {
drainRefQ()
}
- def status() {
+ protected def status() {
println(this+": size of refSet: "+refSet.size)
}
- def allTerminated: Boolean = synchronized {
+ protected def allTerminated: Boolean = synchronized {
pendingReactions <= 0
}
diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala
index dfb55957bb..782ef0da42 100644
--- a/src/actors/scala/actors/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/DelegatingScheduler.scala
@@ -8,6 +8,9 @@
package scala.actors
+/**
+ * @author Erik Engbrecht
+ */
trait DelegatingScheduler extends IScheduler {
protected def makeNewScheduler(): IScheduler
diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala
new file mode 100644
index 0000000000..d6b2104939
--- /dev/null
+++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala
@@ -0,0 +1,105 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+import scala.collection.mutable.HashMap
+import java.util.concurrent.{ExecutorService, RejectedExecutionException}
+
+/**
+ * The <code>SimpleExecutorScheduler</code> class uses an
+ * <code>ExecutorService</code> to execute <code>Actor</code>s. It
+ * does not start an additional thread. Also, the underlying
+ * <code>ExecutorService</code> is not shut down automatically;
+ * instead, the <code>ExecutorService</code> must be shut down either
+ * directly or by shutting down the
+ * <code>SimpleExecutorScheduler</code> instance.
+ *
+ * @author Philipp Haller
+ */
+class SimpleExecutorScheduler(protected var executor: ExecutorService) extends IScheduler {
+
+ /* Maintains at most one closure per actor that is executed
+ * when the actor terminates.
+ */
+ protected val termHandlers = new HashMap[Actor, () => Unit]
+
+ /* This constructor (and the var above) is currently only used to work
+ * around a bug in scaladoc, which cannot deal with early initializers
+ * (to be used in subclasses such as DefaultExecutorScheduler) properly.
+ */
+ def this() {
+ this(null)
+ }
+
+ /** Submits a <code>Runnable</code> for execution.
+ *
+ * @param task the task to be executed
+ */
+ def execute(task: Runnable) {
+ try {
+ executor execute task
+ } catch {
+ case ree: RejectedExecutionException =>
+ Debug.info("caught "+ree)
+ }
+ }
+
+ /** Submits a closure for execution.
+ *
+ * @param block the closure to be executed
+ */
+ def execute(block: => Unit) {
+ val task = new Runnable {
+ def run() { block }
+ }
+ execute(task)
+ }
+
+ /** Shuts down the scheduler.
+ */
+ def shutdown() {
+ executor.shutdown()
+ }
+
+ /** The scheduler is active if the underlying <code>ExecutorService</code>
+ * has not been shut down.
+ */
+ def isActive =
+ !executor.isShutdown()
+
+ def newActor(a: Actor) {}
+
+ def terminated(a: Actor) {
+ // obtain termination handler (if any)
+ val todo = synchronized {
+ termHandlers.get(a) match {
+ case Some(handler) =>
+ termHandlers -= a
+ () => handler
+ case None =>
+ () => { /* do nothing */ }
+ }
+ }
+
+ // invoke termination handler (if any)
+ todo()
+ }
+
+ /** Registers a closure to be executed when the specified
+ * actor terminates.
+ *
+ * @param a the actor
+ * @param f the closure to be registered
+ */
+ def onTerminate(a: Actor)(block: => Unit) = synchronized {
+ termHandlers += (a -> (() => block))
+ }
+}