diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-06-12 10:51:46 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-06-12 10:51:46 +0000 |
commit | a31e3c23a1d00b87b7e205740e956fad3e988eee (patch) | |
tree | 4223dab0dce3b2ede48a37650de8ca73c525134c /src/actors | |
parent | 835fab52247cae811c2ef547f79dc121acc21e66 (diff) | |
download | scala-a31e3c23a1d00b87b7e205740e956fad3e988eee.tar.gz scala-a31e3c23a1d00b87b7e205740e956fad3e988eee.tar.bz2 scala-a31e3c23a1d00b87b7e205740e956fad3e988eee.zip |
Added buffering of msgs in case remote net kern...
Added buffering of msgs in case remote net kernel is not yet up.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/remote/RemoteActor.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/TcpService.scala | 38 |
2 files changed, 38 insertions, 4 deletions
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index b30e363b46..19d85df491 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -48,7 +48,7 @@ object RemoteActor { * Makes <code>self</code> remotely accessible on TCP port * <code>port</code>. */ - def alive(port: int) { + def alive(port: int): unit = synchronized { val serv = new TcpService(port) serv.start() kernels += Actor.self -> serv.kernel @@ -58,7 +58,7 @@ object RemoteActor { * Registers <code>a</code> under <code>name</code> on this * node. */ - def register(name: Symbol, a: Actor) { + def register(name: Symbol, a: Actor): unit = synchronized { val kernel = kernels.get(Actor.self) match { case None => val serv = new TcpService(TcpService.generatePort) diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala index a3721bc40d..ba1ee9202a 100644 --- a/src/actors/scala/actors/remote/TcpService.scala +++ b/src/actors/scala/actors/remote/TcpService.scala @@ -21,6 +21,8 @@ import java.net.{InetAddress, ServerSocket, Socket, UnknownHostException} import compat.Platform +import scala.collection.mutable.HashMap + object TcpService { val random = new java.util.Random(Platform.currentTime) @@ -50,13 +52,45 @@ class TcpService(port: Int) extends Thread with Service { private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port) def node: Node = internalNode + private val pendingSends = new HashMap[Node, List[Array[byte]]] + def send(node: Node, data: Array[byte]): unit = synchronized { + + def bufferMsg(t: Throwable) = { + // buffer message, so that it can be re-sent + // when remote net kernel comes up + pendingSends.get(node) match { + case None => + pendingSends += node -> (data :: Nil) + case Some(msgs) => + pendingSends += node -> (data :: msgs) + } + } + // retrieve worker thread (if any) that already has connection getConnection(node) match { case None => // we are not connected, yet - val newWorker = connect(node) - newWorker transmit data + try { + val newWorker = connect(node) + newWorker transmit data + + // any pending sends? + pendingSends.get(node) match { + case None => + // do nothing + case Some(msgs) => + msgs foreach {newWorker transmit _} + pendingSends -= node + } + } catch { + case uhe: UnknownHostException => + bufferMsg(uhe) + case ioe: IOException => + bufferMsg(ioe) + case se: SecurityException => + // do nothing + } case Some(worker) => worker transmit data } |