summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-08-06 13:26:33 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-08-06 13:26:33 +0000
commit53f715896d18988615e30f643cbea9024e74e4e8 (patch)
tree18b70258aa19be1a8381aeea2f08f792ae70269f
parentde47e4d7a9735614f57ab3de9ed239584745d0b1 (diff)
downloadscala-53f715896d18988615e30f643cbea9024e74e4e8.tar.gz
scala-53f715896d18988615e30f643cbea9024e74e4e8.tar.bz2
scala-53f715896d18988615e30f643cbea9024e74e4e8.zip
Remote actor proxies are now cached.
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala109
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala20
2 files changed, 82 insertions, 47 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 376c79e883..8c28fdb2cc 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -11,11 +11,16 @@
package scala.actors.remote
import scala.collection.mutable.{HashMap, HashSet}
+import scala.actors.Actor.loop
case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte])
+case class SendTo(a: Actor, msg: Any)
+case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol)
+case class ReplyTo(a: Actor, msg: Any)
+
/**
* @version 0.9.8
* @author Philipp Haller
@@ -71,23 +76,31 @@ class NetKernel(service: Service) {
namedSyncSend(node, senderName, name, msg)
}
+ def createProxy(node: Node, sym: Symbol): Actor = {
+ val p = new Proxy(node, sym, this)
+ proxies += Pair(node, sym) -> p
+ p
+ }
+
+ val proxies = new HashMap[(Node, Symbol), Actor]
+
+ def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized {
+ proxies.get((senderNode, senderName)) match {
+ case Some(senderProxy) => senderProxy
+ case None => createProxy(senderNode, senderName)
+ }
+ }
+
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
case NamedSend(senderName, receiver, data) =>
actors.get(receiver) match {
case Some(a) =>
val msg = service.serializer.deserialize(data)
- val senderProxy = new Actor {
- def act() = { a ! msg }
- override def !(msg: Any) {
- msg match {
- case refmsg: AnyRef =>
- // node, senderName, receiver, msg
- namedSend(senderNode, receiver, senderName, refmsg)
- }
- }
- }
- senderProxy.start(); {}
+
+ val senderProxy = getOrCreateProxy(senderNode, senderName)
+ senderProxy.send(SendTo(a, msg), null)
+
case None =>
// message is lost
}
@@ -95,17 +108,10 @@ class NetKernel(service: Service) {
actors.get(receiver) match {
case Some(a) =>
val msg = service.serializer.deserialize(data)
- val senderProxy = new Actor {
- def act() = {
- val res = a !? msg
- res match {
- case refmsg: AnyRef =>
- // node, senderName, receiver, msg
- sendReply(senderNode, receiver, senderName, refmsg)
- }
- }
- }
- senderProxy.start(); {}
+
+ val senderProxy = getOrCreateProxy(senderNode, senderName)
+ senderProxy.send(SyncSendTo(a, msg, receiver), null)
+
case None =>
// message is lost
}
@@ -113,15 +119,62 @@ class NetKernel(service: Service) {
actors.get(receiver) match {
case Some(a) =>
val msg = service.serializer.deserialize(data)
- val senderProxy = new Actor {
- def act() = {
- a.replyChannel ! msg
- }
- }
- senderProxy.start(); {}
+
+ val senderProxy = getOrCreateProxy(senderNode, senderName)
+ senderProxy.send(ReplyTo(a, msg), null)
+
case None =>
// message is lost
}
}
}
}
+
+class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
+ start()
+
+ override def act() {
+ loop {
+ react {
+ case SendTo(a, msg) =>
+ a ! msg
+
+ case SyncSendTo(a, msg, receiver) =>
+ val replyCh = new Channel[Any](this)
+ a.send(msg, replyCh)
+ val res = replyCh.receive {
+ case x => x
+ }
+
+ res match {
+ case refmsg: AnyRef =>
+ kernel.sendReply(node, receiver, name, refmsg)
+ }
+
+ case ReplyTo(a, msg) =>
+ a.replyChannel ! msg
+ }
+ }
+ }
+
+ override def !(msg: Any): Unit = msg match {
+ case ch ! m =>
+ // do not send remotely
+ this.send(msg, Actor.self.replyChannel)
+ case a: AnyRef =>
+ kernel.send(node, name, a)
+ case other =>
+ error("Cannot send non-AnyRef value remotely.")
+ }
+
+ override def !?(msg: Any): Any = msg match {
+ case a: AnyRef =>
+ val replyCh = Actor.self.freshReplyChannel
+ kernel.syncSend(node, name, a)
+ replyCh.receive {
+ case x => x
+ }
+ case other =>
+ error("Cannot send non-AnyRef value remotely.")
+ }
+}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 4a69da6911..0edd154c49 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -89,25 +89,7 @@ object RemoteActor {
* <code>name</code> on <code>node</code>.
*/
def select(node: Node, sym: Symbol): Actor =
- new Actor {
- def act() {}
- override def !(msg: Any): Unit = msg match {
- case a: AnyRef =>
- selfKernel.send(node, sym, a)
- case other =>
- error("Cannot send non-AnyRef value remotely.")
- }
- override def !?(msg: Any): Any = msg match {
- case a: AnyRef =>
- val replyCh = Actor.self.freshReplyChannel
- selfKernel.syncSend(node, sym, a)
- replyCh.receive {
- case x => x
- }
- case other =>
- error("Cannot send non-AnyRef value remotely.")
- }
- }
+ selfKernel.getOrCreateProxy(node, sym)
}