diff options
-rw-r--r-- | src/actors/scala/actors/ActorGC.scala | 30 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 6 |
3 files changed, 35 insertions, 5 deletions
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala index c1c31ba06c..2e36d4a14a 100644 --- a/src/actors/scala/actors/ActorGC.scala +++ b/src/actors/scala/actors/ActorGC.scala @@ -14,20 +14,40 @@ import java.lang.ref.{Reference, WeakReference, ReferenceQueue} import scala.collection.mutable.{HashMap, HashSet} +/** + * ActorGC keeps track of the number of live actors being managed by a + * a scheduler so that it can shutdown when all of the actors it manages have + * either been explicitly terminated or garbage collected. + * + * When an actor is started, it is registered with the ActorGC via the + * <code>newActor</code> method, and when an actor is knowingly terminated + * (e.g. act method finishes, exit explicitly called, an exception is thrown), + * the ActorGC is informed via the <code>terminated</code> method. + */ class ActorGC { private var pendingReactions = 0 private val termHandlers = new HashMap[Actor, () => Unit] + /** Actors are added to refQ in newActor. */ private val refQ = new ReferenceQueue[Actor] + + /** + * This is a set of references to all the actors registered with + * this ActorGC. It is maintained so that the WeakReferences will not be GC'd + * before the actors to which they point. + */ private val refSet = new HashSet[Reference[t] forSome { type t <: Actor }] + /** newActor is invoked whenever a new actor is started. */ def newActor(a: Actor) = synchronized { + // registers a reference to the actor with the ReferenceQueue val wr = new WeakReference[Actor](a, refQ) refSet += wr pendingReactions += 1 } + /** Removes unreachable actors from refSet. */ def gc() = synchronized { // check for unreachable actors def drainRefQ() { @@ -67,6 +87,16 @@ class ActorGC { // do nothing } + // find the weak reference that points to the terminated actor, if any + refSet.find((ref: Reference[t] forSome { type t <: Actor }) => ref.get() == a) match { + case Some(r) => + // invoking clear will not cause r to be enqueued + r.clear() + refSet -= r + case None => + // do nothing + } + pendingReactions -= 1 } diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 21915eb1a3..5832b78dd9 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -27,6 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler { setDaemon(daemon) + /** Default constructor creates a non-daemon thread. */ def this() = this(false) @@ -35,6 +36,9 @@ class FJTaskScheduler2(daemon: Boolean) extends Thread with IScheduler { val rt = Runtime.getRuntime() val minNumThreads = 4 + /** The value of the actors.corePoolSize JVM property. This property + * determines the initial thread pool size. + */ val coreProp = try { System.getProperty("actors.corePoolSize") } catch { diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 9db7b48568..3a6a205a34 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -11,11 +11,7 @@ package scala.actors import compat.Platform - -import java.lang.{Runnable, Thread, InterruptedException} - -import scala.collection.Set -import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} +import java.lang.Runnable /** * The <code>Scheduler</code> object is used by |