diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-08-06 13:26:33 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-08-06 13:26:33 +0000 |
commit | 53f715896d18988615e30f643cbea9024e74e4e8 (patch) | |
tree | 18b70258aa19be1a8381aeea2f08f792ae70269f /src/actors | |
parent | de47e4d7a9735614f57ab3de9ed239584745d0b1 (diff) | |
download | scala-53f715896d18988615e30f643cbea9024e74e4e8.tar.gz scala-53f715896d18988615e30f643cbea9024e74e4e8.tar.bz2 scala-53f715896d18988615e30f643cbea9024e74e4e8.zip |
Remote actor proxies are now cached.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/remote/NetKernel.scala | 109 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/RemoteActor.scala | 20 |
2 files changed, 82 insertions, 47 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index 376c79e883..8c28fdb2cc 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -11,11 +11,16 @@ package scala.actors.remote import scala.collection.mutable.{HashMap, HashSet} +import scala.actors.Actor.loop case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte]) case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte]) case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte]) +case class SendTo(a: Actor, msg: Any) +case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol) +case class ReplyTo(a: Actor, msg: Any) + /** * @version 0.9.8 * @author Philipp Haller @@ -71,23 +76,31 @@ class NetKernel(service: Service) { namedSyncSend(node, senderName, name, msg) } + def createProxy(node: Node, sym: Symbol): Actor = { + val p = new Proxy(node, sym, this) + proxies += Pair(node, sym) -> p + p + } + + val proxies = new HashMap[(Node, Symbol), Actor] + + def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized { + proxies.get((senderNode, senderName)) match { + case Some(senderProxy) => senderProxy + case None => createProxy(senderNode, senderName) + } + } + def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { msg match { case NamedSend(senderName, receiver, data) => actors.get(receiver) match { case Some(a) => val msg = service.serializer.deserialize(data) - val senderProxy = new Actor { - def act() = { a ! msg } - override def !(msg: Any) { - msg match { - case refmsg: AnyRef => - // node, senderName, receiver, msg - namedSend(senderNode, receiver, senderName, refmsg) - } - } - } - senderProxy.start(); {} + + val senderProxy = getOrCreateProxy(senderNode, senderName) + senderProxy.send(SendTo(a, msg), null) + case None => // message is lost } @@ -95,17 +108,10 @@ class NetKernel(service: Service) { actors.get(receiver) match { case Some(a) => val msg = service.serializer.deserialize(data) - val senderProxy = new Actor { - def act() = { - val res = a !? msg - res match { - case refmsg: AnyRef => - // node, senderName, receiver, msg - sendReply(senderNode, receiver, senderName, refmsg) - } - } - } - senderProxy.start(); {} + + val senderProxy = getOrCreateProxy(senderNode, senderName) + senderProxy.send(SyncSendTo(a, msg, receiver), null) + case None => // message is lost } @@ -113,15 +119,62 @@ class NetKernel(service: Service) { actors.get(receiver) match { case Some(a) => val msg = service.serializer.deserialize(data) - val senderProxy = new Actor { - def act() = { - a.replyChannel ! msg - } - } - senderProxy.start(); {} + + val senderProxy = getOrCreateProxy(senderNode, senderName) + senderProxy.send(ReplyTo(a, msg), null) + case None => // message is lost } } } } + +class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { + start() + + override def act() { + loop { + react { + case SendTo(a, msg) => + a ! msg + + case SyncSendTo(a, msg, receiver) => + val replyCh = new Channel[Any](this) + a.send(msg, replyCh) + val res = replyCh.receive { + case x => x + } + + res match { + case refmsg: AnyRef => + kernel.sendReply(node, receiver, name, refmsg) + } + + case ReplyTo(a, msg) => + a.replyChannel ! msg + } + } + } + + override def !(msg: Any): Unit = msg match { + case ch ! m => + // do not send remotely + this.send(msg, Actor.self.replyChannel) + case a: AnyRef => + kernel.send(node, name, a) + case other => + error("Cannot send non-AnyRef value remotely.") + } + + override def !?(msg: Any): Any = msg match { + case a: AnyRef => + val replyCh = Actor.self.freshReplyChannel + kernel.syncSend(node, name, a) + replyCh.receive { + case x => x + } + case other => + error("Cannot send non-AnyRef value remotely.") + } +} diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index 4a69da6911..0edd154c49 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -89,25 +89,7 @@ object RemoteActor { * <code>name</code> on <code>node</code>. */ def select(node: Node, sym: Symbol): Actor = - new Actor { - def act() {} - override def !(msg: Any): Unit = msg match { - case a: AnyRef => - selfKernel.send(node, sym, a) - case other => - error("Cannot send non-AnyRef value remotely.") - } - override def !?(msg: Any): Any = msg match { - case a: AnyRef => - val replyCh = Actor.self.freshReplyChannel - selfKernel.syncSend(node, sym, a) - replyCh.receive { - case x => x - } - case other => - error("Cannot send non-AnyRef value remotely.") - } - } + selfKernel.getOrCreateProxy(node, sym) } |