summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-06-18 19:58:32 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-06-18 19:58:32 +0000
commit85db410e24a8c64195911b6f2824258321a63442 (patch)
tree88d28422b40c5aa3a812408fa694330ecdfbdc11 /src/actors
parente2b7b044c533e295309dc8ba9fcdb6d8dbc52fd4 (diff)
downloadscala-85db410e24a8c64195911b6f2824258321a63442.tar.gz
scala-85db410e24a8c64195911b6f2824258321a63442.tar.bz2
scala-85db410e24a8c64195911b6f2824258321a63442.zip
Cleaned up handling of reply destinations for r...
Cleaned up handling of reply destinations for remote actors. Remote actor proxies now support all message send operations of the Actor trait in a general way.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala183
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala1
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala7
3 files changed, 91 insertions, 100 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index c2d3723033..7cb776257e 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -13,15 +13,13 @@ package scala.actors.remote
import scala.collection.mutable.{HashMap, HashSet}
import scala.actors.Actor.loop
-case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
-case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
-case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte])
+case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol)
-case class SendTo(a: Actor, msg: Any)
-case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol)
-case class ReplyTo(a: Actor, msg: Any)
+case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
case object Terminate
+case class Locator(node: Node, name: Symbol)
+
/**
* @version 0.9.10
* @author Philipp Haller
@@ -33,52 +31,46 @@ class NetKernel(service: Service) {
service.send(node, bytes)
}
- def namedSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
- val bytes = service.serializer.serialize(msg)
- sendToNode(node, NamedSend(sender, receiver, bytes))
- }
-
- def namedSyncSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
- val bytes = service.serializer.serialize(msg)
- val toSend = SyncSend(sender, receiver, bytes)
- sendToNode(node, toSend)
- }
-
- def sendReply(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
+ def namedSend(senderLoc: Locator, receiverLoc: Locator,
+ msg: AnyRef, session: Symbol) {
val bytes = service.serializer.serialize(msg)
- val toSend = Reply(sender, receiver, bytes)
- sendToNode(node, toSend)
+ sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session))
}
- private val actors = new HashMap[Symbol, Actor]
- private val names = new HashMap[Actor, Symbol]
+ private val actors = new HashMap[Symbol, OutputChannel[Any]]
+ private val names = new HashMap[OutputChannel[Any], Symbol]
- def register(name: Symbol, a: Actor): Unit = synchronized {
+ def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized {
actors += Pair(name, a)
names += Pair(a, name)
}
- def selfName = names.get(Actor.self) match {
+ def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match {
case None =>
val freshName = FreshNameCreator.newName("remotesender")
- register(freshName, Actor.self)
+ register(freshName, from)
freshName
case Some(name) =>
name
}
- def send(node: Node, name: Symbol, msg: AnyRef) {
- val senderName = selfName
- namedSend(node, senderName, name, msg)
+ def send(node: Node, name: Symbol, msg: AnyRef): Unit =
+ send(node, name, msg, 'nosession)
+
+ def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
+ val senderLoc = Locator(service.node, getOrCreateName(Actor.self))
+ val receiverLoc = Locator(node, name)
+ namedSend(senderLoc, receiverLoc, msg, session)
}
- def syncSend(node: Node, name: Symbol, msg: AnyRef) {
- val senderName = selfName
- namedSyncSend(node, senderName, name, msg)
+ def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
+ val senderLoc = Locator(service.node, getOrCreateName(from))
+ val receiverLoc = Locator(node, name)
+ namedSend(senderLoc, receiverLoc, msg, session)
}
def createProxy(node: Node, sym: Symbol): Actor = {
- val p = new Proxy(node, sym, this)
+ val p = Proxy(node, sym, this)
proxies += Pair((node, sym), p)
p
}
@@ -94,18 +86,18 @@ class NetKernel(service: Service) {
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
- case cmd@NamedSend(senderName, receiver, data) =>
+ case cmd@NamedSend(senderLoc, receiverLoc, data, session) =>
Debug.info(this+": processing "+cmd)
- actors.get(receiver) match {
+ actors.get(receiverLoc.name) match {
case Some(a) =>
try {
Debug.info(this+": receiver is "+a)
val msg = service.serializer.deserialize(data)
Debug.info(this+": deserialized msg is "+msg)
- val senderProxy = getOrCreateProxy(senderNode, senderName)
+ val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
Debug.info(this+": created "+senderProxy)
- senderProxy.send(SendTo(a, msg), null)
+ senderProxy.send(SendTo(a, msg, session), null)
} catch {
case e: Exception =>
Debug.error(this+": caught "+e)
@@ -115,30 +107,6 @@ class NetKernel(service: Service) {
// message is lost
Debug.info(this+": lost message")
}
- case cmd@SyncSend(senderName, receiver, data) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiver) match {
- case Some(a) =>
- val msg = service.serializer.deserialize(data)
-
- val senderProxy = getOrCreateProxy(senderNode, senderName)
- senderProxy.send(SyncSendTo(a, msg, receiver), null)
-
- case None =>
- // message is lost
- }
- case cmd@Reply(senderName, receiver, data) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiver) match {
- case Some(a) =>
- val msg = service.serializer.deserialize(data)
-
- val senderProxy = getOrCreateProxy(senderNode, senderName)
- senderProxy.send(ReplyTo(a, msg), null)
-
- case None =>
- // message is lost
- }
}
}
@@ -151,59 +119,76 @@ class NetKernel(service: Service) {
}
}
-class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
+case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
start()
- override def act() {
+ @transient
+ private var channelMap = new HashMap[Symbol, OutputChannel[Any]]
+
+ @transient
+ private var sessionMap = new HashMap[Channel[Any], Symbol]
+
+ def act() {
Debug.info(this+": waiting to process commands")
loop {
react {
- case cmd@SendTo(a, msg) =>
+ // Request from remote proxy.
+ // `this` is local proxy.
+ case cmd@SendTo(out, msg, session) =>
Debug.info(this+": processing "+cmd)
- a ! msg
- case cmd@SyncSendTo(a, msg, receiver) =>
- Debug.info(this+": processing "+cmd)
- val replyCh = new Channel[Any](this)
- a.send(msg, replyCh)
- val res = replyCh.receive {
- case x => x
- }
-
- res match {
- case refmsg: AnyRef =>
- kernel.sendReply(node, receiver, name, refmsg)
+ // is this an active session?
+ channelMap.get(session) match {
+ case None =>
+ // create a new reply channel...
+ val replyCh = new Channel[Any](this)
+
+ // ...that maps to session
+ sessionMap += Pair(replyCh, session)
+
+ // local send to actor
+ val a = out.asInstanceOf[Actor]
+ a.send(msg, replyCh)
+
+ case Some(replyCh) =>
+ replyCh ! msg
+ // TODO:
+ // remove `replyCh` from mapping
+ // to avoid memory leak (always safe?)
+ // or: use WeakHashMap
+ // however, it's the value (channel)
+ // that should be weak!
}
- case cmd@ReplyTo(a, msg) =>
- Debug.info(this+": processing "+cmd)
- a.replyChannel ! msg
-
case cmd@Terminate =>
Debug.info(this+": processing "+cmd)
exit()
- }
- }
- }
- override def !(msg: Any): Unit = msg match {
- case ch ! m =>
- // do not send remotely
- this.send(msg, Actor.self.replyChannel)
- case a: AnyRef =>
- kernel.send(node, name, a)
- case other =>
- error("Cannot send non-AnyRef value remotely.")
- }
+ // local proxy receives response to
+ // reply channel
+ case ch ! resp =>
+ // lookup session ID
+ sessionMap.get(ch) match {
+ case Some(sid) =>
+ val msg = resp.asInstanceOf[AnyRef]
+ // send back response
+ kernel.forward(sender, node, name, msg, sid)
+
+ case None =>
+ Debug.info(this+": cannot find session for "+ch)
+ }
- override def !?(msg: Any): Any = msg match {
- case a: AnyRef =>
- val replyCh = Actor.self.freshReplyChannel
- kernel.syncSend(node, name, a)
- replyCh.receive {
- case x => x
+ // remote proxy receives request
+ case msg: AnyRef =>
+ // create fresh session ID...
+ val sid = FreshNameCreator.newName(node+"@"+name)
+
+ // ...that maps to reply channel
+ channelMap += Pair(sid, sender)
+
+ kernel.forward(sender, node, name, msg, sid)
}
- case other =>
- error("Cannot send non-AnyRef value remotely.")
+ }
}
+
}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 62d983a25d..06b460d5e6 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -69,6 +69,7 @@ object RemoteActor {
def createKernelOnPort(port: Int): NetKernel = {
val serv = TcpService(port, cl)
+ Debug.info("created service at "+serv.node)
val kern = serv.kernel
val s = Actor.self
kernels += Pair(s, kern)
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index 9377bf1f71..1f1dec4369 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -122,7 +122,12 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service {
def terminate() {
shouldTerminate = true
- new Socket(internalNode.address, internalNode.port)
+ try {
+ new Socket(internalNode.address, internalNode.port)
+ } catch {
+ case ce: java.net.ConnectException =>
+ Debug.info(this+": caught "+ce)
+ }
}
private var shouldTerminate = false