diff options
author | Philipp Haller <hallerp@gmail.com> | 2008-07-02 18:02:18 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2008-07-02 18:02:18 +0000 |
commit | 3f256f905fdf2d35c38ce049920dcc5338501db9 (patch) | |
tree | 27063b0e135f1ec13b310cb79412ec58e2047483 /src/actors/scala/actors/remote/NetKernel.scala | |
parent | 3b18a3f00475bea500309d9c4d2ad4bb6ee61abe (diff) | |
download | scala-3f256f905fdf2d35c38ce049920dcc5338501db9.tar.gz scala-3f256f905fdf2d35c38ce049920dcc5338501db9.tar.bz2 scala-3f256f905fdf2d35c38ce049920dcc5338501db9.zip |
Fixed #1085.
Diffstat (limited to 'src/actors/scala/actors/remote/NetKernel.scala')
-rw-r--r-- | src/actors/scala/actors/remote/NetKernel.scala | 103 |
1 files changed, 20 insertions, 83 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index 7cb776257e..0025b57de4 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -21,7 +21,7 @@ case object Terminate case class Locator(node: Node, name: Symbol) /** - * @version 0.9.10 + * @version 0.9.17 * @author Philipp Haller */ class NetKernel(service: Service) { @@ -69,20 +69,31 @@ class NetKernel(service: Service) { namedSend(senderLoc, receiverLoc, msg, session) } - def createProxy(node: Node, sym: Symbol): Actor = { - val p = Proxy(node, sym, this) + def createProxy(node: Node, sym: Symbol): Proxy = { + val p = new Proxy(node, sym, this) proxies += Pair((node, sym), p) p } - val proxies = new HashMap[(Node, Symbol), Actor] + val proxies = new HashMap[(Node, Symbol), Proxy] - def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized { - proxies.get((senderNode, senderName)) match { - case Some(senderProxy) => senderProxy - case None => createProxy(senderNode, senderName) + def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy = + proxies.synchronized { + proxies.get((senderNode, senderName)) match { + case Some(senderProxy) => senderProxy + case None => createProxy(senderNode, senderName) + } + } + + /* Register proxy if no other proxy has been registered. + */ + def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit = + proxies.synchronized { + proxies.get((senderNode, senderName)) match { + case Some(senderProxy) => // do nothing + case None => proxies += Pair((senderNode, senderName), p) + } } - } def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { msg match { @@ -118,77 +129,3 @@ class NetKernel(service: Service) { service.terminate() } } - -case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { - start() - - @transient - private var channelMap = new HashMap[Symbol, OutputChannel[Any]] - - @transient - private var sessionMap = new HashMap[Channel[Any], Symbol] - - def act() { - Debug.info(this+": waiting to process commands") - loop { - react { - // Request from remote proxy. - // `this` is local proxy. - case cmd@SendTo(out, msg, session) => - Debug.info(this+": processing "+cmd) - - // is this an active session? - channelMap.get(session) match { - case None => - // create a new reply channel... - val replyCh = new Channel[Any](this) - - // ...that maps to session - sessionMap += Pair(replyCh, session) - - // local send to actor - val a = out.asInstanceOf[Actor] - a.send(msg, replyCh) - - case Some(replyCh) => - replyCh ! msg - // TODO: - // remove `replyCh` from mapping - // to avoid memory leak (always safe?) - // or: use WeakHashMap - // however, it's the value (channel) - // that should be weak! - } - - case cmd@Terminate => - Debug.info(this+": processing "+cmd) - exit() - - // local proxy receives response to - // reply channel - case ch ! resp => - // lookup session ID - sessionMap.get(ch) match { - case Some(sid) => - val msg = resp.asInstanceOf[AnyRef] - // send back response - kernel.forward(sender, node, name, msg, sid) - - case None => - Debug.info(this+": cannot find session for "+ch) - } - - // remote proxy receives request - case msg: AnyRef => - // create fresh session ID... - val sid = FreshNameCreator.newName(node+"@"+name) - - // ...that maps to reply channel - channelMap += Pair(sid, sender) - - kernel.forward(sender, node, name, msg, sid) - } - } - } - -} |