summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/remote/NetKernel.scala
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-07-02 18:02:18 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-07-02 18:02:18 +0000
commit3f256f905fdf2d35c38ce049920dcc5338501db9 (patch)
tree27063b0e135f1ec13b310cb79412ec58e2047483 /src/actors/scala/actors/remote/NetKernel.scala
parent3b18a3f00475bea500309d9c4d2ad4bb6ee61abe (diff)
downloadscala-3f256f905fdf2d35c38ce049920dcc5338501db9.tar.gz
scala-3f256f905fdf2d35c38ce049920dcc5338501db9.tar.bz2
scala-3f256f905fdf2d35c38ce049920dcc5338501db9.zip
Fixed #1085.
Diffstat (limited to 'src/actors/scala/actors/remote/NetKernel.scala')
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala103
1 files changed, 20 insertions, 83 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 7cb776257e..0025b57de4 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -21,7 +21,7 @@ case object Terminate
case class Locator(node: Node, name: Symbol)
/**
- * @version 0.9.10
+ * @version 0.9.17
* @author Philipp Haller
*/
class NetKernel(service: Service) {
@@ -69,20 +69,31 @@ class NetKernel(service: Service) {
namedSend(senderLoc, receiverLoc, msg, session)
}
- def createProxy(node: Node, sym: Symbol): Actor = {
- val p = Proxy(node, sym, this)
+ def createProxy(node: Node, sym: Symbol): Proxy = {
+ val p = new Proxy(node, sym, this)
proxies += Pair((node, sym), p)
p
}
- val proxies = new HashMap[(Node, Symbol), Actor]
+ val proxies = new HashMap[(Node, Symbol), Proxy]
- def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized {
- proxies.get((senderNode, senderName)) match {
- case Some(senderProxy) => senderProxy
- case None => createProxy(senderNode, senderName)
+ def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy =
+ proxies.synchronized {
+ proxies.get((senderNode, senderName)) match {
+ case Some(senderProxy) => senderProxy
+ case None => createProxy(senderNode, senderName)
+ }
+ }
+
+ /* Register proxy if no other proxy has been registered.
+ */
+ def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit =
+ proxies.synchronized {
+ proxies.get((senderNode, senderName)) match {
+ case Some(senderProxy) => // do nothing
+ case None => proxies += Pair((senderNode, senderName), p)
+ }
}
- }
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
@@ -118,77 +129,3 @@ class NetKernel(service: Service) {
service.terminate()
}
}
-
-case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
- start()
-
- @transient
- private var channelMap = new HashMap[Symbol, OutputChannel[Any]]
-
- @transient
- private var sessionMap = new HashMap[Channel[Any], Symbol]
-
- def act() {
- Debug.info(this+": waiting to process commands")
- loop {
- react {
- // Request from remote proxy.
- // `this` is local proxy.
- case cmd@SendTo(out, msg, session) =>
- Debug.info(this+": processing "+cmd)
-
- // is this an active session?
- channelMap.get(session) match {
- case None =>
- // create a new reply channel...
- val replyCh = new Channel[Any](this)
-
- // ...that maps to session
- sessionMap += Pair(replyCh, session)
-
- // local send to actor
- val a = out.asInstanceOf[Actor]
- a.send(msg, replyCh)
-
- case Some(replyCh) =>
- replyCh ! msg
- // TODO:
- // remove `replyCh` from mapping
- // to avoid memory leak (always safe?)
- // or: use WeakHashMap
- // however, it's the value (channel)
- // that should be weak!
- }
-
- case cmd@Terminate =>
- Debug.info(this+": processing "+cmd)
- exit()
-
- // local proxy receives response to
- // reply channel
- case ch ! resp =>
- // lookup session ID
- sessionMap.get(ch) match {
- case Some(sid) =>
- val msg = resp.asInstanceOf[AnyRef]
- // send back response
- kernel.forward(sender, node, name, msg, sid)
-
- case None =>
- Debug.info(this+": cannot find session for "+ch)
- }
-
- // remote proxy receives request
- case msg: AnyRef =>
- // create fresh session ID...
- val sid = FreshNameCreator.newName(node+"@"+name)
-
- // ...that maps to reply channel
- channelMap += Pair(sid, sender)
-
- kernel.forward(sender, node, name, msg, sid)
- }
- }
- }
-
-}