From a31e3c23a1d00b87b7e205740e956fad3e988eee Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 12 Jun 2007 10:51:46 +0000 Subject: Added buffering of msgs in case remote net kern... Added buffering of msgs in case remote net kernel is not yet up. --- src/actors/scala/actors/remote/RemoteActor.scala | 4 +-- src/actors/scala/actors/remote/TcpService.scala | 38 ++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index b30e363b46..19d85df491 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -48,7 +48,7 @@ object RemoteActor { * Makes self remotely accessible on TCP port * port. */ - def alive(port: int) { + def alive(port: int): unit = synchronized { val serv = new TcpService(port) serv.start() kernels += Actor.self -> serv.kernel @@ -58,7 +58,7 @@ object RemoteActor { * Registers a under name on this * node. */ - def register(name: Symbol, a: Actor) { + def register(name: Symbol, a: Actor): unit = synchronized { val kernel = kernels.get(Actor.self) match { case None => val serv = new TcpService(TcpService.generatePort) diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala index a3721bc40d..ba1ee9202a 100644 --- a/src/actors/scala/actors/remote/TcpService.scala +++ b/src/actors/scala/actors/remote/TcpService.scala @@ -21,6 +21,8 @@ import java.net.{InetAddress, ServerSocket, Socket, UnknownHostException} import compat.Platform +import scala.collection.mutable.HashMap + object TcpService { val random = new java.util.Random(Platform.currentTime) @@ -50,13 +52,45 @@ class TcpService(port: Int) extends Thread with Service { private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port) def node: Node = internalNode + private val pendingSends = new HashMap[Node, List[Array[byte]]] + def send(node: Node, data: Array[byte]): unit = synchronized { + + def bufferMsg(t: Throwable) = { + // buffer message, so that it can be re-sent + // when remote net kernel comes up + pendingSends.get(node) match { + case None => + pendingSends += node -> (data :: Nil) + case Some(msgs) => + pendingSends += node -> (data :: msgs) + } + } + // retrieve worker thread (if any) that already has connection getConnection(node) match { case None => // we are not connected, yet - val newWorker = connect(node) - newWorker transmit data + try { + val newWorker = connect(node) + newWorker transmit data + + // any pending sends? + pendingSends.get(node) match { + case None => + // do nothing + case Some(msgs) => + msgs foreach {newWorker transmit _} + pendingSends -= node + } + } catch { + case uhe: UnknownHostException => + bufferMsg(uhe) + case ioe: IOException => + bufferMsg(ioe) + case se: SecurityException => + // do nothing + } case Some(worker) => worker transmit data } -- cgit v1.2.3