summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/actors/scala/actors/ActorGC.scala30
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala4
-rw-r--r--src/actors/scala/actors/Scheduler.scala6
3 files changed, 35 insertions, 5 deletions
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index c1c31ba06c..2e36d4a14a 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -14,20 +14,40 @@ import java.lang.ref.{Reference, WeakReference, ReferenceQueue}
import scala.collection.mutable.{HashMap, HashSet}
+/**
+ * ActorGC keeps track of the number of live actors being managed by a
+ * a scheduler so that it can shutdown when all of the actors it manages have
+ * either been explicitly terminated or garbage collected.
+ *
+ * When an actor is started, it is registered with the ActorGC via the
+ * <code>newActor</code> method, and when an actor is knowingly terminated
+ * (e.g. act method finishes, exit explicitly called, an exception is thrown),
+ * the ActorGC is informed via the <code>terminated</code> method.
+ */
class ActorGC {
private var pendingReactions = 0
private val termHandlers = new HashMap[Actor, () => Unit]
+ /** Actors are added to refQ in newActor. */
private val refQ = new ReferenceQueue[Actor]
+
+ /**
+ * This is a set of references to all the actors registered with
+ * this ActorGC. It is maintained so that the WeakReferences will not be GC'd
+ * before the actors to which they point.
+ */
private val refSet = new HashSet[Reference[t] forSome { type t <: Actor }]
+ /** newActor is invoked whenever a new actor is started. */
def newActor(a: Actor) = synchronized {
+ // registers a reference to the actor with the ReferenceQueue
val wr = new WeakReference[Actor](a, refQ)
refSet += wr
pendingReactions += 1
}
+ /** Removes unreachable actors from refSet. */
def gc() = synchronized {
// check for unreachable actors
def drainRefQ() {
@@ -67,6 +87,16 @@ class ActorGC {
// do nothing
}
+ // find the weak reference that points to the terminated actor, if any
+ refSet.find((ref: Reference[t] forSome { type t <: Actor }) => ref.get() == a) match {
+ case Some(r) =>
+ // invoking clear will not cause r to be enqueued
+ r.clear()
+ refSet -= r
+ case None =>
+ // do nothing
+ }
+
pendingReactions -= 1
}
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 21915eb1a3..5832b78dd9 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
setDaemon(daemon)
+ /** Default constructor creates a non-daemon thread. */
def this() =
this(false)
@@ -35,6 +36,9 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler {
val rt = Runtime.getRuntime()
val minNumThreads = 4
+ /** The value of the actors.corePoolSize JVM property. This property
+ * determines the initial thread pool size.
+ */
val coreProp = try {
System.getProperty("actors.corePoolSize")
} catch {
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 9db7b48568..3a6a205a34 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -11,11 +11,7 @@
package scala.actors
import compat.Platform
-
-import java.lang.{Runnable, Thread, InterruptedException}
-
-import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
+import java.lang.Runnable
/**
* The <code>Scheduler</code> object is used by