summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-06-12 17:27:11 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-06-12 17:27:11 +0000
commit3aad376baf8297f8b1f7d9d78e59c0707f9fd0f7 (patch)
tree2eb0677d4c71f0d255412b3b9de029a7c3544962
parentf587ec7c8f616d80f1a115fdb21a5b7f7da8ebf7 (diff)
downloadscala-3aad376baf8297f8b1f7d9d78e59c0707f9fd0f7.tar.gz
scala-3aad376baf8297f8b1f7d9d78e59c0707f9fd0f7.tar.bz2
scala-3aad376baf8297f8b1f7d9d78e59c0707f9fd0f7.zip
Implemented synchronous message send for remote...
Implemented synchronous message send for remote actors.
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala89
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala34
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala10
3 files changed, 103 insertions, 30 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index e9cee3cc6b..0862f148df 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -13,6 +13,8 @@ package scala.actors.remote
import scala.collection.mutable.{HashMap, HashSet}
case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
+case class SyncSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
+case class Reply(senderName: Symbol, receiver: Symbol, data: Array[Byte])
/**
* @author Philipp Haller
@@ -24,23 +26,50 @@ class NetKernel(service: Service) {
service.send(node, bytes)
}
- def namedSend(node: Node, senderName: Symbol, receiver: Symbol, msg: AnyRef) {
+ def namedSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
val bytes = service.serializer.serialize(msg)
- sendToNode(node, NamedSend(senderName, receiver, bytes))
+ sendToNode(node, NamedSend(sender, receiver, bytes))
+ }
+
+ def namedSyncSend(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
+ val bytes = service.serializer.serialize(msg)
+ val toSend = SyncSend(sender, receiver, bytes)
+ sendToNode(node, toSend)
+ }
+
+ def sendReply(node: Node, sender: Symbol, receiver: Symbol, msg: AnyRef) {
+ val bytes = service.serializer.serialize(msg)
+ val toSend = Reply(sender, receiver, bytes)
+ sendToNode(node, toSend)
+ }
+
+ private val actors = new HashMap[Symbol, Actor]
+ private val names = new HashMap[Actor, Symbol]
+
+ def register(name: Symbol, a: Actor): Unit = synchronized {
+ actors += name -> a
+ names += a -> name
+ }
+
+ def selfName = names.get(Actor.self) match {
+ case None =>
+ val freshName = FreshNameCreator.newName("remotesender")
+ register(freshName, Actor.self)
+ freshName
+ case Some(name) =>
+ name
}
def send(node: Node, name: Symbol, msg: AnyRef) {
- val senderName = names.get(Actor.self) match {
- case None =>
- val freshName = FreshNameCreator.newName("remotesender")
- register(freshName, Actor.self)
- freshName
- case Some(name) =>
- name
- }
+ val senderName = selfName
namedSend(node, senderName, name, msg)
}
+ def syncSend(node: Node, name: Symbol, msg: AnyRef) {
+ val senderName = selfName
+ namedSyncSend(node, senderName, name, msg)
+ }
+
def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
case NamedSend(senderName, receiver, data) =>
@@ -52,6 +81,7 @@ class NetKernel(service: Service) {
override def !(msg: Any) {
msg match {
case refmsg: AnyRef =>
+ // node, senderName, receiver, msg
namedSend(senderNode, receiver, senderName, refmsg)
}
}
@@ -60,14 +90,37 @@ class NetKernel(service: Service) {
case None =>
// message is lost
}
+ case SyncSend(senderName, receiver, data) =>
+ actors.get(receiver) match {
+ case Some(a) =>
+ val msg = service.serializer.deserialize(data)
+ val senderProxy = new Actor {
+ def act() = {
+ val res = a !? msg
+ res match {
+ case refmsg: AnyRef =>
+ // node, senderName, receiver, msg
+ sendReply(senderNode, receiver, senderName, refmsg)
+ }
+ }
+ }
+ senderProxy.start(); {}
+ case None =>
+ // message is lost
+ }
+ case Reply(senderName, receiver, data) =>
+ actors.get(receiver) match {
+ case Some(a) =>
+ val msg = service.serializer.deserialize(data)
+ val senderProxy = new Actor {
+ def act() = {
+ a.getReplyChannel ! msg
+ }
+ }
+ senderProxy.start(); {}
+ case None =>
+ // message is lost
+ }
}
}
-
- private val actors = new HashMap[Symbol, Actor]
- private val names = new HashMap[Actor, Symbol]
-
- /*private[actors]*/ def register(name: Symbol, a: Actor): Unit = synchronized {
- actors += name -> a
- names += a -> name
- }
}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 19d85df491..14dc7b1a91 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -71,6 +71,18 @@ object RemoteActor {
kernel.register(name, a)
}
+ private def selfKernel = kernels.get(Actor.self) match {
+ case None =>
+ // establish remotely accessible
+ // return path (sender)
+ val serv = new TcpService(TcpService.generatePort)
+ serv.start()
+ kernels += Actor.self -> serv.kernel
+ serv.kernel
+ case Some(k) =>
+ k
+ }
+
/**
* Returns (a proxy for) the actor registered under
* <code>name</code> on <code>node</code>.
@@ -80,19 +92,17 @@ object RemoteActor {
def act() {}
override def !(msg: Any): Unit = msg match {
case a: AnyRef =>
- // establish remotely accessible
- // return path (sender)
- val kernel = kernels.get(Actor.self) match {
- case None =>
- val serv = new TcpService(TcpService.generatePort)
- serv.start()
- kernels += Actor.self -> serv.kernel
- serv.kernel
- case Some(k) =>
- k
+ selfKernel.send(node, sym, a)
+ case other =>
+ error("Cannot send non-AnyRef value remotely.")
+ }
+ override def !?(msg: Any): Any = msg match {
+ case a: AnyRef =>
+ val replyChannel = Actor.self.freshReply()
+ selfKernel.syncSend(node, sym, a)
+ replyChannel.receive {
+ case x => x
}
- kernel.send(node, sym, a)
-
case other =>
error("Cannot send non-AnyRef value remotely.")
}
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index ba1ee9202a..616af48c1a 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -23,6 +23,11 @@ import compat.Platform
import scala.collection.mutable.HashMap
+/* Object TcpService.
+ *
+ * @version 0.9.8
+ * @author Philipp Haller
+ */
object TcpService {
val random = new java.util.Random(Platform.currentTime)
@@ -46,6 +51,11 @@ object TcpService {
}
}
+/* Class TcpService.
+ *
+ * @version 0.9.8
+ * @author Philipp Haller
+ */
class TcpService(port: Int) extends Thread with Service {
val serializer: JavaSerializer = new JavaSerializer(this)