summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-07-16 17:24:37 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-07-16 17:24:37 +0000
commit097993aea447cc57c4f5701c6a2e97d6953597e9 (patch)
tree1fcb76927e05bcad78985e05356528aaed5f43db
parent58bc0b3a53e6172bd9daf3701afb94b327c393c6 (diff)
downloadscala-097993aea447cc57c4f5701c6a2e97d6953597e9.tar.gz
scala-097993aea447cc57c4f5701c6a2e97d6953597e9.tar.bz2
scala-097993aea447cc57c4f5701c6a2e97d6953597e9.zip
Fixed memory leak in remote actors.
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala2
-rw-r--r--src/actors/scala/actors/remote/Proxy.scala65
2 files changed, 34 insertions, 33 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index acc537476f..b7376a4b46 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -26,7 +26,7 @@ case class Locator(node: Node, name: Symbol)
* @version 0.9.17
* @author Philipp Haller
*/
-class NetKernel(service: Service) {
+private[remote] class NetKernel(service: Service) {
def sendToNode(node: Node, msg: AnyRef) = {
val bytes = service.serializer.serialize(msg)
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala
index 425e98635f..1c7e6f4e55 100644
--- a/src/actors/scala/actors/remote/Proxy.scala
+++ b/src/actors/scala/actors/remote/Proxy.scala
@@ -13,11 +13,10 @@ package scala.actors.remote
import scala.collection.mutable.HashMap
/**
- * @version 0.9.17
* @author Philipp Haller
*/
@serializable
-class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor {
+private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor {
import java.io.{IOException, ObjectOutputStream, ObjectInputStream}
@transient
@@ -110,15 +109,14 @@ class ExitFun(reason: AnyRef) extends Function2[AbstractActor, Proxy, Unit] {
"<ExitFun>("+reason.toString+")"
}
-case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit])
+private[remote] case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit])
/**
- * @version 0.9.17
* @author Philipp Haller
*/
private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor {
var channelMap = new HashMap[Symbol, OutputChannel[Any]]
- var sessionMap = new HashMap[Channel[Any], Symbol]
+ var sessionMap = new HashMap[OutputChannel[Any], Symbol]
def act() {
Actor.loop {
@@ -132,26 +130,25 @@ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, ke
// Request from remote proxy.
// `this` is local proxy.
case cmd@SendTo(out, msg, session) =>
- // is this an active session?
- channelMap.get(session) match {
- case None =>
- // create a new reply channel...
- val replyCh = new Channel[Any](this)
-
- // ...that maps to session
- sessionMap += Pair(replyCh, session)
-
- // local send
- out.send(msg, replyCh)
-
- case Some(replyCh) =>
- replyCh ! msg
- // TODO:
- // remove `replyCh` from mapping
- // to avoid memory leak (always safe?)
- // or: use WeakHashMap
- // however, it's the value (channel)
- // that should be weak!
+ if (session.name == "nosession") {
+ // local send
+ out.send(msg, this)
+ } else {
+ // is this an active session?
+ channelMap.get(session) match {
+ case None =>
+ // create a new reply channel...
+ val replyCh = new Channel[Any](this)
+ // ...that maps to session
+ sessionMap += Pair(replyCh, session)
+ // local send
+ out.send(msg, replyCh)
+
+ // finishes request-reply cycle
+ case Some(replyCh) =>
+ channelMap -= session
+ replyCh ! msg
+ }
}
case cmd@Terminate =>
@@ -163,6 +160,7 @@ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, ke
// lookup session ID
sessionMap.get(ch) match {
case Some(sid) =>
+ sessionMap -= ch
val msg = resp.asInstanceOf[AnyRef]
// send back response
kernel.forward(sender, node, name, msg, sid)
@@ -173,13 +171,16 @@ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, ke
// remote proxy receives request
case msg: AnyRef =>
- // create fresh session ID...
- val sid = FreshNameCreator.newName(node+"@"+name)
-
- // ...that maps to reply channel
- channelMap += Pair(sid, sender)
-
- kernel.forward(sender, node, name, msg, sid)
+ // find out whether it's a synchronous send
+ if (sender.getClass.toString.contains("Channel")) {
+ // create fresh session ID...
+ val fresh = FreshNameCreator.newName(node+"@"+name)
+ // ...that maps to reply channel
+ channelMap += Pair(fresh, sender)
+ kernel.forward(sender, node, name, msg, fresh)
+ } else {
+ kernel.forward(sender, node, name, msg, 'nosession)
+ }
}
}
}