From 3f256f905fdf2d35c38ce049920dcc5338501db9 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Wed, 2 Jul 2008 18:02:18 +0000 Subject: Fixed #1085. --- src/actors/scala/actors/Channel.scala | 13 ++- src/actors/scala/actors/OutputChannel.scala | 12 +- src/actors/scala/actors/remote/NetKernel.scala | 103 ++++------------- src/actors/scala/actors/remote/Proxy.scala | 140 +++++++++++++++++++++++ src/actors/scala/actors/remote/RemoteActor.scala | 7 +- 5 files changed, 188 insertions(+), 87 deletions(-) create mode 100644 src/actors/scala/actors/remote/Proxy.scala (limited to 'src') diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index ac195ef9bf..d094c961fd 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -35,7 +35,7 @@ case class ! [a](ch: Channel[a], msg: a) * actors. Only the actor creating an instance of a * Channel may receive from it. * - * @version 0.9.9 + * @version 0.9.17 * @author Philipp Haller */ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { @@ -61,6 +61,17 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { recv ! scala.actors.!(this, msg) } + /** + * Sends a message to this Channel + * (asynchronous) supplying explicit reply destination. + * + * @param msg the message to send + * @param replyTo the reply destination + */ + def send(msg: Msg, replyTo: OutputChannel[Any]) { + recv.send(scala.actors.!(this, msg), replyTo) + } + /** * Forwards msg to this keeping the * last sender as sender instead of self. diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala index 1cc24d6dd0..6c2fe8e756 100644 --- a/src/actors/scala/actors/OutputChannel.scala +++ b/src/actors/scala/actors/OutputChannel.scala @@ -14,7 +14,7 @@ package scala.actors * The OutputChannel trait provides a common interface * for all channels to which values can be sent. * - * @version 0.9.9 + * @version 0.9.17 * @author Philipp Haller */ trait OutputChannel[-Msg] { @@ -25,6 +25,16 @@ trait OutputChannel[-Msg] { */ def !(msg: Msg): Unit + /** + * Sends msg to this + * OutputChannel (asynchronous) supplying + * explicit reply destination. + * + * @param msg the message to send + * @param replyTo the reply destination + */ + def send(msg: Msg, replyTo: OutputChannel[Any]): Unit + /** * Forwards msg to this * OutputChannel (asynchronous). diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index 7cb776257e..0025b57de4 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -21,7 +21,7 @@ case object Terminate case class Locator(node: Node, name: Symbol) /** - * @version 0.9.10 + * @version 0.9.17 * @author Philipp Haller */ class NetKernel(service: Service) { @@ -69,20 +69,31 @@ class NetKernel(service: Service) { namedSend(senderLoc, receiverLoc, msg, session) } - def createProxy(node: Node, sym: Symbol): Actor = { - val p = Proxy(node, sym, this) + def createProxy(node: Node, sym: Symbol): Proxy = { + val p = new Proxy(node, sym, this) proxies += Pair((node, sym), p) p } - val proxies = new HashMap[(Node, Symbol), Actor] + val proxies = new HashMap[(Node, Symbol), Proxy] - def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized { - proxies.get((senderNode, senderName)) match { - case Some(senderProxy) => senderProxy - case None => createProxy(senderNode, senderName) + def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy = + proxies.synchronized { + proxies.get((senderNode, senderName)) match { + case Some(senderProxy) => senderProxy + case None => createProxy(senderNode, senderName) + } + } + + /* Register proxy if no other proxy has been registered. + */ + def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit = + proxies.synchronized { + proxies.get((senderNode, senderName)) match { + case Some(senderProxy) => // do nothing + case None => proxies += Pair((senderNode, senderName), p) + } } - } def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { msg match { @@ -118,77 +129,3 @@ class NetKernel(service: Service) { service.terminate() } } - -case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { - start() - - @transient - private var channelMap = new HashMap[Symbol, OutputChannel[Any]] - - @transient - private var sessionMap = new HashMap[Channel[Any], Symbol] - - def act() { - Debug.info(this+": waiting to process commands") - loop { - react { - // Request from remote proxy. - // `this` is local proxy. - case cmd@SendTo(out, msg, session) => - Debug.info(this+": processing "+cmd) - - // is this an active session? - channelMap.get(session) match { - case None => - // create a new reply channel... - val replyCh = new Channel[Any](this) - - // ...that maps to session - sessionMap += Pair(replyCh, session) - - // local send to actor - val a = out.asInstanceOf[Actor] - a.send(msg, replyCh) - - case Some(replyCh) => - replyCh ! msg - // TODO: - // remove `replyCh` from mapping - // to avoid memory leak (always safe?) - // or: use WeakHashMap - // however, it's the value (channel) - // that should be weak! - } - - case cmd@Terminate => - Debug.info(this+": processing "+cmd) - exit() - - // local proxy receives response to - // reply channel - case ch ! resp => - // lookup session ID - sessionMap.get(ch) match { - case Some(sid) => - val msg = resp.asInstanceOf[AnyRef] - // send back response - kernel.forward(sender, node, name, msg, sid) - - case None => - Debug.info(this+": cannot find session for "+ch) - } - - // remote proxy receives request - case msg: AnyRef => - // create fresh session ID... - val sid = FreshNameCreator.newName(node+"@"+name) - - // ...that maps to reply channel - channelMap += Pair(sid, sender) - - kernel.forward(sender, node, name, msg, sid) - } - } - } - -} diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala new file mode 100644 index 0000000000..d752d00139 --- /dev/null +++ b/src/actors/scala/actors/remote/Proxy.scala @@ -0,0 +1,140 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2008, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors.remote + +import scala.collection.mutable.HashMap + +/** + * @version 0.9.17 + * @author Philipp Haller + */ +@serializable +class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends OutputChannel[Any] { + import java.io.{IOException, ObjectOutputStream, ObjectInputStream} + + @transient + private[remote] var del: Actor = null + startDelegate() + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream) { + Debug.info("Serializing "+this) + out.defaultWriteObject() + } + + @throws(classOf[ClassNotFoundException]) @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream) { + Debug.info("Deserializing "+this) + in.defaultReadObject() + setupKernel() + startDelegate() + } + + private def startDelegate() { + del = new DelegateActor(node, name, kernel) + del.start() + } + + private def setupKernel() { + kernel = RemoteActor.someKernel + kernel.registerProxy(node, name, this) + } + + def !(msg: Any): Unit = + del.send(msg, Actor.self) + + def send(msg: Any, replyCh: OutputChannel[Any]): Unit = + del.send(msg, replyCh) + + def forward(msg: Any): Unit = + del.send(msg, Actor.sender) + + def receiver: Actor = + del + + def !!(msg: Any): Future[Any] = + del !! msg + + override def toString() = + name+"@"+node +} + +/** + * @version 0.9.17 + * @author Philipp Haller + */ +private[remote] class DelegateActor(node: Node, name: Symbol, kernel: NetKernel) extends Actor { + var channelMap = new HashMap[Symbol, OutputChannel[Any]] + var sessionMap = new HashMap[Channel[Any], Symbol] + + def act() { + Debug.info(this+": waiting to process commands") + Actor.loop { + react { + // Request from remote proxy. + // `this` is local proxy. + case cmd@SendTo(out, msg, session) => + Debug.info(this+": processing "+cmd) + + // is this an active session? + channelMap.get(session) match { + case None => + // create a new reply channel... + val replyCh = new Channel[Any](this) + + // ...that maps to session + sessionMap += Pair(replyCh, session) + + // local send + out.send(msg, replyCh) + + case Some(replyCh) => + replyCh ! msg + // TODO: + // remove `replyCh` from mapping + // to avoid memory leak (always safe?) + // or: use WeakHashMap + // however, it's the value (channel) + // that should be weak! + } + + case cmd@Terminate => + Debug.info(this+": processing "+cmd) + exit() + + // local proxy receives response to + // reply channel + case ch ! resp => + // lookup session ID + sessionMap.get(ch) match { + case Some(sid) => + val msg = resp.asInstanceOf[AnyRef] + // send back response + kernel.forward(sender, node, name, msg, sid) + + case None => + Debug.info(this+": cannot find session for "+ch) + } + + // remote proxy receives request + case msg: AnyRef => + // create fresh session ID... + val sid = FreshNameCreator.newName(node+"@"+name) + + // ...that maps to reply channel + channelMap += Pair(sid, sender) + + kernel.forward(sender, node, name, msg, sid) + } + } + } + +} diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index 06b460d5e6..c37fd88432 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -38,7 +38,7 @@ package scala.actors.remote * } * * - * @version 0.9.10 + * @version 0.9.17 * @author Philipp Haller */ object RemoteActor { @@ -120,9 +120,12 @@ object RemoteActor { * Returns (a proxy for) the actor registered under * name on node. */ - def select(node: Node, sym: Symbol): Actor = synchronized { + def select(node: Node, sym: Symbol): OutputChannel[Any] = synchronized { selfKernel.getOrCreateProxy(node, sym) } + + def someKernel: NetKernel = + kernels.values.next } -- cgit v1.2.3