summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-07-02 18:02:18 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-07-02 18:02:18 +0000
commit3f256f905fdf2d35c38ce049920dcc5338501db9 (patch)
tree27063b0e135f1ec13b310cb79412ec58e2047483 /src
parent3b18a3f00475bea500309d9c4d2ad4bb6ee61abe (diff)
downloadscala-3f256f905fdf2d35c38ce049920dcc5338501db9.tar.gz
scala-3f256f905fdf2d35c38ce049920dcc5338501db9.tar.bz2
scala-3f256f905fdf2d35c38ce049920dcc5338501db9.zip
Fixed #1085.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Channel.scala13
-rw-r--r--src/actors/scala/actors/OutputChannel.scala12
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala103
-rw-r--r--src/actors/scala/actors/remote/Proxy.scala140
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala7
5 files changed, 188 insertions, 87 deletions
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index ac195ef9bf..d094c961fd 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -35,7 +35,7 @@ case class ! [a](ch: Channel[a], msg: a)
* actors. Only the actor creating an instance of a
* <code>Channel</code> may receive from it.
*
- * @version 0.9.9
+ * @version 0.9.17
* @author Philipp Haller
*/
class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
@@ -62,6 +62,17 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
/**
+ * Sends a message to this <code>Channel</code>
+ * (asynchronous) supplying explicit reply destination.
+ *
+ * @param msg the message to send
+ * @param replyTo the reply destination
+ */
+ def send(msg: Msg, replyTo: OutputChannel[Any]) {
+ recv.send(scala.actors.!(this, msg), replyTo)
+ }
+
+ /**
* Forwards <code>msg</code> to <code>this</code> keeping the
* last sender as sender instead of <code>self</code>.
*/
diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala
index 1cc24d6dd0..6c2fe8e756 100644
--- a/src/actors/scala/actors/OutputChannel.scala
+++ b/src/actors/scala/actors/OutputChannel.scala
@@ -14,7 +14,7 @@ package scala.actors
* The <code>OutputChannel</code> trait provides a common interface
* for all channels to which values can be sent.
*
- * @version 0.9.9
+ * @version 0.9.17
* @author Philipp Haller
*/
trait OutputChannel[-Msg] {
@@ -26,6 +26,16 @@ trait OutputChannel[-Msg] {
def !(msg: Msg): Unit
/**
+ * Sends <code>msg</code> to this
+ * <code>OutputChannel</code> (asynchronous) supplying
+ * explicit reply destination.
+ *
+ * @param msg the message to send
+ * @param replyTo the reply destination
+ */
+ def send(msg: Msg, replyTo: OutputChannel[Any]): Unit
+
+ /**
* Forwards <code>msg</code> to this
* <code>OutputChannel</code> (asynchronous).
*/
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 7cb776257e..0025b57de4 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -21,7 +21,7 @@ case object Terminate
case class Locator(node: Node, name: Symbol)
/**
- * @version 0.9.10
+ * @version 0.9.17
* @author Philipp Haller
*/
class NetKernel(service: Service) {
@@ -69,20 +69,31 @@ class NetKernel(service: Service) {
namedSend(senderLoc, receiverLoc, msg, session)
}
- def createProxy(node: Node, sym: Symbol): Actor = {
- val p = Proxy(node, sym, this)
+ def createProxy(node: Node, sym: Symbol): Proxy = {
+ val p = new Proxy(node, sym, this)
proxies += Pair((node, sym), p)
p
}
- val proxies = new HashMap[(Node, Symbol), Actor]
+ val proxies = new HashMap[(Node, Symbol), Proxy]
- def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized {
- proxies.get((senderNode, senderName)) match {
- case Some(senderProxy) => senderProxy
- case None => createProxy(senderNode, senderName)
+ def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy =
+ proxies.synchronized {
+ proxies.get((senderNode, senderName)) match {
+ case Some(senderProxy) => senderProxy
+ case None => createProxy(senderNode, senderName)
+ }
+ }
+
+ /* Register proxy if no other proxy has been registered.
+ */
+ def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit =
+ proxies.synchronized {
+ proxies.get((senderNode, senderName)) match {
+ case Some(senderProxy) => // do nothing
+ case None => proxies += Pair((senderNode, senderName), p)
+ }
}
- }
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
@@ -118,77 +129,3 @@ class NetKernel(service: Service) {
service.terminate()
}
}
-
-case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
- start()
-
- @transient
- private var channelMap = new HashMap[Symbol, OutputChannel[Any]]
-
- @transient
- private var sessionMap = new HashMap[Channel[Any], Symbol]
-
- def act() {
- Debug.info(this+": waiting to process commands")
- loop {
- react {
- // Request from remote proxy.
- // `this` is local proxy.
- case cmd@SendTo(out, msg, session) =>
- Debug.info(this+": processing "+cmd)
-
- // is this an active session?
- channelMap.get(session) match {
- case None =>
- // create a new reply channel...
- val replyCh = new Channel[Any](this)
-
- // ...that maps to session
- sessionMap += Pair(replyCh, session)
-
- // local send to actor
- val a = out.asInstanceOf[Actor]
- a.send(msg, replyCh)
-
- case Some(replyCh) =>
- replyCh ! msg
- // TODO:
- // remove `replyCh` from mapping
- // to avoid memory leak (always safe?)
- // or: use WeakHashMap
- // however, it's the value (channel)
- // that should be weak!
- }
-
- case cmd@Terminate =>
- Debug.info(this+": processing "+cmd)
- exit()
-
- // local proxy receives response to
- // reply channel
- case ch ! resp =>
- // lookup session ID
- sessionMap.get(ch) match {
- case Some(sid) =>
- val msg = resp.asInstanceOf[AnyRef]
- // send back response
- kernel.forward(sender, node, name, msg, sid)
-
- case None =>
- Debug.info(this+": cannot find session for "+ch)
- }
-
- // remote proxy receives request
- case msg: AnyRef =>
- // create fresh session ID...
- val sid = FreshNameCreator.newName(node+"@"+name)
-
- // ...that maps to reply channel
- channelMap += Pair(sid, sender)
-
- kernel.forward(sender, node, name, msg, sid)
- }
- }
- }
-
-}
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala
new file mode 100644
index 0000000000..d752d00139
--- /dev/null
+++ b/src/actors/scala/actors/remote/Proxy.scala
@@ -0,0 +1,140 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2008, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors.remote
+
+import scala.collection.mutable.HashMap
+
+/**
+ * @version 0.9.17
+ * @author Philipp Haller
+ */
+@serializable
+class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends OutputChannel[Any] {
+ import java.io.{IOException, ObjectOutputStream, ObjectInputStream}
+
+ @transient
+ private[remote] var del: Actor = null
+ startDelegate()
+
+ @throws(classOf[IOException])
+ private def writeObject(out: ObjectOutputStream) {
+ Debug.info("Serializing "+this)
+ out.defaultWriteObject()
+ }
+
+ @throws(classOf[ClassNotFoundException]) @throws(classOf[IOException])
+ private def readObject(in: ObjectInputStream) {
+ Debug.info("Deserializing "+this)
+ in.defaultReadObject()
+ setupKernel()
+ startDelegate()
+ }
+
+ private def startDelegate() {
+ del = new DelegateActor(node, name, kernel)
+ del.start()
+ }
+
+ private def setupKernel() {
+ kernel = RemoteActor.someKernel
+ kernel.registerProxy(node, name, this)
+ }
+
+ def !(msg: Any): Unit =
+ del.send(msg, Actor.self)
+
+ def send(msg: Any, replyCh: OutputChannel[Any]): Unit =
+ del.send(msg, replyCh)
+
+ def forward(msg: Any): Unit =
+ del.send(msg, Actor.sender)
+
+ def receiver: Actor =
+ del
+
+ def !!(msg: Any): Future[Any] =
+ del !! msg
+
+ override def toString() =
+ name+"@"+node
+}
+
+/**
+ * @version 0.9.17
+ * @author Philipp Haller
+ */
+private[remote] class DelegateActor(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
+ var channelMap = new HashMap[Symbol, OutputChannel[Any]]
+ var sessionMap = new HashMap[Channel[Any], Symbol]
+
+ def act() {
+ Debug.info(this+": waiting to process commands")
+ Actor.loop {
+ react {
+ // Request from remote proxy.
+ // `this` is local proxy.
+ case cmd@SendTo(out, msg, session) =>
+ Debug.info(this+": processing "+cmd)
+
+ // is this an active session?
+ channelMap.get(session) match {
+ case None =>
+ // create a new reply channel...
+ val replyCh = new Channel[Any](this)
+
+ // ...that maps to session
+ sessionMap += Pair(replyCh, session)
+
+ // local send
+ out.send(msg, replyCh)
+
+ case Some(replyCh) =>
+ replyCh ! msg
+ // TODO:
+ // remove `replyCh` from mapping
+ // to avoid memory leak (always safe?)
+ // or: use WeakHashMap
+ // however, it's the value (channel)
+ // that should be weak!
+ }
+
+ case cmd@Terminate =>
+ Debug.info(this+": processing "+cmd)
+ exit()
+
+ // local proxy receives response to
+ // reply channel
+ case ch ! resp =>
+ // lookup session ID
+ sessionMap.get(ch) match {
+ case Some(sid) =>
+ val msg = resp.asInstanceOf[AnyRef]
+ // send back response
+ kernel.forward(sender, node, name, msg, sid)
+
+ case None =>
+ Debug.info(this+": cannot find session for "+ch)
+ }
+
+ // remote proxy receives request
+ case msg: AnyRef =>
+ // create fresh session ID...
+ val sid = FreshNameCreator.newName(node+"@"+name)
+
+ // ...that maps to reply channel
+ channelMap += Pair(sid, sender)
+
+ kernel.forward(sender, node, name, msg, sid)
+ }
+ }
+ }
+
+}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 06b460d5e6..c37fd88432 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -38,7 +38,7 @@ package scala.actors.remote
* }
* </pre>
*
- * @version 0.9.10
+ * @version 0.9.17
* @author Philipp Haller
*/
object RemoteActor {
@@ -120,9 +120,12 @@ object RemoteActor {
* Returns (a proxy for) the actor registered under
* <code>name</code> on <code>node</code>.
*/
- def select(node: Node, sym: Symbol): Actor = synchronized {
+ def select(node: Node, sym: Symbol): OutputChannel[Any] = synchronized {
selfKernel.getOrCreateProxy(node, sym)
}
+
+ def someKernel: NetKernel =
+ kernels.values.next
}