From 85db410e24a8c64195911b6f2824258321a63442 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Wed, 18 Jun 2008 19:58:32 +0000 Subject: Cleaned up handling of reply destinations for r... Cleaned up handling of reply destinations for remote actors. Remote actor proxies now support all message send operations of the Actor trait in a general way. --- src/actors/scala/actors/remote/NetKernel.scala | 183 +++++++++++------------ src/actors/scala/actors/remote/RemoteActor.scala | 1 + src/actors/scala/actors/remote/TcpService.scala | 7 +- 3 files changed, 91 insertions(+), 100 deletions(-) (limited to 'src') diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index c2d3723033..7cb776257e 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -13,15 +13,13 @@ package scala.actors.remote import scala.collection.mutable.{HashMap, HashSet} import scala.actors.Actor.loop -case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte]) -case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte]) -case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte]) +case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol) -case class SendTo(a: Actor, msg: Any) -case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol) -case class ReplyTo(a: Actor, msg: Any) +case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol) case object Terminate +case class Locator(node: Node, name: Symbol) + /** * @version 0.9.10 * @author Philipp Haller @@ -33,52 +31,46 @@ class NetKernel(service: Service) { service.send(node, bytes) } - def namedSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) { - val bytes = service.serializer.serialize(msg) - sendToNode(node, NamedSend(sender, receiver, bytes)) - } - - def namedSyncSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) { - val bytes = service.serializer.serialize(msg) - val toSend = SyncSend(sender, receiver, bytes) - sendToNode(node, toSend) - } - - def sendReply(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) { + def namedSend(senderLoc: Locator, receiverLoc: Locator, + msg: AnyRef, session: Symbol) { val bytes = service.serializer.serialize(msg) - val toSend = Reply(sender, receiver, bytes) - sendToNode(node, toSend) + sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session)) } - private val actors = new HashMap[Symbol, Actor] - private val names = new HashMap[Actor, Symbol] + private val actors = new HashMap[Symbol, OutputChannel[Any]] + private val names = new HashMap[OutputChannel[Any], Symbol] - def register(name: Symbol, a: Actor): Unit = synchronized { + def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized { actors += Pair(name, a) names += Pair(a, name) } - def selfName = names.get(Actor.self) match { + def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match { case None => val freshName = FreshNameCreator.newName("remotesender") - register(freshName, Actor.self) + register(freshName, from) freshName case Some(name) => name } - def send(node: Node, name: Symbol, msg: AnyRef) { - val senderName = selfName - namedSend(node, senderName, name, msg) + def send(node: Node, name: Symbol, msg: AnyRef): Unit = + send(node, name, msg, 'nosession) + + def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) { + val senderLoc = Locator(service.node, getOrCreateName(Actor.self)) + val receiverLoc = Locator(node, name) + namedSend(senderLoc, receiverLoc, msg, session) } - def syncSend(node: Node, name: Symbol, msg: AnyRef) { - val senderName = selfName - namedSyncSend(node, senderName, name, msg) + def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) { + val senderLoc = Locator(service.node, getOrCreateName(from)) + val receiverLoc = Locator(node, name) + namedSend(senderLoc, receiverLoc, msg, session) } def createProxy(node: Node, sym: Symbol): Actor = { - val p = new Proxy(node, sym, this) + val p = Proxy(node, sym, this) proxies += Pair((node, sym), p) p } @@ -94,18 +86,18 @@ class NetKernel(service: Service) { def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { msg match { - case cmd@NamedSend(senderName, receiver, data) => + case cmd@NamedSend(senderLoc, receiverLoc, data, session) => Debug.info(this+": processing "+cmd) - actors.get(receiver) match { + actors.get(receiverLoc.name) match { case Some(a) => try { Debug.info(this+": receiver is "+a) val msg = service.serializer.deserialize(data) Debug.info(this+": deserialized msg is "+msg) - val senderProxy = getOrCreateProxy(senderNode, senderName) + val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) Debug.info(this+": created "+senderProxy) - senderProxy.send(SendTo(a, msg), null) + senderProxy.send(SendTo(a, msg, session), null) } catch { case e: Exception => Debug.error(this+": caught "+e) @@ -115,30 +107,6 @@ class NetKernel(service: Service) { // message is lost Debug.info(this+": lost message") } - case cmd@SyncSend(senderName, receiver, data) => - Debug.info(this+": processing "+cmd) - actors.get(receiver) match { - case Some(a) => - val msg = service.serializer.deserialize(data) - - val senderProxy = getOrCreateProxy(senderNode, senderName) - senderProxy.send(SyncSendTo(a, msg, receiver), null) - - case None => - // message is lost - } - case cmd@Reply(senderName, receiver, data) => - Debug.info(this+": processing "+cmd) - actors.get(receiver) match { - case Some(a) => - val msg = service.serializer.deserialize(data) - - val senderProxy = getOrCreateProxy(senderNode, senderName) - senderProxy.send(ReplyTo(a, msg), null) - - case None => - // message is lost - } } } @@ -151,59 +119,76 @@ class NetKernel(service: Service) { } } -class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { +case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { start() - override def act() { + @transient + private var channelMap = new HashMap[Symbol, OutputChannel[Any]] + + @transient + private var sessionMap = new HashMap[Channel[Any], Symbol] + + def act() { Debug.info(this+": waiting to process commands") loop { react { - case cmd@SendTo(a, msg) => + // Request from remote proxy. + // `this` is local proxy. + case cmd@SendTo(out, msg, session) => Debug.info(this+": processing "+cmd) - a ! msg - case cmd@SyncSendTo(a, msg, receiver) => - Debug.info(this+": processing "+cmd) - val replyCh = new Channel[Any](this) - a.send(msg, replyCh) - val res = replyCh.receive { - case x => x - } - - res match { - case refmsg: AnyRef => - kernel.sendReply(node, receiver, name, refmsg) + // is this an active session? + channelMap.get(session) match { + case None => + // create a new reply channel... + val replyCh = new Channel[Any](this) + + // ...that maps to session + sessionMap += Pair(replyCh, session) + + // local send to actor + val a = out.asInstanceOf[Actor] + a.send(msg, replyCh) + + case Some(replyCh) => + replyCh ! msg + // TODO: + // remove `replyCh` from mapping + // to avoid memory leak (always safe?) + // or: use WeakHashMap + // however, it's the value (channel) + // that should be weak! } - case cmd@ReplyTo(a, msg) => - Debug.info(this+": processing "+cmd) - a.replyChannel ! msg - case cmd@Terminate => Debug.info(this+": processing "+cmd) exit() - } - } - } - override def !(msg: Any): Unit = msg match { - case ch ! m => - // do not send remotely - this.send(msg, Actor.self.replyChannel) - case a: AnyRef => - kernel.send(node, name, a) - case other => - error("Cannot send non-AnyRef value remotely.") - } + // local proxy receives response to + // reply channel + case ch ! resp => + // lookup session ID + sessionMap.get(ch) match { + case Some(sid) => + val msg = resp.asInstanceOf[AnyRef] + // send back response + kernel.forward(sender, node, name, msg, sid) + + case None => + Debug.info(this+": cannot find session for "+ch) + } - override def !?(msg: Any): Any = msg match { - case a: AnyRef => - val replyCh = Actor.self.freshReplyChannel - kernel.syncSend(node, name, a) - replyCh.receive { - case x => x + // remote proxy receives request + case msg: AnyRef => + // create fresh session ID... + val sid = FreshNameCreator.newName(node+"@"+name) + + // ...that maps to reply channel + channelMap += Pair(sid, sender) + + kernel.forward(sender, node, name, msg, sid) } - case other => - error("Cannot send non-AnyRef value remotely.") + } } + } diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index 62d983a25d..06b460d5e6 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -69,6 +69,7 @@ object RemoteActor { def createKernelOnPort(port: Int): NetKernel = { val serv = TcpService(port, cl) + Debug.info("created service at "+serv.node) val kern = serv.kernel val s = Actor.self kernels += Pair(s, kern) diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala index 9377bf1f71..1f1dec4369 100644 --- a/src/actors/scala/actors/remote/TcpService.scala +++ b/src/actors/scala/actors/remote/TcpService.scala @@ -122,7 +122,12 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { def terminate() { shouldTerminate = true - new Socket(internalNode.address, internalNode.port) + try { + new Socket(internalNode.address, internalNode.port) + } catch { + case ce: java.net.ConnectException => + Debug.info(this+": caught "+ce) + } } private var shouldTerminate = false -- cgit v1.2.3