summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala183
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala1
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala7
3 files changed, 91 insertions, 100 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index c2d3723033..7cb776257e 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -13,15 +13,13 @@ package scala.actors.remote
import scala.collection.mutable.{HashMap, HashSet}
import scala.actors.Actor.loop
-case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
-case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
-case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte])
+case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol)
-case class SendTo(a: Actor, msg: Any)
-case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol)
-case class ReplyTo(a: Actor, msg: Any)
+case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
case object Terminate
+case class Locator(node: Node, name: Symbol)
+
/**
* @version 0.9.10
* @author Philipp Haller
@@ -33,52 +31,46 @@ class NetKernel(service: Service) {
service.send(node, bytes)
}
- def namedSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
- val bytes = service.serializer.serialize(msg)
- sendToNode(node, NamedSend(sender, receiver, bytes))
- }
-
- def namedSyncSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
- val bytes = service.serializer.serialize(msg)
- val toSend = SyncSend(sender, receiver, bytes)
- sendToNode(node, toSend)
- }
-
- def sendReply(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
+ def namedSend(senderLoc: Locator, receiverLoc: Locator,
+ msg: AnyRef, session: Symbol) {
val bytes = service.serializer.serialize(msg)
- val toSend = Reply(sender, receiver, bytes)
- sendToNode(node, toSend)
+ sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session))
}
- private val actors = new HashMap[Symbol, Actor]
- private val names = new HashMap[Actor, Symbol]
+ private val actors = new HashMap[Symbol, OutputChannel[Any]]
+ private val names = new HashMap[OutputChannel[Any], Symbol]
- def register(name: Symbol, a: Actor): Unit = synchronized {
+ def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized {
actors += Pair(name, a)
names += Pair(a, name)
}
- def selfName = names.get(Actor.self) match {
+ def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match {
case None =>
val freshName = FreshNameCreator.newName("remotesender")
- register(freshName, Actor.self)
+ register(freshName, from)
freshName
case Some(name) =>
name
}
- def send(node: Node, name: Symbol, msg: AnyRef) {
- val senderName = selfName
- namedSend(node, senderName, name, msg)
+ def send(node: Node, name: Symbol, msg: AnyRef): Unit =
+ send(node, name, msg, 'nosession)
+
+ def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
+ val senderLoc = Locator(service.node, getOrCreateName(Actor.self))
+ val receiverLoc = Locator(node, name)
+ namedSend(senderLoc, receiverLoc, msg, session)
}
- def syncSend(node: Node, name: Symbol, msg: AnyRef) {
- val senderName = selfName
- namedSyncSend(node, senderName, name, msg)
+ def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
+ val senderLoc = Locator(service.node, getOrCreateName(from))
+ val receiverLoc = Locator(node, name)
+ namedSend(senderLoc, receiverLoc, msg, session)
}
def createProxy(node: Node, sym: Symbol): Actor = {
- val p = new Proxy(node, sym, this)
+ val p = Proxy(node, sym, this)
proxies += Pair((node, sym), p)
p
}
@@ -94,18 +86,18 @@ class NetKernel(service: Service) {
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
- case cmd@NamedSend(senderName, receiver, data) =>
+ case cmd@NamedSend(senderLoc, receiverLoc, data, session) =>
Debug.info(this+": processing "+cmd)
- actors.get(receiver) match {
+ actors.get(receiverLoc.name) match {
case Some(a) =>
try {
Debug.info(this+": receiver is "+a)
val msg = service.serializer.deserialize(data)
Debug.info(this+": deserialized msg is "+msg)
- val senderProxy = getOrCreateProxy(senderNode, senderName)
+ val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
Debug.info(this+": created "+senderProxy)
- senderProxy.send(SendTo(a, msg), null)
+ senderProxy.send(SendTo(a, msg, session), null)
} catch {
case e: Exception =>
Debug.error(this+": caught "+e)
@@ -115,30 +107,6 @@ class NetKernel(service: Service) {
// message is lost
Debug.info(this+": lost message")
}
- case cmd@SyncSend(senderName, receiver, data) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiver) match {
- case Some(a) =>
- val msg = service.serializer.deserialize(data)
-
- val senderProxy = getOrCreateProxy(senderNode, senderName)
- senderProxy.send(SyncSendTo(a, msg, receiver), null)
-
- case None =>
- // message is lost
- }
- case cmd@Reply(senderName, receiver, data) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiver) match {
- case Some(a) =>
- val msg = service.serializer.deserialize(data)
-
- val senderProxy = getOrCreateProxy(senderNode, senderName)
- senderProxy.send(ReplyTo(a, msg), null)
-
- case None =>
- // message is lost
- }
}
}
@@ -151,59 +119,76 @@ class NetKernel(service: Service) {
}
}
-class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
+case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
start()
- override def act() {
+ @transient
+ private var channelMap = new HashMap[Symbol, OutputChannel[Any]]
+
+ @transient
+ private var sessionMap = new HashMap[Channel[Any], Symbol]
+
+ def act() {
Debug.info(this+": waiting to process commands")
loop {
react {
- case cmd@SendTo(a, msg) =>
+ // Request from remote proxy.
+ // `this` is local proxy.
+ case cmd@SendTo(out, msg, session) =>
Debug.info(this+": processing "+cmd)
- a ! msg
- case cmd@SyncSendTo(a, msg, receiver) =>
- Debug.info(this+": processing "+cmd)
- val replyCh = new Channel[Any](this)
- a.send(msg, replyCh)
- val res = replyCh.receive {
- case x => x
- }
-
- res match {
- case refmsg: AnyRef =>
- kernel.sendReply(node, receiver, name, refmsg)
+ // is this an active session?
+ channelMap.get(session) match {
+ case None =>
+ // create a new reply channel...
+ val replyCh = new Channel[Any](this)
+
+ // ...that maps to session
+ sessionMap += Pair(replyCh, session)
+
+ // local send to actor
+ val a = out.asInstanceOf[Actor]
+ a.send(msg, replyCh)
+
+ case Some(replyCh) =>
+ replyCh ! msg
+ // TODO:
+ // remove `replyCh` from mapping
+ // to avoid memory leak (always safe?)
+ // or: use WeakHashMap
+ // however, it's the value (channel)
+ // that should be weak!
}
- case cmd@ReplyTo(a, msg) =>
- Debug.info(this+": processing "+cmd)
- a.replyChannel ! msg
-
case cmd@Terminate =>
Debug.info(this+": processing "+cmd)
exit()
- }
- }
- }
- override def !(msg: Any): Unit = msg match {
- case ch ! m =>
- // do not send remotely
- this.send(msg, Actor.self.replyChannel)
- case a: AnyRef =>
- kernel.send(node, name, a)
- case other =>
- error("Cannot send non-AnyRef value remotely.")
- }
+ // local proxy receives response to
+ // reply channel
+ case ch ! resp =>
+ // lookup session ID
+ sessionMap.get(ch) match {
+ case Some(sid) =>
+ val msg = resp.asInstanceOf[AnyRef]
+ // send back response
+ kernel.forward(sender, node, name, msg, sid)
+
+ case None =>
+ Debug.info(this+": cannot find session for "+ch)
+ }
- override def !?(msg: Any): Any = msg match {
- case a: AnyRef =>
- val replyCh = Actor.self.freshReplyChannel
- kernel.syncSend(node, name, a)
- replyCh.receive {
- case x => x
+ // remote proxy receives request
+ case msg: AnyRef =>
+ // create fresh session ID...
+ val sid = FreshNameCreator.newName(node+"@"+name)
+
+ // ...that maps to reply channel
+ channelMap += Pair(sid, sender)
+
+ kernel.forward(sender, node, name, msg, sid)
}
- case other =>
- error("Cannot send non-AnyRef value remotely.")
+ }
}
+
}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 62d983a25d..06b460d5e6 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -69,6 +69,7 @@ object RemoteActor {
def createKernelOnPort(port: Int): NetKernel = {
val serv = TcpService(port, cl)
+ Debug.info("created service at "+serv.node)
val kern = serv.kernel
val s = Actor.self
kernels += Pair(s, kern)
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index 9377bf1f71..1f1dec4369 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -122,7 +122,12 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service {
def terminate() {
shouldTerminate = true
- new Socket(internalNode.address, internalNode.port)
+ try {
+ new Socket(internalNode.address, internalNode.port)
+ } catch {
+ case ce: java.net.ConnectException =>
+ Debug.info(this+": caught "+ce)
+ }
}
private var shouldTerminate = false