summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-06-12 10:51:46 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-06-12 10:51:46 +0000
commita31e3c23a1d00b87b7e205740e956fad3e988eee (patch)
tree4223dab0dce3b2ede48a37650de8ca73c525134c /src/actors
parent835fab52247cae811c2ef547f79dc121acc21e66 (diff)
downloadscala-a31e3c23a1d00b87b7e205740e956fad3e988eee.tar.gz
scala-a31e3c23a1d00b87b7e205740e956fad3e988eee.tar.bz2
scala-a31e3c23a1d00b87b7e205740e956fad3e988eee.zip
Added buffering of msgs in case remote net kern...
Added buffering of msgs in case remote net kernel is not yet up.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala4
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala38
2 files changed, 38 insertions, 4 deletions
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index b30e363b46..19d85df491 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -48,7 +48,7 @@ object RemoteActor {
* Makes <code>self</code> remotely accessible on TCP port
* <code>port</code>.
*/
- def alive(port: int) {
+ def alive(port: int): unit = synchronized {
val serv = new TcpService(port)
serv.start()
kernels += Actor.self -> serv.kernel
@@ -58,7 +58,7 @@ object RemoteActor {
* Registers <code>a</code> under <code>name</code> on this
* node.
*/
- def register(name: Symbol, a: Actor) {
+ def register(name: Symbol, a: Actor): unit = synchronized {
val kernel = kernels.get(Actor.self) match {
case None =>
val serv = new TcpService(TcpService.generatePort)
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index a3721bc40d..ba1ee9202a 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -21,6 +21,8 @@ import java.net.{InetAddress, ServerSocket, Socket, UnknownHostException}
import compat.Platform
+import scala.collection.mutable.HashMap
+
object TcpService {
val random = new java.util.Random(Platform.currentTime)
@@ -50,13 +52,45 @@ class TcpService(port: Int) extends Thread with Service {
private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port)
def node: Node = internalNode
+ private val pendingSends = new HashMap[Node, List[Array[byte]]]
+
def send(node: Node, data: Array[byte]): unit = synchronized {
+
+ def bufferMsg(t: Throwable) = {
+ // buffer message, so that it can be re-sent
+ // when remote net kernel comes up
+ pendingSends.get(node) match {
+ case None =>
+ pendingSends += node -> (data :: Nil)
+ case Some(msgs) =>
+ pendingSends += node -> (data :: msgs)
+ }
+ }
+
// retrieve worker thread (if any) that already has connection
getConnection(node) match {
case None =>
// we are not connected, yet
- val newWorker = connect(node)
- newWorker transmit data
+ try {
+ val newWorker = connect(node)
+ newWorker transmit data
+
+ // any pending sends?
+ pendingSends.get(node) match {
+ case None =>
+ // do nothing
+ case Some(msgs) =>
+ msgs foreach {newWorker transmit _}
+ pendingSends -= node
+ }
+ } catch {
+ case uhe: UnknownHostException =>
+ bufferMsg(uhe)
+ case ioe: IOException =>
+ bufferMsg(ioe)
+ case se: SecurityException =>
+ // do nothing
+ }
case Some(worker) =>
worker transmit data
}