summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-08-03 21:15:01 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-08-03 21:15:01 +0000
commit699e811f1a1a9970ebed6ee5ec366aa8aefea6cf (patch)
treeb0f519ff490c9d9ea8ea4aab5dc4fed0adc9d49a /src/actors
parent10582aff64958a99ee3819cfffc49fa168df478d (diff)
downloadscala-699e811f1a1a9970ebed6ee5ec366aa8aefea6cf.tar.gz
scala-699e811f1a1a9970ebed6ee5ec366aa8aefea6cf.tar.bz2
scala-699e811f1a1a9970ebed6ee5ec366aa8aefea6cf.zip
Separated actor GC from scheduling.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala2
-rw-r--r--src/actors/scala/actors/ActorGC.scala78
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala42
-rw-r--r--src/actors/scala/actors/Reaction.scala4
-rw-r--r--src/actors/scala/actors/Scheduler.scala55
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala2
6 files changed, 104 insertions, 79 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 3ec610e624..2edb360ae9 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -278,7 +278,7 @@ object Actor {
* @param to the actor to link to
* @return
*/
- def link(to: Actor): Actor = self.link(to)
+ def link(to: AbstractActor): AbstractActor = self.link(to)
/**
* Links <code>self</code> to actor defined by <code>body</code>.
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
new file mode 100644
index 0000000000..d6561c331b
--- /dev/null
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -0,0 +1,78 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id:$
+
+package scala.actors
+
+import java.lang.ref.{WeakReference, ReferenceQueue}
+
+import scala.collection.mutable.HashMap
+
+object ActorGC {
+
+ private var pendingReactions = 0
+ private val termHandlers = new HashMap[Actor, () => Unit]
+
+ private val refQ = new ReferenceQueue[Actor]
+ private var storedRefs: List[WeakReference[Actor]] = List()
+
+ def newActor(a: Actor) = synchronized {
+ val wr = new WeakReference[Actor](a, refQ)
+ //Debug.info("created "+wr+" pointing to "+a)
+ storedRefs = wr :: storedRefs
+
+ pendingReactions += 1
+ }
+
+ def gc() = synchronized {
+ // check for unreachable actors
+ def drainRefQ() {
+ val wr = refQ.poll
+ if (wr != null) {
+ pendingReactions -= 1
+ // continue draining
+ drainRefQ()
+ }
+ }
+ drainRefQ()
+ }
+
+ def allTerminated: Boolean = synchronized {
+ pendingReactions <= 0
+ }
+
+ private[actors] def onTerminate(a: Actor)(f: => Unit) = synchronized {
+ termHandlers += (a -> (() => f))
+ }
+
+ /* Called only from <code>Reaction</code>.
+ */
+ private[actors] def terminated(a: Actor) = synchronized {
+ // execute registered termination handler (if any)
+ termHandlers.get(a) match {
+ case Some(handler) =>
+ handler()
+ // remove mapping
+ termHandlers -= a
+ case None =>
+ // do nothing
+ }
+
+ pendingReactions -= 1
+ }
+
+ private[actors] def getPendingCount = synchronized {
+ pendingReactions
+ }
+
+ private[actors] def setPendingCount(cnt: Int) = synchronized {
+ pendingReactions = cnt
+ }
+
+}
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index fb34081c20..34095da299 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -17,8 +17,6 @@ import java.lang.{Runnable, Thread, InterruptedException, System, Runtime}
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
-import java.lang.ref.{WeakReference, ReferenceQueue}
-
/**
* FJTaskScheduler2
*
@@ -74,26 +72,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var submittedTasks = 0
- private var pendingReactions = 0
-
- def pendReaction: Unit = synchronized {
- pendingReactions += 1
- }
-
- def unPendReaction: Unit = synchronized {
- pendingReactions -= 1
- }
-
- def getPendingCount = synchronized {
- pendingReactions
- }
-
- def setPendingCount(cnt: Int) = synchronized {
- pendingReactions = cnt
- }
-
def printActorDump {}
- def terminated(a: Actor) {}
private val TICK_FREQ = 50
private val CHECK_FREQ = 100
@@ -121,16 +100,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
if (!suspending) {
- // check for unreachable actors
- def drainRefQ() {
- val wr = refQ.poll
- if (wr != null) {
- unPendReaction
- // continue draining
- drainRefQ()
- }
- }
- drainRefQ()
+ ActorGC.gc()
// check if we need more threads
if (Platform.currentTime - lastActivity >= TICK_FREQ
@@ -141,7 +111,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
lastActivity = Platform.currentTime
}
else {
- if (pendingReactions <= 0) {
+ if (ActorGC.allTerminated) {
// if all worker threads idle terminate
if (executor.getActiveCount() == 0) {
Debug.info(this+": initiating shutdown...")
@@ -175,17 +145,11 @@ class FJTaskScheduler2 extends Thread with IScheduler {
executor.execute(task)
}
- private val refQ = new ReferenceQueue[Actor]
- private var storedRefs: List[WeakReference[Actor]] = List()
-
def start(task: Runnable) {
if (task.isInstanceOf[Reaction]) {
val reaction = task.asInstanceOf[Reaction]
- val wr = new WeakReference[Actor](reaction.a, refQ)
- //Debug.info("created "+wr+" pointing to "+reaction.a)
- storedRefs = wr :: storedRefs
+ ActorGC.newActor(reaction.a)
}
- pendReaction
executor.execute(task)
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index dc64ed1f2e..f344cfba18 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -85,14 +85,14 @@ class Reaction extends Runnable {
catch {
case eae: ExitActorException => {
//Debug.info(a+": exiting...")
- Scheduler.unPendReaction(a)
+ ActorGC.terminated(a)
}
case _: SuspendActorException => {
// do nothing (continuation is already saved)
}
case t: Throwable => {
Debug.info(a+": caught "+t)
- Scheduler.unPendReaction(a)
+ ActorGC.terminated(a)
// links
a.synchronized {
if (!a.links.isEmpty)
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 8396c20f47..41a69d12e0 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -26,12 +26,11 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* @author Philipp Haller
*/
object Scheduler {
- private var sched: IScheduler =
- {
- var s: IScheduler = new FJTaskScheduler2
- s.start()
- s
- }
+ private var sched: IScheduler = {
+ var s: IScheduler = new FJTaskScheduler2
+ s.start()
+ s
+ }
def impl = sched
def impl_= (scheduler: IScheduler) = {
@@ -44,14 +43,17 @@ object Scheduler {
def snapshot(): Unit = {
tasks = sched.snapshot()
- pendingCount = sched.asInstanceOf[FJTaskScheduler2].getPendingCount
+ pendingCount = ActorGC.getPendingCount
sched.shutdown()
}
+ /* Creates an instance of class <code>FJTaskScheduler2</code>
+ * and submits <code>tasks</code> for execution.
+ */
def restart(): Unit = synchronized {
sched = {
var s: IScheduler = new FJTaskScheduler2
- s.asInstanceOf[FJTaskScheduler2].setPendingCount(pendingCount)
+ ActorGC.setPendingCount(pendingCount)
s.start()
s
}
@@ -62,6 +64,10 @@ object Scheduler {
tasks = null
}
+ /* The following two methods (<code>start</code> and
+ * <code>execute</code>) are called from within
+ * <code>Actor</code> to submit tasks for execution.
+ */
def start(task: Runnable) = sched.start(task)
def execute(task: Runnable) = {
@@ -76,29 +82,12 @@ object Scheduler {
} else sched.execute(task)
}
+ /* This method is used to notify the scheduler
+ * of library activity by the argument Actor.
+ *
+ * It is only called from within <code>Actor</code>.
+ */
def tick(a: Actor) = sched.tick(a)
- def terminated(a: Actor) = sched.terminated(a)
- def pendReaction: Unit = sched.pendReaction
-
- private val termHandlers = new HashMap[Actor, () => Unit]
- def onTerminate(a: Actor)(f: => Unit) {
- termHandlers += (a -> (() => f))
- }
-
- def unPendReaction(a: Actor) = synchronized {
- // execute registered termination handler (if any)
- termHandlers.get(a) match {
- case Some(handler) =>
- handler()
- // remove mapping
- termHandlers -= a
- case None =>
- // do nothing
- }
-
- // notify scheduler
- sched.unPendReaction
- }
def shutdown() = sched.shutdown()
@@ -123,9 +112,6 @@ trait IScheduler {
def getTask(worker: WorkerThread): Runnable
def tick(a: Actor): Unit
- def terminated(a: Actor): Unit
- def pendReaction: Unit
- def unPendReaction: Unit
def snapshot(): LinkedQueue
def shutdown(): Unit
@@ -179,9 +165,6 @@ class SingleThreadedScheduler extends IScheduler {
def getTask(worker: WorkerThread): Runnable = null
def tick(a: Actor) {}
- def terminated(a: Actor) {}
- def pendReaction {}
- def unPendReaction {}
def shutdown() {}
def snapshot(): LinkedQueue = { null }
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index ee64f1d9a8..65c322c975 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -73,7 +73,7 @@ object RemoteActor {
val s = Actor.self
kernels += Pair(s, kern)
- Scheduler.onTerminate(s) {
+ ActorGC.onTerminate(s) {
Debug.info("alive actor "+s+" terminated")
// remove mapping for `s`
kernels -= s