diff options
-rw-r--r-- | src/actors/scala/actors/remote/NetKernel.scala | 183 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/RemoteActor.scala | 1 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/TcpService.scala | 7 |
3 files changed, 91 insertions, 100 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index c2d3723033..7cb776257e 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -13,15 +13,13 @@ package scala.actors.remote import scala.collection.mutable.{HashMap, HashSet} import scala.actors.Actor.loop -case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte]) -case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte]) -case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte]) +case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol) -case class SendTo(a: Actor, msg: Any) -case class SyncSendTo(a: Actor, msg: Any, receiver: Symbol) -case class ReplyTo(a: Actor, msg: Any) +case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol) case object Terminate +case class Locator(node: Node, name: Symbol) + /** * @version 0.9.10 * @author Philipp Haller @@ -33,52 +31,46 @@ class NetKernel(service: Service) { service.send(node, bytes) } - def namedSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) { - val bytes = service.serializer.serialize(msg) - sendToNode(node, NamedSend(sender, receiver, bytes)) - } - - def namedSyncSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) { - val bytes = service.serializer.serialize(msg) - val toSend = SyncSend(sender, receiver, bytes) - sendToNode(node, toSend) - } - - def sendReply(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) { + def namedSend(senderLoc: Locator, receiverLoc: Locator, + msg: AnyRef, session: Symbol) { val bytes = service.serializer.serialize(msg) - val toSend = Reply(sender, receiver, bytes) - sendToNode(node, toSend) + sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session)) } - private val actors = new HashMap[Symbol, Actor] - private val names = new HashMap[Actor, Symbol] + private val actors = new HashMap[Symbol, OutputChannel[Any]] + private val names = new HashMap[OutputChannel[Any], Symbol] - def register(name: Symbol, a: Actor): Unit = synchronized { + def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized { actors += Pair(name, a) names += Pair(a, name) } - def selfName = names.get(Actor.self) match { + def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match { case None => val freshName = FreshNameCreator.newName("remotesender") - register(freshName, Actor.self) + register(freshName, from) freshName case Some(name) => name } - def send(node: Node, name: Symbol, msg: AnyRef) { - val senderName = selfName - namedSend(node, senderName, name, msg) + def send(node: Node, name: Symbol, msg: AnyRef): Unit = + send(node, name, msg, 'nosession) + + def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) { + val senderLoc = Locator(service.node, getOrCreateName(Actor.self)) + val receiverLoc = Locator(node, name) + namedSend(senderLoc, receiverLoc, msg, session) } - def syncSend(node: Node, name: Symbol, msg: AnyRef) { - val senderName = selfName - namedSyncSend(node, senderName, name, msg) + def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) { + val senderLoc = Locator(service.node, getOrCreateName(from)) + val receiverLoc = Locator(node, name) + namedSend(senderLoc, receiverLoc, msg, session) } def createProxy(node: Node, sym: Symbol): Actor = { - val p = new Proxy(node, sym, this) + val p = Proxy(node, sym, this) proxies += Pair((node, sym), p) p } @@ -94,18 +86,18 @@ class NetKernel(service: Service) { def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { msg match { - case cmd@NamedSend(senderName, receiver, data) => + case cmd@NamedSend(senderLoc, receiverLoc, data, session) => Debug.info(this+": processing "+cmd) - actors.get(receiver) match { + actors.get(receiverLoc.name) match { case Some(a) => try { Debug.info(this+": receiver is "+a) val msg = service.serializer.deserialize(data) Debug.info(this+": deserialized msg is "+msg) - val senderProxy = getOrCreateProxy(senderNode, senderName) + val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) Debug.info(this+": created "+senderProxy) - senderProxy.send(SendTo(a, msg), null) + senderProxy.send(SendTo(a, msg, session), null) } catch { case e: Exception => Debug.error(this+": caught "+e) @@ -115,30 +107,6 @@ class NetKernel(service: Service) { // message is lost Debug.info(this+": lost message") } - case cmd@SyncSend(senderName, receiver, data) => - Debug.info(this+": processing "+cmd) - actors.get(receiver) match { - case Some(a) => - val msg = service.serializer.deserialize(data) - - val senderProxy = getOrCreateProxy(senderNode, senderName) - senderProxy.send(SyncSendTo(a, msg, receiver), null) - - case None => - // message is lost - } - case cmd@Reply(senderName, receiver, data) => - Debug.info(this+": processing "+cmd) - actors.get(receiver) match { - case Some(a) => - val msg = service.serializer.deserialize(data) - - val senderProxy = getOrCreateProxy(senderNode, senderName) - senderProxy.send(ReplyTo(a, msg), null) - - case None => - // message is lost - } } } @@ -151,59 +119,76 @@ class NetKernel(service: Service) { } } -class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { +case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor { start() - override def act() { + @transient + private var channelMap = new HashMap[Symbol, OutputChannel[Any]] + + @transient + private var sessionMap = new HashMap[Channel[Any], Symbol] + + def act() { Debug.info(this+": waiting to process commands") loop { react { - case cmd@SendTo(a, msg) => + // Request from remote proxy. + // `this` is local proxy. + case cmd@SendTo(out, msg, session) => Debug.info(this+": processing "+cmd) - a ! msg - case cmd@SyncSendTo(a, msg, receiver) => - Debug.info(this+": processing "+cmd) - val replyCh = new Channel[Any](this) - a.send(msg, replyCh) - val res = replyCh.receive { - case x => x - } - - res match { - case refmsg: AnyRef => - kernel.sendReply(node, receiver, name, refmsg) + // is this an active session? + channelMap.get(session) match { + case None => + // create a new reply channel... + val replyCh = new Channel[Any](this) + + // ...that maps to session + sessionMap += Pair(replyCh, session) + + // local send to actor + val a = out.asInstanceOf[Actor] + a.send(msg, replyCh) + + case Some(replyCh) => + replyCh ! msg + // TODO: + // remove `replyCh` from mapping + // to avoid memory leak (always safe?) + // or: use WeakHashMap + // however, it's the value (channel) + // that should be weak! } - case cmd@ReplyTo(a, msg) => - Debug.info(this+": processing "+cmd) - a.replyChannel ! msg - case cmd@Terminate => Debug.info(this+": processing "+cmd) exit() - } - } - } - override def !(msg: Any): Unit = msg match { - case ch ! m => - // do not send remotely - this.send(msg, Actor.self.replyChannel) - case a: AnyRef => - kernel.send(node, name, a) - case other => - error("Cannot send non-AnyRef value remotely.") - } + // local proxy receives response to + // reply channel + case ch ! resp => + // lookup session ID + sessionMap.get(ch) match { + case Some(sid) => + val msg = resp.asInstanceOf[AnyRef] + // send back response + kernel.forward(sender, node, name, msg, sid) + + case None => + Debug.info(this+": cannot find session for "+ch) + } - override def !?(msg: Any): Any = msg match { - case a: AnyRef => - val replyCh = Actor.self.freshReplyChannel - kernel.syncSend(node, name, a) - replyCh.receive { - case x => x + // remote proxy receives request + case msg: AnyRef => + // create fresh session ID... + val sid = FreshNameCreator.newName(node+"@"+name) + + // ...that maps to reply channel + channelMap += Pair(sid, sender) + + kernel.forward(sender, node, name, msg, sid) } - case other => - error("Cannot send non-AnyRef value remotely.") + } } + } diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index 62d983a25d..06b460d5e6 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -69,6 +69,7 @@ object RemoteActor { def createKernelOnPort(port: Int): NetKernel = { val serv = TcpService(port, cl) + Debug.info("created service at "+serv.node) val kern = serv.kernel val s = Actor.self kernels += Pair(s, kern) diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala index 9377bf1f71..1f1dec4369 100644 --- a/src/actors/scala/actors/remote/TcpService.scala +++ b/src/actors/scala/actors/remote/TcpService.scala @@ -122,7 +122,12 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { def terminate() { shouldTerminate = true - new Socket(internalNode.address, internalNode.port) + try { + new Socket(internalNode.address, internalNode.port) + } catch { + case ce: java.net.ConnectException => + Debug.info(this+": caught "+ce) + } } private var shouldTerminate = false |