diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-11-26 14:16:59 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-11-26 14:16:59 +0000 |
commit | e4d31aed1f5a15cd3da7e6ae4093160142625b77 (patch) | |
tree | 473bd4520692a4e47b9c70112654701c20b85046 /src | |
parent | 83ac2b566929edb17629d20bf4dcba3b8d2bd14e (diff) | |
download | scala-e4d31aed1f5a15cd3da7e6ae4093160142625b77.tar.gz scala-e4d31aed1f5a15cd3da7e6ae4093160142625b77.tar.bz2 scala-e4d31aed1f5a15cd3da7e6ae4093160142625b77.zip |
Fixed ticket #219.
Diffstat (limited to 'src')
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 5 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 19 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/NetKernel.scala | 15 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/RemoteActor.scala | 12 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/Service.scala | 3 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/TcpService.scala | 28 |
6 files changed, 63 insertions, 19 deletions
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index 9f46e9c0ce..1e8321e351 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -56,7 +56,8 @@ private[actors] class ExitActorException extends Throwable } catch { case eae: ExitActorException => { - Scheduler.unPendReaction + //Debug.info(a+": exiting...") + Scheduler.unPendReaction(a) } case _: SuspendActorException => { // do nothing (continuation is already saved) @@ -64,7 +65,7 @@ private[actors] class ExitActorException extends Throwable case t: Throwable => { Debug.info(a+": caught "+t) t.printStackTrace() - Scheduler.unPendReaction + Scheduler.unPendReaction(a) // links a.synchronized { if (!a.links.isEmpty) diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index e92e76a803..ddf6df164b 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has * The <code>Scheduler</code> object is used by * <code>Actor</code> to execute tasks of an execution of an actor. * - * @version 0.9.8 + * @version 0.9.10 * @author Philipp Haller */ object Scheduler { @@ -79,7 +79,22 @@ object Scheduler { def tick(a: Actor) = sched.tick(a) def terminated(a: Actor) = sched.terminated(a) def pendReaction: Unit = sched.pendReaction - def unPendReaction: Unit = sched.unPendReaction + + private val termHandlers = new HashMap[Actor, () => Unit] + def onTerminate(a: Actor)(f: => Unit) { + termHandlers += (a -> (() => f)) + } + + def unPendReaction(a: Actor) { + // execute registered termination handler (if any) + termHandlers.get(a) match { + case Some(handler) => handler() + case None => // do nothing + } + + // notify scheduler + sched.unPendReaction + } def shutdown() = sched.shutdown() diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index e259e5b266..f3fa57478c 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -20,9 +20,10 @@ case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte]) case class SendTo(a: Actor, msg: Any) case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol) case class ReplyTo(a: Actor, msg: Any) +case object Terminate /** - * @version 0.9.8 + * @version 0.9.10 * @author Philipp Haller */ class NetKernel(service: Service) { @@ -140,6 +141,14 @@ class NetKernel(service: Service) { } } } + + def terminate() { + // tell all proxies to terminate + proxies.values foreach { p => p.send(Terminate, null) } + + // tell service to terminate + service.terminate() + } } class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { @@ -169,6 +178,10 @@ class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { case cmd@ReplyTo(a, msg) => Debug.info(this+": processing "+cmd) a.replyChannel ! msg + + case cmd@Terminate => + Debug.info(this+": processing "+cmd) + exit() } } } diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index 99865ec5a5..6126ecbc86 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -38,7 +38,7 @@ package scala.actors.remote * } * </pre> * - * @version 0.9.9 + * @version 0.9.10 * @author Philipp Haller */ object RemoteActor { @@ -54,14 +54,14 @@ object RemoteActor { val kern = serv.kernel val s = Actor.self kernels += s -> kern - Debug.info("registering kill handler") - s.kill = () => { + + Scheduler.onTerminate(s) { Debug.info("alive actor "+s+" terminated") kernels -= s if (kernels.isEmpty) { - Debug.info("interrupting "+serv) - // terminate TcpService - serv.interrupt() + Debug.info("terminating "+kern) + // terminate NetKernel + kern.terminate() } } } diff --git a/src/actors/scala/actors/remote/Service.scala b/src/actors/scala/actors/remote/Service.scala index 512d4fd1dd..0220b9c25a 100644 --- a/src/actors/scala/actors/remote/Service.scala +++ b/src/actors/scala/actors/remote/Service.scala @@ -11,7 +11,7 @@ package scala.actors.remote /** - * @version 0.9.8 + * @version 0.9.10 * @author Philipp Haller */ trait Service { @@ -19,4 +19,5 @@ trait Service { val serializer: Serializer def node: Node def send(node: Node, data: Array[Byte]): Unit + def terminate(): Unit } diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala index d5a99ae7b5..c678a0c43c 100644 --- a/src/actors/scala/actors/remote/TcpService.scala +++ b/src/actors/scala/actors/remote/TcpService.scala @@ -61,7 +61,7 @@ object TcpService { /* Class TcpService. * - * @version 0.9.8 + * @version 0.9.10 * @author Philipp Haller */ class TcpService(port: Int) extends Thread with Service { @@ -119,16 +119,26 @@ class TcpService(port: Int) extends Thread with Service { } } + def terminate() { + shouldTerminate = true + new Socket(internalNode.address, internalNode.port) + } + + private var shouldTerminate = false + override def run() { try { val socket = new ServerSocket(port) - while (true) { + while (!shouldTerminate) { Debug.info(this+": waiting for new connection...") val nextClient = socket.accept() - val worker = new TcpServiceWorker(this, nextClient) - Debug.info("Started new "+worker) - worker.readNode - worker.start() + if (!shouldTerminate) { + val worker = new TcpServiceWorker(this, nextClient) + Debug.info("Started new "+worker) + worker.readNode + worker.start() + } else + nextClient.close() } } catch { case ioe: IOException => @@ -139,6 +149,10 @@ class TcpService(port: Int) extends Thread with Service { Debug.info(this+": caught "+e) } finally { Debug.info(this+": shutting down...") + + var workers: List[TcpServiceWorker] = List() + connections.values foreach { w => workers = w :: workers } + workers foreach { w => w.halt } } } @@ -250,6 +264,6 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { Debug.info(this+": caught "+e) parent nodeDown connectedNode } - Debug.info(this+": shutting down...") + Debug.info(this+": terminated") } } |