summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-11-26 14:16:59 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-11-26 14:16:59 +0000
commite4d31aed1f5a15cd3da7e6ae4093160142625b77 (patch)
tree473bd4520692a4e47b9c70112654701c20b85046
parent83ac2b566929edb17629d20bf4dcba3b8d2bd14e (diff)
downloadscala-e4d31aed1f5a15cd3da7e6ae4093160142625b77.tar.gz
scala-e4d31aed1f5a15cd3da7e6ae4093160142625b77.tar.bz2
scala-e4d31aed1f5a15cd3da7e6ae4093160142625b77.zip
Fixed ticket #219.
-rw-r--r--src/actors/scala/actors/Reaction.scala5
-rw-r--r--src/actors/scala/actors/Scheduler.scala19
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala15
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala12
-rw-r--r--src/actors/scala/actors/remote/Service.scala3
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala28
6 files changed, 63 insertions, 19 deletions
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 9f46e9c0ce..1e8321e351 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -56,7 +56,8 @@ private[actors] class ExitActorException extends Throwable
}
catch {
case eae: ExitActorException => {
- Scheduler.unPendReaction
+ //Debug.info(a+": exiting...")
+ Scheduler.unPendReaction(a)
}
case _: SuspendActorException => {
// do nothing (continuation is already saved)
@@ -64,7 +65,7 @@ private[actors] class ExitActorException extends Throwable
case t: Throwable => {
Debug.info(a+": caught "+t)
t.printStackTrace()
- Scheduler.unPendReaction
+ Scheduler.unPendReaction(a)
// links
a.synchronized {
if (!a.links.isEmpty)
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index e92e76a803..ddf6df164b 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* The <code>Scheduler</code> object is used by
* <code>Actor</code> to execute tasks of an execution of an actor.
*
- * @version 0.9.8
+ * @version 0.9.10
* @author Philipp Haller
*/
object Scheduler {
@@ -79,7 +79,22 @@ object Scheduler {
def tick(a: Actor) = sched.tick(a)
def terminated(a: Actor) = sched.terminated(a)
def pendReaction: Unit = sched.pendReaction
- def unPendReaction: Unit = sched.unPendReaction
+
+ private val termHandlers = new HashMap[Actor, () => Unit]
+ def onTerminate(a: Actor)(f: => Unit) {
+ termHandlers += (a -> (() => f))
+ }
+
+ def unPendReaction(a: Actor) {
+ // execute registered termination handler (if any)
+ termHandlers.get(a) match {
+ case Some(handler) => handler()
+ case None => // do nothing
+ }
+
+ // notify scheduler
+ sched.unPendReaction
+ }
def shutdown() = sched.shutdown()
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index e259e5b266..f3fa57478c 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -20,9 +20,10 @@ case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte])
case class SendTo(a: Actor, msg: Any)
case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol)
case class ReplyTo(a: Actor, msg: Any)
+case object Terminate
/**
- * @version 0.9.8
+ * @version 0.9.10
* @author Philipp Haller
*/
class NetKernel(service: Service) {
@@ -140,6 +141,14 @@ class NetKernel(service: Service) {
}
}
}
+
+ def terminate() {
+ // tell all proxies to terminate
+ proxies.values foreach { p => p.send(Terminate, null) }
+
+ // tell service to terminate
+ service.terminate()
+ }
}
class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
@@ -169,6 +178,10 @@ class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
case cmd@ReplyTo(a, msg) =>
Debug.info(this+": processing "+cmd)
a.replyChannel ! msg
+
+ case cmd@Terminate =>
+ Debug.info(this+": processing "+cmd)
+ exit()
}
}
}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 99865ec5a5..6126ecbc86 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -38,7 +38,7 @@ package scala.actors.remote
* }
* </pre>
*
- * @version 0.9.9
+ * @version 0.9.10
* @author Philipp Haller
*/
object RemoteActor {
@@ -54,14 +54,14 @@ object RemoteActor {
val kern = serv.kernel
val s = Actor.self
kernels += s -> kern
- Debug.info("registering kill handler")
- s.kill = () => {
+
+ Scheduler.onTerminate(s) {
Debug.info("alive actor "+s+" terminated")
kernels -= s
if (kernels.isEmpty) {
- Debug.info("interrupting "+serv)
- // terminate TcpService
- serv.interrupt()
+ Debug.info("terminating "+kern)
+ // terminate NetKernel
+ kern.terminate()
}
}
}
diff --git a/src/actors/scala/actors/remote/Service.scala b/src/actors/scala/actors/remote/Service.scala
index 512d4fd1dd..0220b9c25a 100644
--- a/src/actors/scala/actors/remote/Service.scala
+++ b/src/actors/scala/actors/remote/Service.scala
@@ -11,7 +11,7 @@
package scala.actors.remote
/**
- * @version 0.9.8
+ * @version 0.9.10
* @author Philipp Haller
*/
trait Service {
@@ -19,4 +19,5 @@ trait Service {
val serializer: Serializer
def node: Node
def send(node: Node, data: Array[Byte]): Unit
+ def terminate(): Unit
}
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index d5a99ae7b5..c678a0c43c 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -61,7 +61,7 @@ object TcpService {
/* Class TcpService.
*
- * @version 0.9.8
+ * @version 0.9.10
* @author Philipp Haller
*/
class TcpService(port: Int) extends Thread with Service {
@@ -119,16 +119,26 @@ class TcpService(port: Int) extends Thread with Service {
}
}
+ def terminate() {
+ shouldTerminate = true
+ new Socket(internalNode.address, internalNode.port)
+ }
+
+ private var shouldTerminate = false
+
override def run() {
try {
val socket = new ServerSocket(port)
- while (true) {
+ while (!shouldTerminate) {
Debug.info(this+": waiting for new connection...")
val nextClient = socket.accept()
- val worker = new TcpServiceWorker(this, nextClient)
- Debug.info("Started new "+worker)
- worker.readNode
- worker.start()
+ if (!shouldTerminate) {
+ val worker = new TcpServiceWorker(this, nextClient)
+ Debug.info("Started new "+worker)
+ worker.readNode
+ worker.start()
+ } else
+ nextClient.close()
}
} catch {
case ioe: IOException =>
@@ -139,6 +149,10 @@ class TcpService(port: Int) extends Thread with Service {
Debug.info(this+": caught "+e)
} finally {
Debug.info(this+": shutting down...")
+
+ var workers: List[TcpServiceWorker] = List()
+ connections.values foreach { w => workers = w :: workers }
+ workers foreach { w => w.halt }
}
}
@@ -250,6 +264,6 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
Debug.info(this+": caught "+e)
parent nodeDown connectedNode
}
- Debug.info(this+": shutting down...")
+ Debug.info(this+": terminated")
}
}