diff options
author | Philipp Haller <hallerp@gmail.com> | 2008-07-30 11:04:14 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2008-07-30 11:04:14 +0000 |
commit | 2eb46f56d2f894683ef368caefbd81679a0eb6e1 (patch) | |
tree | bfad454637e9271bb3b817a167a49cb2f2142682 /src/actors | |
parent | 078d9446bb9b0d935452280257b17aed3ebf6e0a (diff) | |
download | scala-2eb46f56d2f894683ef368caefbd81679a0eb6e1.tar.gz scala-2eb46f56d2f894683ef368caefbd81679a0eb6e1.tar.bz2 scala-2eb46f56d2f894683ef368caefbd81679a0eb6e1.zip |
Enabled actor links for remote actors.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/AbstractActor.scala | 32 | ||||
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 19 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/NetKernel.scala | 68 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/Proxy.scala | 55 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/RemoteActor.scala | 6 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/TcpService.scala | 1 |
6 files changed, 150 insertions, 31 deletions
diff --git a/src/actors/scala/actors/AbstractActor.scala b/src/actors/scala/actors/AbstractActor.scala new file mode 100644 index 0000000000..5bd29021c0 --- /dev/null +++ b/src/actors/scala/actors/AbstractActor.scala @@ -0,0 +1,32 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2008, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +/** + * The <code>AbstractActor</code> trait... + * + * @version 0.9.18 + * @author Philipp Haller + */ +trait AbstractActor extends OutputChannel[Any] { + + private[actors] var exiting = false + + private[actors] def linkTo(to: AbstractActor): Unit + private[actors] def unlinkFrom(from: AbstractActor): Unit + + private[actors] def exit(from: AbstractActor, reason: AnyRef): Unit + + def !?(msg: Any): Any + def !?(msec: Long, msg: Any): Option[Any] + def !!(msg: Any): Future[Any] + def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] +} diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index e04af74459..3ec610e624 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -354,7 +354,7 @@ object Actor { * @author Philipp Haller */ @serializable -trait Actor extends OutputChannel[Any] { +trait Actor extends AbstractActor { private var received: Option[Any] = None @@ -792,7 +792,7 @@ trait Actor extends OutputChannel[Any] { throw new KillActorException } - private[actors] var links: List[Actor] = Nil + private[actors] var links: List[AbstractActor] = Nil /** * Links <code>self</code> to actor <code>to</code>. @@ -800,7 +800,7 @@ trait Actor extends OutputChannel[Any] { * @param to ... * @return ... */ - def link(to: Actor): Actor = { + def link(to: AbstractActor): AbstractActor = { assert(Actor.self == this, "link called on actor different from self") links = to :: links to.linkTo(this) @@ -819,26 +819,25 @@ trait Actor extends OutputChannel[Any] { actor } - private[actors] def linkTo(to: Actor) = synchronized { + private[actors] def linkTo(to: AbstractActor) = synchronized { links = to :: links } /** * Unlinks <code>self</code> from actor <code>from</code>. */ - def unlink(from: Actor) { + def unlink(from: AbstractActor) { assert(Actor.self == this, "unlink called on actor different from self") links = links.remove(from.==) from.unlinkFrom(this) } - private[actors] def unlinkFrom(from: Actor) = synchronized { + private[actors] def unlinkFrom(from: AbstractActor) = synchronized { links = links.remove(from.==) } var trapExit = false private[actors] var exitReason: AnyRef = 'normal - private[actors] var exiting = false private[actors] var shouldExit = false /** @@ -879,7 +878,7 @@ trait Actor extends OutputChannel[Any] { // remove this from links links = links.remove(this.==) // exit linked processes - links.foreach((linked: Actor) => { + links.foreach((linked: AbstractActor) => { unlink(linked) if (!linked.exiting) linked.exit(this, exitReason) @@ -893,7 +892,7 @@ trait Actor extends OutputChannel[Any] { } // Assume !this.exiting - private[actors] def exit(from: Actor, reason: AnyRef) { + private[actors] def exit(from: AbstractActor, reason: AnyRef) { if (trapExit) { this ! Exit(from, reason) } @@ -932,7 +931,7 @@ trait Actor extends OutputChannel[Any] { case object TIMEOUT -case class Exit(from: Actor, reason: AnyRef) +case class Exit(from: AbstractActor, reason: AnyRef) /** <p> * This class is used to manage control flow of actor diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index 0025b57de4..bb10d4c376 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -14,8 +14,14 @@ import scala.collection.mutable.{HashMap, HashSet} import scala.actors.Actor.loop case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol) - -case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol) +case class RemoteLinkTo(senderLoc: Locator, receiverLoc: Locator) +case class RemoteUnlinkFrom(senderLoc: Locator, receiverLoc: Locator) +case class RemoteExit(senderLoc: Locator, receiverLoc: Locator, reason: AnyRef) + +case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol) +case class LocalLinkTo(a: AbstractActor) +case class LocalUnlinkFrom(a: AbstractActor) +case class LocalExit(from: AbstractActor, reason: AnyRef) case object Terminate case class Locator(node: Node, name: Symbol) @@ -69,6 +75,24 @@ class NetKernel(service: Service) { namedSend(senderLoc, receiverLoc, msg, session) } + def linkTo(node: Node, name: Symbol, from: AbstractActor) { + val senderLoc = Locator(service.node, getOrCreateName(from)) + val receiverLoc = Locator(node, name) + sendToNode(receiverLoc.node, RemoteLinkTo(senderLoc, receiverLoc)) + } + + def unlinkFrom(node: Node, name: Symbol, from: AbstractActor) { + val senderLoc = Locator(service.node, getOrCreateName(from)) + val receiverLoc = Locator(node, name) + sendToNode(receiverLoc.node, RemoteUnlinkFrom(senderLoc, receiverLoc)) + } + + def exit(node: Node, name: Symbol, from: AbstractActor, reason: AnyRef) { + val senderLoc = Locator(service.node, getOrCreateName(from)) + val receiverLoc = Locator(node, name) + sendToNode(receiverLoc.node, RemoteExit(senderLoc, receiverLoc, reason)) + } + def createProxy(node: Node, sym: Symbol): Proxy = { val p = new Proxy(node, sym, this) proxies += Pair((node, sym), p) @@ -97,17 +121,49 @@ class NetKernel(service: Service) { def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { msg match { + case cmd@RemoteExit(senderLoc, receiverLoc, reason) => + Debug.info(this+": processing "+cmd) + actors.get(receiverLoc.name) match { + case Some(a) => + val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) + senderProxy.send(LocalExit(a.asInstanceOf[Actor], reason), null) + + case None => + // message is lost + Debug.info(this+": lost message") + } + + case cmd@RemoteUnlinkFrom(senderLoc, receiverLoc) => + Debug.info(this+": processing "+cmd) + actors.get(receiverLoc.name) match { + case Some(a) => + val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) + senderProxy.send(LocalUnlinkFrom(a.asInstanceOf[AbstractActor]), null) + + case None => + // message is lost + Debug.info(this+": lost message") + } + + case cmd@RemoteLinkTo(senderLoc, receiverLoc) => + Debug.info(this+": processing "+cmd) + actors.get(receiverLoc.name) match { + case Some(a) => + val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) + senderProxy.send(LocalLinkTo(a.asInstanceOf[AbstractActor]), null) + + case None => + // message is lost + Debug.info(this+": lost message") + } + case cmd@NamedSend(senderLoc, receiverLoc, data, session) => Debug.info(this+": processing "+cmd) actors.get(receiverLoc.name) match { case Some(a) => try { - Debug.info(this+": receiver is "+a) val msg = service.serializer.deserialize(data) - Debug.info(this+": deserialized msg is "+msg) - val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) - Debug.info(this+": created "+senderProxy) senderProxy.send(SendTo(a, msg, session), null) } catch { case e: Exception => diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala index d752d00139..c490aa9bb5 100644 --- a/src/actors/scala/actors/remote/Proxy.scala +++ b/src/actors/scala/actors/remote/Proxy.scala @@ -17,7 +17,7 @@ import scala.collection.mutable.HashMap * @author Philipp Haller */ @serializable -class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends OutputChannel[Any] { +class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor { import java.io.{IOException, ObjectOutputStream, ObjectInputStream} @transient @@ -26,20 +26,18 @@ class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends @throws(classOf[IOException]) private def writeObject(out: ObjectOutputStream) { - Debug.info("Serializing "+this) out.defaultWriteObject() } @throws(classOf[ClassNotFoundException]) @throws(classOf[IOException]) private def readObject(in: ObjectInputStream) { - Debug.info("Deserializing "+this) in.defaultReadObject() setupKernel() startDelegate() } private def startDelegate() { - del = new DelegateActor(node, name, kernel) + del = new DelegateActor(this, node, name, kernel) del.start() } @@ -49,41 +47,77 @@ class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends } def !(msg: Any): Unit = - del.send(msg, Actor.self) + del ! msg def send(msg: Any, replyCh: OutputChannel[Any]): Unit = del.send(msg, replyCh) def forward(msg: Any): Unit = - del.send(msg, Actor.sender) + del.forward(msg) def receiver: Actor = del + def !?(msg: Any): Any = + del !? msg + + def !?(msec: Long, msg: Any): Option[Any] = + del !? (msec, msg) + def !!(msg: Any): Future[Any] = del !! msg + def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = + del !! (msg, f) + + def linkTo(to: AbstractActor): Unit = + del ! LinkTo(to) + + def unlinkFrom(from: AbstractActor): Unit = + del ! UnlinkFrom(from) + + def exit(from: AbstractActor, reason: AnyRef): Unit = + del ! Exit(from, reason) + override def toString() = name+"@"+node } +case class LinkTo(to: AbstractActor) +case class UnlinkFrom(from: AbstractActor) + /** * @version 0.9.17 * @author Philipp Haller */ -private[remote] class DelegateActor(node: Node, name: Symbol, kernel: NetKernel) extends Actor { +private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor { var channelMap = new HashMap[Symbol, OutputChannel[Any]] var sessionMap = new HashMap[Channel[Any], Symbol] def act() { - Debug.info(this+": waiting to process commands") Actor.loop { react { + case cmd@LinkTo(to) => + kernel.linkTo(node, name, to) + + case cmd@UnlinkFrom(from) => + kernel.unlinkFrom(node, name, from) + + case cmd@Exit(from, reason) => + kernel.exit(node, name, from, reason) + + case cmd@LocalLinkTo(to) => + to.linkTo(creator) + + case cmd@LocalUnlinkFrom(from) => + from.unlinkFrom(creator) + + case cmd@LocalExit(to, reason) => + to.exit(creator, reason) + // Request from remote proxy. // `this` is local proxy. case cmd@SendTo(out, msg, session) => - Debug.info(this+": processing "+cmd) - // is this an active session? channelMap.get(session) match { case None => @@ -107,7 +141,6 @@ private[remote] class DelegateActor(node: Node, name: Symbol, kernel: NetKernel) } case cmd@Terminate => - Debug.info(this+": processing "+cmd) exit() // local proxy receives response to diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala index c37fd88432..ee64f1d9a8 100644 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -69,7 +69,6 @@ object RemoteActor { def createKernelOnPort(port: Int): NetKernel = { val serv = TcpService(port, cl) - Debug.info("created service at "+serv.node) val kern = serv.kernel val s = Actor.self kernels += Pair(s, kern) @@ -97,8 +96,7 @@ object RemoteActor { def register(name: Symbol, a: Actor): Unit = synchronized { val kernel = kernels.get(Actor.self) match { case None => - val serv = new TcpService(TcpService.generatePort, cl) - serv.start() + val serv = TcpService(TcpService.generatePort, cl) kernels += Pair(Actor.self, serv.kernel) serv.kernel case Some(k) => @@ -120,7 +118,7 @@ object RemoteActor { * Returns (a proxy for) the actor registered under * <code>name</code> on <code>node</code>. */ - def select(node: Node, sym: Symbol): OutputChannel[Any] = synchronized { + def select(node: Node, sym: Symbol): AbstractActor = synchronized { selfKernel.getOrCreateProxy(node, sym) } diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala index 11af1bbbee..eb6177f7cd 100644 --- a/src/actors/scala/actors/remote/TcpService.scala +++ b/src/actors/scala/actors/remote/TcpService.scala @@ -36,6 +36,7 @@ object TcpService { val service = new TcpService(port, cl) ports += Pair(port, service) service.start() + Debug.info("created service at "+service.node) service } |