summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-07-30 11:04:14 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-07-30 11:04:14 +0000
commit2eb46f56d2f894683ef368caefbd81679a0eb6e1 (patch)
treebfad454637e9271bb3b817a167a49cb2f2142682
parent078d9446bb9b0d935452280257b17aed3ebf6e0a (diff)
downloadscala-2eb46f56d2f894683ef368caefbd81679a0eb6e1.tar.gz
scala-2eb46f56d2f894683ef368caefbd81679a0eb6e1.tar.bz2
scala-2eb46f56d2f894683ef368caefbd81679a0eb6e1.zip
Enabled actor links for remote actors.
-rw-r--r--src/actors/scala/actors/AbstractActor.scala32
-rw-r--r--src/actors/scala/actors/Actor.scala19
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala68
-rw-r--r--src/actors/scala/actors/remote/Proxy.scala55
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala6
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala1
6 files changed, 150 insertions, 31 deletions
diff --git a/src/actors/scala/actors/AbstractActor.scala b/src/actors/scala/actors/AbstractActor.scala
new file mode 100644
index 0000000000..5bd29021c0
--- /dev/null
+++ b/src/actors/scala/actors/AbstractActor.scala
@@ -0,0 +1,32 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2008, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+/**
+ * The <code>AbstractActor</code> trait...
+ *
+ * @version 0.9.18
+ * @author Philipp Haller
+ */
+trait AbstractActor extends OutputChannel[Any] {
+
+ private[actors] var exiting = false
+
+ private[actors] def linkTo(to: AbstractActor): Unit
+ private[actors] def unlinkFrom(from: AbstractActor): Unit
+
+ private[actors] def exit(from: AbstractActor, reason: AnyRef): Unit
+
+ def !?(msg: Any): Any
+ def !?(msec: Long, msg: Any): Option[Any]
+ def !!(msg: Any): Future[Any]
+ def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A]
+}
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index e04af74459..3ec610e624 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -354,7 +354,7 @@ object Actor {
* @author Philipp Haller
*/
@serializable
-trait Actor extends OutputChannel[Any] {
+trait Actor extends AbstractActor {
private var received: Option[Any] = None
@@ -792,7 +792,7 @@ trait Actor extends OutputChannel[Any] {
throw new KillActorException
}
- private[actors] var links: List[Actor] = Nil
+ private[actors] var links: List[AbstractActor] = Nil
/**
* Links <code>self</code> to actor <code>to</code>.
@@ -800,7 +800,7 @@ trait Actor extends OutputChannel[Any] {
* @param to ...
* @return ...
*/
- def link(to: Actor): Actor = {
+ def link(to: AbstractActor): AbstractActor = {
assert(Actor.self == this, "link called on actor different from self")
links = to :: links
to.linkTo(this)
@@ -819,26 +819,25 @@ trait Actor extends OutputChannel[Any] {
actor
}
- private[actors] def linkTo(to: Actor) = synchronized {
+ private[actors] def linkTo(to: AbstractActor) = synchronized {
links = to :: links
}
/**
* Unlinks <code>self</code> from actor <code>from</code>.
*/
- def unlink(from: Actor) {
+ def unlink(from: AbstractActor) {
assert(Actor.self == this, "unlink called on actor different from self")
links = links.remove(from.==)
from.unlinkFrom(this)
}
- private[actors] def unlinkFrom(from: Actor) = synchronized {
+ private[actors] def unlinkFrom(from: AbstractActor) = synchronized {
links = links.remove(from.==)
}
var trapExit = false
private[actors] var exitReason: AnyRef = 'normal
- private[actors] var exiting = false
private[actors] var shouldExit = false
/**
@@ -879,7 +878,7 @@ trait Actor extends OutputChannel[Any] {
// remove this from links
links = links.remove(this.==)
// exit linked processes
- links.foreach((linked: Actor) => {
+ links.foreach((linked: AbstractActor) => {
unlink(linked)
if (!linked.exiting)
linked.exit(this, exitReason)
@@ -893,7 +892,7 @@ trait Actor extends OutputChannel[Any] {
}
// Assume !this.exiting
- private[actors] def exit(from: Actor, reason: AnyRef) {
+ private[actors] def exit(from: AbstractActor, reason: AnyRef) {
if (trapExit) {
this ! Exit(from, reason)
}
@@ -932,7 +931,7 @@ trait Actor extends OutputChannel[Any] {
case object TIMEOUT
-case class Exit(from: Actor, reason: AnyRef)
+case class Exit(from: AbstractActor, reason: AnyRef)
/** <p>
* This class is used to manage control flow of actor
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 0025b57de4..bb10d4c376 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -14,8 +14,14 @@ import scala.collection.mutable.{HashMap, HashSet}
import scala.actors.Actor.loop
case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol)
-
-case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
+case class RemoteLinkTo(senderLoc: Locator, receiverLoc: Locator)
+case class RemoteUnlinkFrom(senderLoc: Locator, receiverLoc: Locator)
+case class RemoteExit(senderLoc: Locator, receiverLoc: Locator, reason: AnyRef)
+
+case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
+case class LocalLinkTo(a: AbstractActor)
+case class LocalUnlinkFrom(a: AbstractActor)
+case class LocalExit(from: AbstractActor, reason: AnyRef)
case object Terminate
case class Locator(node: Node, name: Symbol)
@@ -69,6 +75,24 @@ class NetKernel(service: Service) {
namedSend(senderLoc, receiverLoc, msg, session)
}
+ def linkTo(node: Node, name: Symbol, from: AbstractActor) {
+ val senderLoc = Locator(service.node, getOrCreateName(from))
+ val receiverLoc = Locator(node, name)
+ sendToNode(receiverLoc.node, RemoteLinkTo(senderLoc, receiverLoc))
+ }
+
+ def unlinkFrom(node: Node, name: Symbol, from: AbstractActor) {
+ val senderLoc = Locator(service.node, getOrCreateName(from))
+ val receiverLoc = Locator(node, name)
+ sendToNode(receiverLoc.node, RemoteUnlinkFrom(senderLoc, receiverLoc))
+ }
+
+ def exit(node: Node, name: Symbol, from: AbstractActor, reason: AnyRef) {
+ val senderLoc = Locator(service.node, getOrCreateName(from))
+ val receiverLoc = Locator(node, name)
+ sendToNode(receiverLoc.node, RemoteExit(senderLoc, receiverLoc, reason))
+ }
+
def createProxy(node: Node, sym: Symbol): Proxy = {
val p = new Proxy(node, sym, this)
proxies += Pair((node, sym), p)
@@ -97,17 +121,49 @@ class NetKernel(service: Service) {
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
+ case cmd@RemoteExit(senderLoc, receiverLoc, reason) =>
+ Debug.info(this+": processing "+cmd)
+ actors.get(receiverLoc.name) match {
+ case Some(a) =>
+ val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
+ senderProxy.send(LocalExit(a.asInstanceOf[Actor], reason), null)
+
+ case None =>
+ // message is lost
+ Debug.info(this+": lost message")
+ }
+
+ case cmd@RemoteUnlinkFrom(senderLoc, receiverLoc) =>
+ Debug.info(this+": processing "+cmd)
+ actors.get(receiverLoc.name) match {
+ case Some(a) =>
+ val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
+ senderProxy.send(LocalUnlinkFrom(a.asInstanceOf[AbstractActor]), null)
+
+ case None =>
+ // message is lost
+ Debug.info(this+": lost message")
+ }
+
+ case cmd@RemoteLinkTo(senderLoc, receiverLoc) =>
+ Debug.info(this+": processing "+cmd)
+ actors.get(receiverLoc.name) match {
+ case Some(a) =>
+ val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
+ senderProxy.send(LocalLinkTo(a.asInstanceOf[AbstractActor]), null)
+
+ case None =>
+ // message is lost
+ Debug.info(this+": lost message")
+ }
+
case cmd@NamedSend(senderLoc, receiverLoc, data, session) =>
Debug.info(this+": processing "+cmd)
actors.get(receiverLoc.name) match {
case Some(a) =>
try {
- Debug.info(this+": receiver is "+a)
val msg = service.serializer.deserialize(data)
- Debug.info(this+": deserialized msg is "+msg)
-
val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- Debug.info(this+": created "+senderProxy)
senderProxy.send(SendTo(a, msg, session), null)
} catch {
case e: Exception =>
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala
index d752d00139..c490aa9bb5 100644
--- a/src/actors/scala/actors/remote/Proxy.scala
+++ b/src/actors/scala/actors/remote/Proxy.scala
@@ -17,7 +17,7 @@ import scala.collection.mutable.HashMap
* @author Philipp Haller
*/
@serializable
-class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends OutputChannel[Any] {
+class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor {
import java.io.{IOException, ObjectOutputStream, ObjectInputStream}
@transient
@@ -26,20 +26,18 @@ class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream) {
- Debug.info("Serializing "+this)
out.defaultWriteObject()
}
@throws(classOf[ClassNotFoundException]) @throws(classOf[IOException])
private def readObject(in: ObjectInputStream) {
- Debug.info("Deserializing "+this)
in.defaultReadObject()
setupKernel()
startDelegate()
}
private def startDelegate() {
- del = new DelegateActor(node, name, kernel)
+ del = new DelegateActor(this, node, name, kernel)
del.start()
}
@@ -49,41 +47,77 @@ class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends
}
def !(msg: Any): Unit =
- del.send(msg, Actor.self)
+ del ! msg
def send(msg: Any, replyCh: OutputChannel[Any]): Unit =
del.send(msg, replyCh)
def forward(msg: Any): Unit =
- del.send(msg, Actor.sender)
+ del.forward(msg)
def receiver: Actor =
del
+ def !?(msg: Any): Any =
+ del !? msg
+
+ def !?(msec: Long, msg: Any): Option[Any] =
+ del !? (msec, msg)
+
def !!(msg: Any): Future[Any] =
del !! msg
+ def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] =
+ del !! (msg, f)
+
+ def linkTo(to: AbstractActor): Unit =
+ del ! LinkTo(to)
+
+ def unlinkFrom(from: AbstractActor): Unit =
+ del ! UnlinkFrom(from)
+
+ def exit(from: AbstractActor, reason: AnyRef): Unit =
+ del ! Exit(from, reason)
+
override def toString() =
name+"@"+node
}
+case class LinkTo(to: AbstractActor)
+case class UnlinkFrom(from: AbstractActor)
+
/**
* @version 0.9.17
* @author Philipp Haller
*/
-private[remote] class DelegateActor(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
+private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor {
var channelMap = new HashMap[Symbol, OutputChannel[Any]]
var sessionMap = new HashMap[Channel[Any], Symbol]
def act() {
- Debug.info(this+": waiting to process commands")
Actor.loop {
react {
+ case cmd@LinkTo(to) =>
+ kernel.linkTo(node, name, to)
+
+ case cmd@UnlinkFrom(from) =>
+ kernel.unlinkFrom(node, name, from)
+
+ case cmd@Exit(from, reason) =>
+ kernel.exit(node, name, from, reason)
+
+ case cmd@LocalLinkTo(to) =>
+ to.linkTo(creator)
+
+ case cmd@LocalUnlinkFrom(from) =>
+ from.unlinkFrom(creator)
+
+ case cmd@LocalExit(to, reason) =>
+ to.exit(creator, reason)
+
// Request from remote proxy.
// `this` is local proxy.
case cmd@SendTo(out, msg, session) =>
- Debug.info(this+": processing "+cmd)
-
// is this an active session?
channelMap.get(session) match {
case None =>
@@ -107,7 +141,6 @@ private[remote] class DelegateActor(node: Node, name: Symbol, kernel: NetKernel)
}
case cmd@Terminate =>
- Debug.info(this+": processing "+cmd)
exit()
// local proxy receives response to
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index c37fd88432..ee64f1d9a8 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -69,7 +69,6 @@ object RemoteActor {
def createKernelOnPort(port: Int): NetKernel = {
val serv = TcpService(port, cl)
- Debug.info("created service at "+serv.node)
val kern = serv.kernel
val s = Actor.self
kernels += Pair(s, kern)
@@ -97,8 +96,7 @@ object RemoteActor {
def register(name: Symbol, a: Actor): Unit = synchronized {
val kernel = kernels.get(Actor.self) match {
case None =>
- val serv = new TcpService(TcpService.generatePort, cl)
- serv.start()
+ val serv = TcpService(TcpService.generatePort, cl)
kernels += Pair(Actor.self, serv.kernel)
serv.kernel
case Some(k) =>
@@ -120,7 +118,7 @@ object RemoteActor {
* Returns (a proxy for) the actor registered under
* <code>name</code> on <code>node</code>.
*/
- def select(node: Node, sym: Symbol): OutputChannel[Any] = synchronized {
+ def select(node: Node, sym: Symbol): AbstractActor = synchronized {
selfKernel.getOrCreateProxy(node, sym)
}
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index 11af1bbbee..eb6177f7cd 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -36,6 +36,7 @@ object TcpService {
val service = new TcpService(port, cl)
ports += Pair(port, service)
service.start()
+ Debug.info("created service at "+service.node)
service
}