From 097993aea447cc57c4f5701c6a2e97d6953597e9 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 16 Jul 2009 17:24:37 +0000 Subject: Fixed memory leak in remote actors. --- src/actors/scala/actors/remote/NetKernel.scala | 2 +- src/actors/scala/actors/remote/Proxy.scala | 65 +++++++++++++------------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index acc537476f..b7376a4b46 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -26,7 +26,7 @@ case class Locator(node: Node, name: Symbol) * @version 0.9.17 * @author Philipp Haller */ -class NetKernel(service: Service) { +private[remote] class NetKernel(service: Service) { def sendToNode(node: Node, msg: AnyRef) = { val bytes = service.serializer.serialize(msg) diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala index 425e98635f..1c7e6f4e55 100644 --- a/src/actors/scala/actors/remote/Proxy.scala +++ b/src/actors/scala/actors/remote/Proxy.scala @@ -13,11 +13,10 @@ package scala.actors.remote import scala.collection.mutable.HashMap /** - * @version 0.9.17 * @author Philipp Haller */ @serializable -class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor { +private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor { import java.io.{IOException, ObjectOutputStream, ObjectInputStream} @transient @@ -110,15 +109,14 @@ class ExitFun(reason: AnyRef) extends Function2[AbstractActor, Proxy, Unit] { "("+reason.toString+")" } -case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit]) +private[remote] case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit]) /** - * @version 0.9.17 * @author Philipp Haller */ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor { var channelMap = new HashMap[Symbol, OutputChannel[Any]] - var sessionMap = new HashMap[Channel[Any], Symbol] + var sessionMap = new HashMap[OutputChannel[Any], Symbol] def act() { Actor.loop { @@ -132,26 +130,25 @@ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, ke // Request from remote proxy. // `this` is local proxy. case cmd@SendTo(out, msg, session) => - // is this an active session? - channelMap.get(session) match { - case None => - // create a new reply channel... - val replyCh = new Channel[Any](this) - - // ...that maps to session - sessionMap += Pair(replyCh, session) - - // local send - out.send(msg, replyCh) - - case Some(replyCh) => - replyCh ! msg - // TODO: - // remove `replyCh` from mapping - // to avoid memory leak (always safe?) - // or: use WeakHashMap - // however, it's the value (channel) - // that should be weak! + if (session.name == "nosession") { + // local send + out.send(msg, this) + } else { + // is this an active session? + channelMap.get(session) match { + case None => + // create a new reply channel... + val replyCh = new Channel[Any](this) + // ...that maps to session + sessionMap += Pair(replyCh, session) + // local send + out.send(msg, replyCh) + + // finishes request-reply cycle + case Some(replyCh) => + channelMap -= session + replyCh ! msg + } } case cmd@Terminate => @@ -163,6 +160,7 @@ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, ke // lookup session ID sessionMap.get(ch) match { case Some(sid) => + sessionMap -= ch val msg = resp.asInstanceOf[AnyRef] // send back response kernel.forward(sender, node, name, msg, sid) @@ -173,13 +171,16 @@ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, ke // remote proxy receives request case msg: AnyRef => - // create fresh session ID... - val sid = FreshNameCreator.newName(node+"@"+name) - - // ...that maps to reply channel - channelMap += Pair(sid, sender) - - kernel.forward(sender, node, name, msg, sid) + // find out whether it's a synchronous send + if (sender.getClass.toString.contains("Channel")) { + // create fresh session ID... + val fresh = FreshNameCreator.newName(node+"@"+name) + // ...that maps to reply channel + channelMap += Pair(fresh, sender) + kernel.forward(sender, node, name, msg, fresh) + } else { + kernel.forward(sender, node, name, msg, 'nosession) + } } } } -- cgit v1.2.3