summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-08-01 15:51:00 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-08-01 15:51:00 +0000
commit8954759d5001df331452872bacbcc7b932639eed (patch)
tree7e52a93f04a8ef7f4ef4fd3163a09064195ad712
parent2389f12ce64f8fd54aedaf94ddd9586bcb1fe377 (diff)
downloadscala-8954759d5001df331452872bacbcc7b932639eed.tar.gz
scala-8954759d5001df331452872bacbcc7b932639eed.tar.bz2
scala-8954759d5001df331452872bacbcc7b932639eed.zip
Lambda: The Ultimate Distributive
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala53
-rw-r--r--src/actors/scala/actors/remote/Proxy.scala32
2 files changed, 18 insertions, 67 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index bb10d4c376..834de3026f 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -14,14 +14,11 @@ import scala.collection.mutable.{HashMap, HashSet}
import scala.actors.Actor.loop
case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol)
-case class RemoteLinkTo(senderLoc: Locator, receiverLoc: Locator)
-case class RemoteUnlinkFrom(senderLoc: Locator, receiverLoc: Locator)
-case class RemoteExit(senderLoc: Locator, receiverLoc: Locator, reason: AnyRef)
+
+case class RemoteApply0(senderLoc: Locator, receiverLoc: Locator, rfun: Function2[AbstractActor, Proxy, Unit])
+case class LocalApply0(rfun: Function2[AbstractActor, Proxy, Unit], a: AbstractActor)
case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
-case class LocalLinkTo(a: AbstractActor)
-case class LocalUnlinkFrom(a: AbstractActor)
-case class LocalExit(from: AbstractActor, reason: AnyRef)
case object Terminate
case class Locator(node: Node, name: Symbol)
@@ -75,22 +72,10 @@ class NetKernel(service: Service) {
namedSend(senderLoc, receiverLoc, msg, session)
}
- def linkTo(node: Node, name: Symbol, from: AbstractActor) {
- val senderLoc = Locator(service.node, getOrCreateName(from))
- val receiverLoc = Locator(node, name)
- sendToNode(receiverLoc.node, RemoteLinkTo(senderLoc, receiverLoc))
- }
-
- def unlinkFrom(node: Node, name: Symbol, from: AbstractActor) {
+ def remoteApply(node: Node, name: Symbol, from: OutputChannel[Any], rfun: Function2[AbstractActor, Proxy, Unit]) {
val senderLoc = Locator(service.node, getOrCreateName(from))
val receiverLoc = Locator(node, name)
- sendToNode(receiverLoc.node, RemoteUnlinkFrom(senderLoc, receiverLoc))
- }
-
- def exit(node: Node, name: Symbol, from: AbstractActor, reason: AnyRef) {
- val senderLoc = Locator(service.node, getOrCreateName(from))
- val receiverLoc = Locator(node, name)
- sendToNode(receiverLoc.node, RemoteExit(senderLoc, receiverLoc, reason))
+ sendToNode(receiverLoc.node, RemoteApply0(senderLoc, receiverLoc, rfun))
}
def createProxy(node: Node, sym: Symbol): Proxy = {
@@ -121,36 +106,12 @@ class NetKernel(service: Service) {
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
- case cmd@RemoteExit(senderLoc, receiverLoc, reason) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiverLoc.name) match {
- case Some(a) =>
- val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- senderProxy.send(LocalExit(a.asInstanceOf[Actor], reason), null)
-
- case None =>
- // message is lost
- Debug.info(this+": lost message")
- }
-
- case cmd@RemoteUnlinkFrom(senderLoc, receiverLoc) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiverLoc.name) match {
- case Some(a) =>
- val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- senderProxy.send(LocalUnlinkFrom(a.asInstanceOf[AbstractActor]), null)
-
- case None =>
- // message is lost
- Debug.info(this+": lost message")
- }
-
- case cmd@RemoteLinkTo(senderLoc, receiverLoc) =>
+ case cmd@RemoteApply0(senderLoc, receiverLoc, rfun) =>
Debug.info(this+": processing "+cmd)
actors.get(receiverLoc.name) match {
case Some(a) =>
val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- senderProxy.send(LocalLinkTo(a.asInstanceOf[AbstractActor]), null)
+ senderProxy.send(LocalApply0(rfun, a.asInstanceOf[AbstractActor]), null)
case None =>
// message is lost
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala
index c490aa9bb5..5f3bbda723 100644
--- a/src/actors/scala/actors/remote/Proxy.scala
+++ b/src/actors/scala/actors/remote/Proxy.scala
@@ -71,20 +71,22 @@ class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends
del !! (msg, f)
def linkTo(to: AbstractActor): Unit =
- del ! LinkTo(to)
+ del ! Apply0((target: AbstractActor, creator: Proxy) =>
+ target.linkTo(creator))
def unlinkFrom(from: AbstractActor): Unit =
- del ! UnlinkFrom(from)
+ del ! Apply0((target: AbstractActor, creator: Proxy) =>
+ target.unlinkFrom(creator))
def exit(from: AbstractActor, reason: AnyRef): Unit =
- del ! Exit(from, reason)
+ del ! Apply0((target: AbstractActor, creator: Proxy) =>
+ target.exit(creator, reason))
override def toString() =
name+"@"+node
}
-case class LinkTo(to: AbstractActor)
-case class UnlinkFrom(from: AbstractActor)
+case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit])
/**
* @version 0.9.17
@@ -97,23 +99,11 @@ private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, ke
def act() {
Actor.loop {
react {
- case cmd@LinkTo(to) =>
- kernel.linkTo(node, name, to)
+ case cmd@Apply0(rfun) =>
+ kernel.remoteApply(node, name, sender, rfun)
- case cmd@UnlinkFrom(from) =>
- kernel.unlinkFrom(node, name, from)
-
- case cmd@Exit(from, reason) =>
- kernel.exit(node, name, from, reason)
-
- case cmd@LocalLinkTo(to) =>
- to.linkTo(creator)
-
- case cmd@LocalUnlinkFrom(from) =>
- from.unlinkFrom(creator)
-
- case cmd@LocalExit(to, reason) =>
- to.exit(creator, reason)
+ case cmd@LocalApply0(rfun, target) =>
+ rfun(target, creator)
// Request from remote proxy.
// `this` is local proxy.