From 5295a1a8caa8b67e9492ad2882d857ff899eb527 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Mon, 4 Feb 2008 10:44:05 +0000 Subject: Fixed #219 --- src/actors/scala/actors/FJTaskScheduler2.scala | 32 ++++++++++++++++++++++++ src/actors/scala/actors/Reaction.scala | 16 +++++++++--- src/actors/scala/actors/remote/RemoteActor.scala | 11 +++++--- 3 files changed, 52 insertions(+), 7 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 848eebe169..382aee5f54 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -17,6 +17,8 @@ import java.lang.{Runnable, Thread, InterruptedException, System, Runtime} import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} +import scala.ref._ + /** * FJTaskScheduler2 * @@ -118,6 +120,29 @@ class FJTaskScheduler2 extends Thread with IScheduler { } if (!suspending) { + + // check whether some actors have become + // unreachable + /*def drainRefQ: Unit = + refQ.poll match { + case None => + // do nothing + case refWrapper => + refWrapper.get match { + case None => + // can't get hold of the actor + // count down anyways + unPendReaction + // continue draining + drainRefQ + case Some(a) => + Debug.info(this+": actor is unreachable: "+a) + unPendReaction + // continue draining + drainRefQ + } + }*/ + // check if we need more threads if (Platform.currentTime - lastActivity >= TICK_FREQ && coreSize < maxSize @@ -161,7 +186,14 @@ class FJTaskScheduler2 extends Thread with IScheduler { executor.execute(task) } + //private val refQ = new ReferenceQueue[Actor] + //private var storedRefs: List[WeakReference[Actor]] = List() + def start(task: Runnable) { + /*if (task.isInstanceOf[Reaction]) { + val reaction = task.asInstanceOf[Reaction] + storedRefs = new WeakReference(reaction.a, refQ) :: storedRefs + }*/ pendReaction executor.execute(task) } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 8807efa074..cec1cd9bf3 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -48,9 +48,19 @@ private[actors] class KillActorException extends Throwable { * @version 0.9.10 * @author Philipp Haller */ -/*private[actors]*/ class Reaction(a: Actor, - f: PartialFunction[Any, Unit], - msg: Any) extends Runnable { +class Reaction extends Runnable { + + private[actors] var a: Actor = _ + private var f: PartialFunction[Any, Unit] = _ + private var msg: Any = _ + + def this(a: Actor, f: PartialFunction[Any, Unit], msg: Any) = { + this() + this.a = a + this.f = f + this.msg = msg + } + def this(a: Actor) = this(a, null, null) def run() { diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index b1a2587aa1..f69acaa4ea 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -50,6 +50,10 @@ object RemoteActor { * port. */ def alive(port: Int): Unit = synchronized { + createKernelOnPort(port) + } + + def createKernelOnPort(port: Int): NetKernel = { val serv = TcpService(port) val kern = serv.kernel val s = Actor.self @@ -64,6 +68,8 @@ object RemoteActor { kern.terminate() } } + + kern } /** @@ -87,10 +93,7 @@ object RemoteActor { case None => // establish remotely accessible // return path (sender) - val serv = new TcpService(TcpService.generatePort) - serv.start() - kernels += Pair(Actor.self, serv.kernel) - serv.kernel + createKernelOnPort(TcpService.generatePort) case Some(k) => k } -- cgit v1.2.3