From d7839e8a6dc23049f9b0cc979c5e2ba54ce07a5a Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 22 Jan 2009 16:43:42 +0000 Subject: ActorGC is no longer a global object; each sche... ActorGC is no longer a global object; each scheduler provides its own ActorGC instance. Prepares for resolution of #1405. --- src/actors/scala/actors/Actor.scala | 9 ++++++++- src/actors/scala/actors/ActorGC.scala | 5 ++--- src/actors/scala/actors/FJTaskScheduler2.scala | 10 ++++++++-- src/actors/scala/actors/Reaction.scala | 4 ++-- src/actors/scala/actors/Scheduler.scala | 22 ++++++++++++++++++++-- src/actors/scala/actors/SchedulerAdapter.scala | 8 +++++++- src/actors/scala/actors/remote/RemoteActor.scala | 2 +- 7 files changed, 48 insertions(+), 12 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index dc9b8110bc..84c9d497a0 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -781,7 +781,7 @@ trait Actor extends AbstractActor { shouldExit = false scheduler execute { - ActorGC.newActor(Actor.this) + scheduler.actorGC.newActor(Actor.this) (new Reaction(Actor.this)).run() } @@ -923,6 +923,13 @@ trait Actor extends AbstractActor { } } + private[actors] def terminated() { + scheduler.actorGC.terminated(this) + } + + private[actors] def onTerminate(f: => Unit) { + scheduler.actorGC.onTerminate(this) { f } + } } diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala index 20569190ff..c1c31ba06c 100644 --- a/src/actors/scala/actors/ActorGC.scala +++ b/src/actors/scala/actors/ActorGC.scala @@ -11,11 +11,10 @@ package scala.actors import java.lang.ref.{Reference, WeakReference, ReferenceQueue} -import java.util.WeakHashMap import scala.collection.mutable.{HashMap, HashSet} -object ActorGC { +class ActorGC { private var pendingReactions = 0 private val termHandlers = new HashMap[Actor, () => Unit] @@ -44,7 +43,7 @@ object ActorGC { } def status() { - println("ActorGC: size of refSet: "+refSet.size) + println(this+": size of refSet: "+refSet.size) } def allTerminated: Boolean = synchronized { diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index cba71f70e9..096f703ddd 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -66,6 +66,12 @@ class FJTaskScheduler2 extends Thread with IScheduler { private val executor = new FJTaskRunnerGroup(coreSize) + /** The ActorGC instance that keeps track of the + * live actor objects that are managed by this + * scheduler. + */ + val actorGC = new ActorGC + private var terminating = false private var suspending = false @@ -104,7 +110,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { if (!suspending) { - ActorGC.gc() + actorGC.gc() // check if we need more threads if (coreSize < maxSize @@ -114,7 +120,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { coreSize += 1 } else { - if (ActorGC.allTerminated) { + if (actorGC.allTerminated) { // if all worker threads idle terminate if (executor.getActiveCount() == 0) { Debug.info(this+": initiating shutdown...") diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 5940628124..9f44da841c 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -85,14 +85,14 @@ class Reaction extends Runnable { catch { case eae: ExitActorException => { //Debug.info(a+": exiting...") - ActorGC.terminated(a) + a.terminated() } case _: SuspendActorException => { // do nothing (continuation is already saved) } case t: Throwable => { Debug.info(a+": caught "+t) - ActorGC.terminated(a) + a.terminated() // links a.synchronized { if (!a.links.isEmpty) diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index bc2c093c0e..77e13a5fcc 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -47,7 +47,7 @@ object Scheduler extends IScheduler { if (sched.isInstanceOf[FJTaskScheduler2]) { val fjts = sched.asInstanceOf[FJTaskScheduler2] tasks = fjts.snapshot() - pendingCount = ActorGC.getPendingCount + pendingCount = actorGC.getPendingCount fjts.shutdown() } else error("snapshot operation not supported.") @@ -58,7 +58,7 @@ object Scheduler extends IScheduler { def restart(): Unit = synchronized { sched = { val s = new FJTaskScheduler2 - ActorGC.setPendingCount(pendingCount) + actorGC.setPendingCount(pendingCount) s.start() s } @@ -93,6 +93,12 @@ object Scheduler extends IScheduler { def shutdown() = sched.shutdown() + /** The ActorGC instance that keeps track of the + * live actor objects that are managed by this + * scheduler. + */ + def actorGC: ActorGC = sched.actorGC + def onLockup(handler: () => Unit) = sched.onLockup(handler) def onLockup(millis: Int)(handler: () => Unit) = sched.onLockup(millis)(handler) def printActorDump = sched.printActorDump @@ -129,6 +135,12 @@ trait IScheduler { */ def shutdown(): Unit + /** The ActorGC instance that keeps track of the + * live actor objects that are managed by this + * IScheduler instance. + */ + def actorGC: ActorGC + def onLockup(handler: () => Unit): Unit def onLockup(millis: Int)(handler: () => Unit): Unit def printActorDump: Unit @@ -160,6 +172,12 @@ class SingleThreadedScheduler extends IScheduler { def shutdown() {} + /** The ActorGC instance that keeps track of the + * live actor objects that are managed by this + * scheduler. + */ + val actorGC: ActorGC = new ActorGC + def onLockup(handler: () => Unit) {} def onLockup(millis: Int)(handler: () => Unit) {} def printActorDump {} diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index 875b06eb24..76133af6b1 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -13,7 +13,7 @@ package scala.actors * * Providing an implementation for the * execute(f: => Unit) method is sufficient to - * obtain a concrete IScheduler class. + * obtain a concrete class that extends IScheduler. * * @version 0.9.18 * @author Philipp Haller @@ -32,6 +32,12 @@ trait SchedulerAdapter extends IScheduler { def shutdown(): Unit = Scheduler.shutdown() + /** The ActorGC instance that keeps track of the + * live actor objects that are managed by this + * scheduler. + */ + val actorGC: ActorGC = new ActorGC + def onLockup(handler: () => Unit) {} def onLockup(millis: Int)(handler: () => Unit) {} diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index e9ec7794d6..a5adee6fff 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -73,7 +73,7 @@ object RemoteActor { val s = Actor.self kernels += Pair(s, kern) - ActorGC.onTerminate(s) { + s.onTerminate { Debug.info("alive actor "+s+" terminated") // remove mapping for `s` kernels -= s -- cgit v1.2.3