diff options
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 9 | ||||
-rw-r--r-- | src/actors/scala/actors/ActorGC.scala | 5 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 10 | ||||
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 22 | ||||
-rw-r--r-- | src/actors/scala/actors/SchedulerAdapter.scala | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/RemoteActor.scala | 2 |
7 files changed, 48 insertions, 12 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index dc9b8110bc..84c9d497a0 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -781,7 +781,7 @@ trait Actor extends AbstractActor { shouldExit = false scheduler execute { - ActorGC.newActor(Actor.this) + scheduler.actorGC.newActor(Actor.this) (new Reaction(Actor.this)).run() } @@ -923,6 +923,13 @@ trait Actor extends AbstractActor { } } + private[actors] def terminated() { + scheduler.actorGC.terminated(this) + } + + private[actors] def onTerminate(f: => Unit) { + scheduler.actorGC.onTerminate(this) { f } + } } diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala index 20569190ff..c1c31ba06c 100644 --- a/src/actors/scala/actors/ActorGC.scala +++ b/src/actors/scala/actors/ActorGC.scala @@ -11,11 +11,10 @@ package scala.actors import java.lang.ref.{Reference, WeakReference, ReferenceQueue} -import java.util.WeakHashMap import scala.collection.mutable.{HashMap, HashSet} -object ActorGC { +class ActorGC { private var pendingReactions = 0 private val termHandlers = new HashMap[Actor, () => Unit] @@ -44,7 +43,7 @@ object ActorGC { } def status() { - println("ActorGC: size of refSet: "+refSet.size) + println(this+": size of refSet: "+refSet.size) } def allTerminated: Boolean = synchronized { diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index cba71f70e9..096f703ddd 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -66,6 +66,12 @@ class FJTaskScheduler2 extends Thread with IScheduler { private val executor = new FJTaskRunnerGroup(coreSize) + /** The <code>ActorGC</code> instance that keeps track of the + * live actor objects that are managed by <code>this</code> + * scheduler. + */ + val actorGC = new ActorGC + private var terminating = false private var suspending = false @@ -104,7 +110,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { if (!suspending) { - ActorGC.gc() + actorGC.gc() // check if we need more threads if (coreSize < maxSize @@ -114,7 +120,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { coreSize += 1 } else { - if (ActorGC.allTerminated) { + if (actorGC.allTerminated) { // if all worker threads idle terminate if (executor.getActiveCount() == 0) { Debug.info(this+": initiating shutdown...") diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 5940628124..9f44da841c 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -85,14 +85,14 @@ class Reaction extends Runnable { catch { case eae: ExitActorException => { //Debug.info(a+": exiting...") - ActorGC.terminated(a) + a.terminated() } case _: SuspendActorException => { // do nothing (continuation is already saved) } case t: Throwable => { Debug.info(a+": caught "+t) - ActorGC.terminated(a) + a.terminated() // links a.synchronized { if (!a.links.isEmpty) diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index bc2c093c0e..77e13a5fcc 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -47,7 +47,7 @@ object Scheduler extends IScheduler { if (sched.isInstanceOf[FJTaskScheduler2]) { val fjts = sched.asInstanceOf[FJTaskScheduler2] tasks = fjts.snapshot() - pendingCount = ActorGC.getPendingCount + pendingCount = actorGC.getPendingCount fjts.shutdown() } else error("snapshot operation not supported.") @@ -58,7 +58,7 @@ object Scheduler extends IScheduler { def restart(): Unit = synchronized { sched = { val s = new FJTaskScheduler2 - ActorGC.setPendingCount(pendingCount) + actorGC.setPendingCount(pendingCount) s.start() s } @@ -93,6 +93,12 @@ object Scheduler extends IScheduler { def shutdown() = sched.shutdown() + /** The <code>ActorGC</code> instance that keeps track of the + * live actor objects that are managed by <code>this</code> + * scheduler. + */ + def actorGC: ActorGC = sched.actorGC + def onLockup(handler: () => Unit) = sched.onLockup(handler) def onLockup(millis: Int)(handler: () => Unit) = sched.onLockup(millis)(handler) def printActorDump = sched.printActorDump @@ -129,6 +135,12 @@ trait IScheduler { */ def shutdown(): Unit + /** The <code>ActorGC</code> instance that keeps track of the + * live actor objects that are managed by <code>this</code> + * <code>IScheduler</code> instance. + */ + def actorGC: ActorGC + def onLockup(handler: () => Unit): Unit def onLockup(millis: Int)(handler: () => Unit): Unit def printActorDump: Unit @@ -160,6 +172,12 @@ class SingleThreadedScheduler extends IScheduler { def shutdown() {} + /** The <code>ActorGC</code> instance that keeps track of the + * live actor objects that are managed by <code>this</code> + * scheduler. + */ + val actorGC: ActorGC = new ActorGC + def onLockup(handler: () => Unit) {} def onLockup(millis: Int)(handler: () => Unit) {} def printActorDump {} diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index 875b06eb24..76133af6b1 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -13,7 +13,7 @@ package scala.actors * * Providing an implementation for the * <code>execute(f: => Unit)</code> method is sufficient to - * obtain a concrete <code>IScheduler</code> class. + * obtain a concrete class that extends <code>IScheduler</code>. * * @version 0.9.18 * @author Philipp Haller @@ -32,6 +32,12 @@ trait SchedulerAdapter extends IScheduler { def shutdown(): Unit = Scheduler.shutdown() + /** The <code>ActorGC</code> instance that keeps track of the + * live actor objects that are managed by <code>this</code> + * scheduler. + */ + val actorGC: ActorGC = new ActorGC + def onLockup(handler: () => Unit) {} def onLockup(millis: Int)(handler: () => Unit) {} diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index e9ec7794d6..a5adee6fff 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -73,7 +73,7 @@ object RemoteActor { val s = Actor.self kernels += Pair(s, kern) - ActorGC.onTerminate(s) { + s.onTerminate { Debug.info("alive actor "+s+" terminated") // remove mapping for `s` kernels -= s |