From 699e811f1a1a9970ebed6ee5ec366aa8aefea6cf Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Sun, 3 Aug 2008 21:15:01 +0000 Subject: Separated actor GC from scheduling. --- src/actors/scala/actors/Actor.scala | 2 +- src/actors/scala/actors/ActorGC.scala | 78 ++++++++++++++++++++++++ src/actors/scala/actors/FJTaskScheduler2.scala | 42 +------------ src/actors/scala/actors/Reaction.scala | 4 +- src/actors/scala/actors/Scheduler.scala | 55 ++++++----------- src/actors/scala/actors/remote/RemoteActor.scala | 2 +- 6 files changed, 104 insertions(+), 79 deletions(-) create mode 100644 src/actors/scala/actors/ActorGC.scala diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 3ec610e624..2edb360ae9 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -278,7 +278,7 @@ object Actor { * @param to the actor to link to * @return */ - def link(to: Actor): Actor = self.link(to) + def link(to: AbstractActor): AbstractActor = self.link(to) /** * Links self to actor defined by body. diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala new file mode 100644 index 0000000000..d6561c331b --- /dev/null +++ b/src/actors/scala/actors/ActorGC.scala @@ -0,0 +1,78 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id:$ + +package scala.actors + +import java.lang.ref.{WeakReference, ReferenceQueue} + +import scala.collection.mutable.HashMap + +object ActorGC { + + private var pendingReactions = 0 + private val termHandlers = new HashMap[Actor, () => Unit] + + private val refQ = new ReferenceQueue[Actor] + private var storedRefs: List[WeakReference[Actor]] = List() + + def newActor(a: Actor) = synchronized { + val wr = new WeakReference[Actor](a, refQ) + //Debug.info("created "+wr+" pointing to "+a) + storedRefs = wr :: storedRefs + + pendingReactions += 1 + } + + def gc() = synchronized { + // check for unreachable actors + def drainRefQ() { + val wr = refQ.poll + if (wr != null) { + pendingReactions -= 1 + // continue draining + drainRefQ() + } + } + drainRefQ() + } + + def allTerminated: Boolean = synchronized { + pendingReactions <= 0 + } + + private[actors] def onTerminate(a: Actor)(f: => Unit) = synchronized { + termHandlers += (a -> (() => f)) + } + + /* Called only from Reaction. + */ + private[actors] def terminated(a: Actor) = synchronized { + // execute registered termination handler (if any) + termHandlers.get(a) match { + case Some(handler) => + handler() + // remove mapping + termHandlers -= a + case None => + // do nothing + } + + pendingReactions -= 1 + } + + private[actors] def getPendingCount = synchronized { + pendingReactions + } + + private[actors] def setPendingCount(cnt: Int) = synchronized { + pendingReactions = cnt + } + +} diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index fb34081c20..34095da299 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -17,8 +17,6 @@ import java.lang.{Runnable, Thread, InterruptedException, System, Runtime} import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} -import java.lang.ref.{WeakReference, ReferenceQueue} - /** * FJTaskScheduler2 * @@ -74,26 +72,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { private var submittedTasks = 0 - private var pendingReactions = 0 - - def pendReaction: Unit = synchronized { - pendingReactions += 1 - } - - def unPendReaction: Unit = synchronized { - pendingReactions -= 1 - } - - def getPendingCount = synchronized { - pendingReactions - } - - def setPendingCount(cnt: Int) = synchronized { - pendingReactions = cnt - } - def printActorDump {} - def terminated(a: Actor) {} private val TICK_FREQ = 50 private val CHECK_FREQ = 100 @@ -121,16 +100,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { if (!suspending) { - // check for unreachable actors - def drainRefQ() { - val wr = refQ.poll - if (wr != null) { - unPendReaction - // continue draining - drainRefQ() - } - } - drainRefQ() + ActorGC.gc() // check if we need more threads if (Platform.currentTime - lastActivity >= TICK_FREQ @@ -141,7 +111,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { lastActivity = Platform.currentTime } else { - if (pendingReactions <= 0) { + if (ActorGC.allTerminated) { // if all worker threads idle terminate if (executor.getActiveCount() == 0) { Debug.info(this+": initiating shutdown...") @@ -175,17 +145,11 @@ class FJTaskScheduler2 extends Thread with IScheduler { executor.execute(task) } - private val refQ = new ReferenceQueue[Actor] - private var storedRefs: List[WeakReference[Actor]] = List() - def start(task: Runnable) { if (task.isInstanceOf[Reaction]) { val reaction = task.asInstanceOf[Reaction] - val wr = new WeakReference[Actor](reaction.a, refQ) - //Debug.info("created "+wr+" pointing to "+reaction.a) - storedRefs = wr :: storedRefs + ActorGC.newActor(reaction.a) } - pendReaction executor.execute(task) } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index dc64ed1f2e..f344cfba18 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -85,14 +85,14 @@ class Reaction extends Runnable { catch { case eae: ExitActorException => { //Debug.info(a+": exiting...") - Scheduler.unPendReaction(a) + ActorGC.terminated(a) } case _: SuspendActorException => { // do nothing (continuation is already saved) } case t: Throwable => { Debug.info(a+": caught "+t) - Scheduler.unPendReaction(a) + ActorGC.terminated(a) // links a.synchronized { if (!a.links.isEmpty) diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 8396c20f47..41a69d12e0 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -26,12 +26,11 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has * @author Philipp Haller */ object Scheduler { - private var sched: IScheduler = - { - var s: IScheduler = new FJTaskScheduler2 - s.start() - s - } + private var sched: IScheduler = { + var s: IScheduler = new FJTaskScheduler2 + s.start() + s + } def impl = sched def impl_= (scheduler: IScheduler) = { @@ -44,14 +43,17 @@ object Scheduler { def snapshot(): Unit = { tasks = sched.snapshot() - pendingCount = sched.asInstanceOf[FJTaskScheduler2].getPendingCount + pendingCount = ActorGC.getPendingCount sched.shutdown() } + /* Creates an instance of class FJTaskScheduler2 + * and submits tasks for execution. + */ def restart(): Unit = synchronized { sched = { var s: IScheduler = new FJTaskScheduler2 - s.asInstanceOf[FJTaskScheduler2].setPendingCount(pendingCount) + ActorGC.setPendingCount(pendingCount) s.start() s } @@ -62,6 +64,10 @@ object Scheduler { tasks = null } + /* The following two methods (start and + * execute) are called from within + * Actor to submit tasks for execution. + */ def start(task: Runnable) = sched.start(task) def execute(task: Runnable) = { @@ -76,29 +82,12 @@ object Scheduler { } else sched.execute(task) } + /* This method is used to notify the scheduler + * of library activity by the argument Actor. + * + * It is only called from within Actor. + */ def tick(a: Actor) = sched.tick(a) - def terminated(a: Actor) = sched.terminated(a) - def pendReaction: Unit = sched.pendReaction - - private val termHandlers = new HashMap[Actor, () => Unit] - def onTerminate(a: Actor)(f: => Unit) { - termHandlers += (a -> (() => f)) - } - - def unPendReaction(a: Actor) = synchronized { - // execute registered termination handler (if any) - termHandlers.get(a) match { - case Some(handler) => - handler() - // remove mapping - termHandlers -= a - case None => - // do nothing - } - - // notify scheduler - sched.unPendReaction - } def shutdown() = sched.shutdown() @@ -123,9 +112,6 @@ trait IScheduler { def getTask(worker: WorkerThread): Runnable def tick(a: Actor): Unit - def terminated(a: Actor): Unit - def pendReaction: Unit - def unPendReaction: Unit def snapshot(): LinkedQueue def shutdown(): Unit @@ -179,9 +165,6 @@ class SingleThreadedScheduler extends IScheduler { def getTask(worker: WorkerThread): Runnable = null def tick(a: Actor) {} - def terminated(a: Actor) {} - def pendReaction {} - def unPendReaction {} def shutdown() {} def snapshot(): LinkedQueue = { null } diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index ee64f1d9a8..65c322c975 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -73,7 +73,7 @@ object RemoteActor { val s = Actor.self kernels += Pair(s, kern) - Scheduler.onTerminate(s) { + ActorGC.onTerminate(s) { Debug.info("alive actor "+s+" terminated") // remove mapping for `s` kernels -= s -- cgit v1.2.3