summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-09-25 22:12:11 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-09-25 22:12:11 +0000
commit20aa9911d0ee9ed96b2f12642e8e8505782cbc0b (patch)
tree4076d3778b3f6c48bfaed9932801d8cbcf1ea9c4 /src/actors
parente898539e939143b22c5a20c6c41c520fb6eb531e (diff)
downloadscala-20aa9911d0ee9ed96b2f12642e8e8505782cbc0b.tar.gz
scala-20aa9911d0ee9ed96b2f12642e8e8505782cbc0b.tar.bz2
scala-20aa9911d0ee9ed96b2f12642e8e8505782cbc0b.zip
Fixed build.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/distributed/FreshNameCreator.scala28
-rw-r--r--src/actors/scala/actors/distributed/JavaSerializer.scala17
-rw-r--r--src/actors/scala/actors/distributed/NetKernel.scala608
-rw-r--r--src/actors/scala/actors/distributed/Node.scala16
-rw-r--r--src/actors/scala/actors/distributed/Serializer.scala17
-rw-r--r--src/actors/scala/actors/distributed/Service.scala84
-rw-r--r--src/actors/scala/actors/distributed/TcpService.scala38
-rw-r--r--src/actors/scala/actors/distributed/TcpServiceWorker.scala24
8 files changed, 727 insertions, 105 deletions
diff --git a/src/actors/scala/actors/distributed/FreshNameCreator.scala b/src/actors/scala/actors/distributed/FreshNameCreator.scala
deleted file mode 100644
index ecf6e3f78a..0000000000
--- a/src/actors/scala/actors/distributed/FreshNameCreator.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package nactors.distributed
-
-import scala.collection.mutable.HashMap
-
-object FreshNameCreator {
-
- protected var counter = 0
- protected val counters = new HashMap[String,int]
-
- /**
- * Create a fresh name with the given prefix. It is guaranteed
- * that the returned name has never been returned by a previous
- * call to this function (provided the prefix does not end in a digit).
- */
- def newName(prefix: String): Symbol = {
- val count = counters.get(prefix) match {
- case Some(last) => last + 1
- case None => 0
- }
- counters.update(prefix, count)
- new Symbol(prefix + count)
- }
-
- def newName(): Symbol = {
- counter = counter + 1
- new Symbol("$" + counter + "$")
- }
-}
diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala
index a2ef350cd6..f5d114e762 100644
--- a/src/actors/scala/actors/distributed/JavaSerializer.scala
+++ b/src/actors/scala/actors/distributed/JavaSerializer.scala
@@ -1,16 +1,24 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
-package nactors.distributed
+// $Id$
-import java.io._
+package scala.actors.distributed
+import java.io._
import scala.io.BytePickle.SPU
[serializable]
class JavaSerializer(serv: Service) extends Serializer(serv) {
val debug = true
- def log(s: String) =
- if (debug) Console.println("JavaSerializer: " + s)
+ def log (s: String) =
+ if (debug) scala.Console.println("JAVASerializer: " + s)
def serialize(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -26,5 +34,6 @@ class JavaSerializer(serv: Service) extends Serializer(serv) {
in.readObject()
}
+ def pid: SPU[RemotePid] = null
def addRep(name: String, repCons: Serializer => AnyRef): Unit = {}
}
diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala
index 93b3b33c24..481ce94833 100644
--- a/src/actors/scala/actors/distributed/NetKernel.scala
+++ b/src/actors/scala/actors/distributed/NetKernel.scala
@@ -1,71 +1,597 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
-package nactors.distributed
+// $Id$
+
+package scala.actors.distributed
import java.io.{IOException,StringReader,StringWriter}
import java.lang.SecurityException
import java.net.UnknownHostException
+import java.util.logging.{ConsoleHandler,Level,Logger}
+
import scala.collection.mutable.{HashMap,HashSet}
+import scala.actors.multi.{Actor,ExcHandlerDesc}
-case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
+case class RA(a: RemoteActor)
+object NetKernel {
+ var kernel: NetKernel = null
+}
+
+/**
+ * @author Philipp Haller
+ */
class NetKernel(service: Service) {
+ NetKernel.kernel = this
+
+ // contains constructors
+ private val ptable =
+ new HashMap[String, () => RemoteActor]
+
+ // maps local ids to scala.actors
+ private val rtable =
+ new HashMap[int, RemoteActor]
+
+ // maps scala.actors to their RemotePid
+ private val pidTable =
+ new HashMap[RemoteActor, RemotePid]
+
+ private var running = true;
+
+ val logLevel = Level.FINE
+ val l = Logger.getLogger("NetKernel")
+ l.setLevel(logLevel)
+ val consHand = new ConsoleHandler
+ consHand.setLevel(logLevel)
+ l.addHandler(consHand)
+
+ //start // start NetKernel
+
+ /** only called if destDesc is local. */
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
+ destDesc.p match {
+ case rpid: RemotePid =>
+ (rtable get rpid.localId) match {
+ case Some(actor) =>
+ actor.handleExc(destDesc, e)
+ case None =>
+ error("exc desc refers to non-registered actor")
+ }
+ }
+
+ def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) =
+ // locality check (handler local to this node?)
+ destDesc.p match {
+ case rpid: RemotePid =>
+ if (rpid.node == this.node)
+ handleExc(destDesc, e)
+ else
+ sendToNode(rpid.node, ForwardExc(destDesc, e))
+ }
def sendToNode(node: Node, msg: AnyRef) = {
- val bytes = service.serializer.serialize(msg)
+ //val sw = new StringWriter
+ val bytes = service.serializer.serialize(msg /*, sw*/)
service.send(node, bytes)
+ //service.send(node, sw.toString())
}
- def namedSend(node: Node, senderName: Symbol, receiver: Symbol, msg: AnyRef): Unit = {
- val bytes = service.serializer.serialize(msg)
- sendToNode(node, NamedSend(senderName, receiver, bytes))
+ def addConstructor(key: String, value: () => RemoteActor) =
+ ptable.update(key, value);
+
+ def node: Node = service.node
+
+ def nodes: List[Node] = service.nodes
+
+ def pidOf(actor: RemoteActor): RemotePid = synchronized {
+ pidTable.get(actor) match {
+ case None => error("malformed pid table in " + this)
+ case Some(pid) => pid
+ }
+ }
+
+ def disconnectNode(n: Node) = synchronized {
+ service.disconnectNode(n)
+ }
+
+ def getLocalRef(locId: Int): RemoteActor =
+ rtable.get(locId) match {
+ case None =>
+ error("" + locId + " is not registered at " + this)
+ case Some(remoteActor: RemoteActor) =>
+ remoteActor
+ }
+
+ def localSend(localId: Int, msg: Any): Unit = synchronized {
+ rtable.get(localId) match {
+ case None =>
+ error("" + localId + " is not registered at " + this)
+ case Some(remoteActor: RemoteActor) =>
+ //Console.println("local send to " + remoteActor)
+ remoteActor send msg
+ }
+ }
+
+ def localSend(pid: RemotePid, msg: Any): Unit =
+ localSend(pid.localId, msg)
+
+ def remoteSend(pid: RemotePid, msg: Any) = synchronized {
+ //Console.println("NetKernel: Remote msg delivery to " + pid)
+ msg match {
+ case m: AnyRef =>
+ service.remoteSend(pid, m)
+ }
}
- def send(node: Node, name: Symbol, msg: AnyRef): Unit = {
- val senderName = names.get(Actor.self) match {
- case None => {
- val freshName = FreshNameCreator.newName("remotesender")
- register(freshName, Actor.self)
- freshName
+ def namedSend(name: Name, msg: AnyRef): Unit =
+ if (name.node == this.node) {
+ // look-up name
+ nameTable.get(name.sym) match {
+ case None =>
+ // message is lost
+ //Console.println("lost message " + msg + " because " + name + " not registered.")
+ case Some(localId) => localSend(localId, msg)
}
- case Some(name) => name
}
- namedSend(node, senderName, name, msg)
+ else {
+ // remote send
+
+ // serialize msg
+ ///val sw = new StringWriter
+ val bytes = service.serializer.serialize(msg/*, sw*/)
+
+ sendToNode(name.node, NamedSend(name.sym, bytes))
+ //sendToNode(name.node, NamedSend(name.sym, sw.toString()))
+ }
+
+ val nameTable = new HashMap[Symbol, Int]
+
+ def registerName(name: Symbol, pid: RemotePid): Unit = synchronized {
+ nameTable += name -> pid.localId
+ }
+
+ def registerName(name: Symbol, a: RemoteActor): Unit = synchronized {
+ val pid = register(a)
+ registerName(name, pid)
+ a.start
}
- def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
+ /*override def run: unit = receive {
+ case ForwardExc(destDesc, e) =>
+ // TODO
+ case Spawn(reply: RemotePid, pname) =>
+ val newPid = spawn(pname)
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+ run
+
+ case SpawnObject(reply: RemotePid, data: Array[byte]) =>
+ //val sr = new StringReader(data)
+ //service.serializer.deserialize(sr) match {
+
+ service.serializer.deserialize(data) match {
+ case RA(actor) =>
+ val newPid = register(actor)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ actor.start
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+ }
+ run
+
+ case NamedSend(sym: Symbol, data) =>
+ // look-up name
+ nameTable.get(sym) match {
+ case None =>
+ // message is lost
+ Console.println("lost message " + data + " because " + sym + " not registered.")
+ case Some(localId) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
+
+ val msg = service.serializer.deserialize(data)
+ localSend(localId, msg)
+ }
+ run
+
+ case Send(pid: RemotePid, data) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
+ val msg = service.serializer.deserialize(data)
+
+ Console.println("locally send " + msg + " to " + pid)
+ localSend(pid, msg)
+ run
+
+ case Link(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ linkFromLocal(from, to)
+ run
+
+ case UnLink(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ unlinkFromLocal(from, to)
+ run
+
+ case Exit1(from:RemotePid, to:RemotePid, reason) =>
+ // check if "to" traps exit signals
+ // if so send a message
+ if (trapExits.contains(to.localId))
+ // convert signal into message
+ localSend(to, Exit1(from, to, reason))
+ else
+ if (reason.name.equals("normal")) {
+ // ignore signal
+ }
+ else
+ exit(from, to, reason)
+ run
+ }*/
+
+ // TODO
+ /*def isReachable(remoteNode: Node): boolean = {
+ val pingMsg = new Ping(node)
+ val sw = new StringWriter
+ service.serializer.serialize(pingMsg, sw)
+ service.send(remoteNode, sw.toString())
+ }*/
+
+ def processMsg(msg: AnyRef): Unit = synchronized {
msg match {
- case NamedSend(senderName, receiver, data) =>
- actors.get(receiver) match {
- case Some(a) => {
- Debug.info("processing message from " + senderName + " on " + senderNode)
+ case Spawn(reply: RemotePid, pname) =>
+ val newPid = spawn(pname)
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+
+ case SpawnObject(reply: RemotePid, data) =>
+ //val sr = new StringReader(data)
+ //service.serializer.deserialize(sr) match {
+
+ service.serializer.deserialize(data) match {
+ case RA(actor) =>
+ val newPid = register(actor)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ actor.start
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+ }
+
+ case NamedSend(sym: Symbol, data) =>
+ // look-up name
+ nameTable.get(sym) match {
+ case None =>
+ // message is lost
+ //Console.println("lost message " + msg + " because " + sym + " not registered.")
+ case Some(localId) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
val msg = service.serializer.deserialize(data)
- val senderProxy = new Reactor {
- override def run() = { a ! msg }
- override def !(msg: Any): Unit = {
- msg match {
- case refmsg: AnyRef => {
- Debug.info("sending " + msg + " to " + senderName + " on " + senderNode)
- namedSend(senderNode, receiver, senderName, refmsg)
- }
- }
- }
- override def !?(msg: Any): Any =
- error("!? not implemented for remote actors.")
- }
- senderProxy.start()
+ localSend(localId, msg)
+ }
+
+ case Send(pid: RemotePid, data) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
+ val msg = service.serializer.deserialize(data)
+ //Console.println("locally send " + msg + " to " + pid)
+ localSend(pid, msg)
+
+ case Link(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ linkFromLocal(from, to)
+
+ case UnLink(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ unlinkFromLocal(from, to)
+
+ case Exit1(from:RemotePid, to:RemotePid, reason) =>
+ // check if "to" traps exit signals
+ // if so send a message
+ if (trapExits.contains(to.localId)) {
+ // convert signal into message
+ // TODO: simpler message (w/o to) for actor!
+ localSend(to, Exit1(from, to, reason))
+ }
+ else {
+ if (reason.name.equals("normal")) {
+ // ignore signal
}
- case None => // message is lost
- Console.println("" + receiver + " not registered in NetKernel.")
+ else
+ exit(from, to, reason)
}
}
}
- private val actors = new HashMap[Symbol, Actor]
- private val names = new HashMap[Actor, Symbol]
+ /* Registers an instance of a remote actor inside this NetKernel.
+ */
+ def register(newProc: RemoteActor): RemotePid = synchronized {
+ newProc.kernel = this
+ val newPid = service.createPid(newProc)
+ rtable += newPid.localId -> newProc
+ pidTable += newProc -> newPid
+ newPid
+ }
+
+ // local spawn
+ def spawn(pname: String): RemotePid = synchronized {
+ // get constructor out of table
+ (ptable.get(pname)) match {
+ case None =>
+ //error("No constructor found. Cannot start process.")
+ //null
+
+ val newProc = Class.forName(pname).newInstance().asInstanceOf[RemoteActor]
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ newProc.start
+ newPid
- private[nactors] def register(name: Symbol, a: Actor): Unit = synchronized {
- Debug.info("registering " + a + " as " + name)
- actors += name -> a
- names += a -> name
+ case Some(cons) =>
+ val newProc = cons()
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ newProc.start
+ newPid
+ }
+ }
+
+ // local spawn
+ def spawn(name: String, arg: RemotePid): RemotePid = synchronized {
+ val newPid = spawn(name)
+ localSend(newPid, arg)
+ newPid
+ }
+
+ // assume this.node != node
+ def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): Unit = {
+ val ra = RA(a)
+ //val rsw = new StringWriter
+ //service.serializer.serialize(ra, rsw)
+ val bytes = service.serializer.serialize(ra)
+ //sendToNode(node, SpawnObject(replyTo, rsw.toString()))
+ sendToNode(node, SpawnObject(replyTo, bytes))
+ }
+
+
+ def registerSerializer(index: String, rep: Serializer => AnyRef) = {
+ service.serializer.addRep(index, rep)
+
+ // send registering requests to remote nodes
+ }
+
+ // remote spawn
+ def spawn(replyTo: RemotePid, node: Node, name: String): RemotePid = synchronized {
+ // check if actor is to be spawned locally
+ if (node == this.node) {
+ val newPid = spawn(name)
+ newPid
+ }
+ else {
+ sendToNode(node, Spawn(replyTo, name))
+ null // pid needs to be sent back
+ }
+ }
+
+ /* Spawns a new actor (locally), executing "fun".
+ */
+ def spawn(fun: RemoteActor => Unit): RemotePid = synchronized {
+ val newProc = new RemoteActor {
+ override def run: unit =
+ fun(this);
+ }
+
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ newProc.start
+ newPid
}
+
+ /* Spawns a new actor (locally), executing "fun".
+ */
+ def spawnLink(pid: RemotePid, fun: RemoteActor => unit): RemotePid = synchronized {
+ val newProc = new RemoteActor {
+ override def run: unit =
+ fun(this);
+ }
+
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+
+ // link new process to pid (assume pid is local)
+ link(pid, newPid)
+
+ newProc.start
+ newPid
+ }
+
+ // maps local ids to their linked pids
+ private val links = new HashMap[int,HashSet[RemotePid]];
+
+ // which of the local processes traps exit signals?
+ private val trapExits = new HashSet[int];
+
+ def processFlag(pid: RemotePid, flag: Symbol, set: Boolean) = synchronized {
+ if (flag.name.equals("trapExit")) {
+ if (trapExits.contains(pid.localId) && !set)
+ trapExits -= pid.localId
+ else if (!trapExits.contains(pid.localId) && set)
+ trapExits += pid.localId
+ }
+ }
+
+ // assume from.node == this.node
+ private def unlinkFromLocal(from: RemotePid, to: RemotePid): Unit =
+ links.get(from.localId) match {
+ case None =>
+ // has no links -> ignore
+ case Some(set) =>
+ set -= to
+ if (set.size == 0) links -= from.localId
+ };
+
+ /*
+ unlinks bi-directional link
+ assume from.node == this.node
+ */
+ def unlink(from: RemotePid, to: RemotePid): Unit = synchronized {
+ unlinkFromLocal(from, to)
+ if (to.node == this.node)
+ unlinkFromLocal(to, from)
+ else
+ // (2) send message to NetKernel of "to" to unlink a
+ // uni-directional link from "to" to "from"
+ sendToNode(to.node, UnLink(to, from))
+ }
+
+ // assume from.node == this.node
+ private def linkFromLocal(from: RemotePid, to: RemotePid): Unit =
+ // TODO: send Exit to from if to is invalid
+ links.get(from.localId) match {
+ case None =>
+ // from has no links, yet
+ val linksTo = new HashSet[RemotePid]
+ linksTo += to
+ links += from.localId -> linksTo
+ case Some(set) =>
+ set += to
+ };
+
+ /*
+ creates bi-directional link
+ assume from.node == this.node
+ */
+ def link(from: RemotePid, to: RemotePid): unit = synchronized {
+ // (1) create locally a uni-directional link
+ linkFromLocal(from, to)
+ if (to.node == this.node)
+ linkFromLocal(to, from)
+ else
+ // (2) send message to NetKernel of "to" to create a
+ // uni-directional link from "to" to "from"
+ sendToNode(to.node, Link(to, from))
+ }
+
+ // Assume "to" is local.
+ def exit(from: RemotePid, to: RemotePid, reason: Symbol): unit = {
+ // remove link
+ unlinkFromLocal(to, from)
+ exit(to, reason)
+ }
+
+ val exitMarks = new HashSet[RemotePid]
+
+ /*
+ If reason is unequal to 'normal then
+ this will cause all linked processes to
+ (transitively) terminate abnormally.
+
+ Assume pid is local.
+ */
+ def exit(pid: RemotePid, reason: Symbol): Unit = synchronized {
+ if (!(exitMarks contains pid)) {
+ exitMarks += pid // mark pid as exiting
+ //Console.println("" + pid + " is exiting (" + reason + ").")
+
+ // first look-up remote actor in rtable
+ val actor = rtable(pid.localId)
+ // remove from table of running processes
+ rtable -= pid.localId
+ // remove from pid table
+ pidTable -= actor
+
+ // send exit signals to linked processes
+ links.get(pid.localId) match {
+ case None =>
+ //Console.println("no linked processes")
+
+ case Some(set) => // set of remote pids that we want to terminate
+ //Console.println("sending exit signals to linked processes")
+
+ val iter = set.elements
+ while (iter.hasNext) {
+ val linkedPid = iter.next
+
+ unlinkFromLocal(pid, linkedPid)
+
+ if (linkedPid.node == this.node) {
+ unlinkFromLocal(linkedPid, pid)
+
+ if (trapExits.contains(linkedPid.localId))
+ localSend(linkedPid, Exit1(pid, linkedPid, reason))
+ else if (!reason.name.equals("normal"))
+ exit(linkedPid, reason)
+ }
+ else
+ sendToNode(linkedPid.node,
+ Exit1(pid, linkedPid, reason))
+ }
+ exitMarks -= pid
+ }
+ }
+ }
+
+ private val monNodes =
+ new HashMap[Node,HashMap[RemotePid,Int]]
+
+ def monitorNode(client: RemotePid, mnode: Node, cond: Boolean) = synchronized {
+ monNodes.get(mnode) match {
+ case None =>
+ // nobody is monitoring this node
+ if (cond) {
+ val map = new HashMap[RemotePid,int]
+ map += client -> 1
+ monNodes += mnode -> map
+ }
+ case Some(map) =>
+ map.update(client, map(client) + (if (cond) 1 else -1))
+ }
+
+ // if no connection exists:
+ // try connecting, if it fails deliver nodedown msg
+ if (cond && !service.isConnected(mnode)) {
+ try {
+ service.connect(mnode)
+ }
+ catch {
+ case uhe: UnknownHostException =>
+ nodeDown(mnode)
+ case ioe: IOException =>
+ nodeDown(mnode)
+ case se: SecurityException =>
+ nodeDown(mnode)
+ }
+ }
+ }
+
+ def nodeDown(mnode: Node) =
+ // send NodeDown msg to registered RemotePids
+ monNodes.get(mnode) match {
+ case None =>
+ // nobody is monitoring this node
+ case Some(map) =>
+ // iterate over keys (RemotePids of interested clients)
+ val iter = map.keys
+ while (iter.hasNext) {
+ val client = iter.next
+ for (val i <- List.range(0, map(client))) {
+ // send nodedown msg
+ client ! Pair(NodeDown(), mnode)
+ }
+ }
+ }
+
}
diff --git a/src/actors/scala/actors/distributed/Node.scala b/src/actors/scala/actors/distributed/Node.scala
index d70e9779d4..1f28e65767 100644
--- a/src/actors/scala/actors/distributed/Node.scala
+++ b/src/actors/scala/actors/distributed/Node.scala
@@ -1,7 +1,15 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
-package nactors.distributed
+// $Id$
-[serializable] abstract class Node;
+package scala.actors.distributed
-[serializable] case class TcpNode(address: String, port: Int) extends Node;
-[serializable] case class JxtaNode(group: String) extends Node;
+[serializable] abstract class Node;
+[serializable] case class TcpNode(address: String, port: Int) extends Node
+[serializable] case class JXTANode(name: String) extends Node
diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala
index 527aad83dd..d934168b5b 100644
--- a/src/actors/scala/actors/distributed/Serializer.scala
+++ b/src/actors/scala/actors/distributed/Serializer.scala
@@ -1,10 +1,19 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
-package nactors.distributed
+// $Id$
+
+package scala.actors.distributed
import java.io.{DataInputStream,DataOutputStream,EOFException}
import scala.io.BytePickle.SPU
-abstract class Serializer(val service: Service) {
+abstract class Serializer(s: Service) {
def serialize(o: AnyRef/*, w: Writer*/): Array[byte]
def deserialize(a: Array[byte]/*r: Reader*/): AnyRef
@@ -42,4 +51,8 @@ abstract class Serializer(val service: Service) {
val bytes = serialize(obj)
writeBytes(outputStream, bytes)
}
+
+ def pid: SPU[RemotePid]
+ def service = s
+ def addRep(name: String, repCons: Serializer => AnyRef): unit
}
diff --git a/src/actors/scala/actors/distributed/Service.scala b/src/actors/scala/actors/distributed/Service.scala
index e404d3a73d..f5164751b3 100644
--- a/src/actors/scala/actors/distributed/Service.scala
+++ b/src/actors/scala/actors/distributed/Service.scala
@@ -1,23 +1,83 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
-package nactors.distributed
+// $Id$
+
+package scala.actors.distributed
import java.io.StringWriter
+/**
+ * @author Philipp Haller
+ */
trait Service {
- private val kern = new NetKernel(this)
- def kernel = kern
-
val serializer: Serializer
def node: Node
-
+ def createPid(actor: RemoteActor): RemotePid
def send(node: Node, data: Array[byte]): Unit
-
-/*
- def connect(node: Node): Unit // non-blocking
+ def connect(node: Node): Unit // non blocking.
def disconnectNode(node: Node): Unit
def isConnected(node: Node): Boolean
- def isReachable(node: Node): Boolean // blocking
- def getRoundTripTimeMillis(node: Node): Long // blocking
- def nodes: List[Node]
-*/
+
+ //blocking. timeout depends on Implementation.
+ def isReachable(node: Node): Boolean
+
+ def getRoundTripTimeMillis(node: Node): Long //blocking
+
+ def nodes:List[Node]
+
+// implemented parts:
+
+ private val kern = new NetKernel(this)
+ def kernel = kern
+
+ def spawn(name: String): RemotePid =
+ kern spawn name
+
+ def spawn(name: String, arg: RemotePid): RemotePid =
+ kern.spawn(name, arg)
+
+
+ //suggested addition by seb
+ def spawn(fun: RemoteActor => unit): RemotePid =
+ kernel.spawn(fun);
+ def spawn(a:RemoteActor):RemotePid = {
+ //Console.println("Service:spawn(RemoteActor)")
+ val pid = kernel.register(a)
+ //Console.println("RemoteActor("+a+") registered in kernel")
+ a.start
+ //Console.println("RemoteActor("+a+") started")
+ pid
+ }
+
+ def send(pid: RemotePid, msg: AnyRef): unit = synchronized {
+ if (pid.node == this.node)
+ kernel.localSend(pid, msg)
+ else
+ kernel.remoteSend(pid, msg)
+ }
+
+ def remoteSend(pid: RemotePid, msg: AnyRef): unit = synchronized {
+ //Console.println("Service: Sending " + msg + " to " + pid)
+ // lets try to serialize the message
+ //val sw = new StringWriter
+ //serializer.serialize(msg, sw)
+ val bytes = serializer.serialize(msg)
+ //val sendMsg = Send(pid, sw.toString())
+ val sendMsg = Send(pid, bytes)
+ //val sw2 = new StringWriter
+ //serializer.serialize(sendMsg, sw2)
+ //send(pid.node, sw2.toString())
+ val bytes2 = serializer.serialize(sendMsg)
+ send(pid.node, bytes2)
+ }
+
+ private var idCnt = 0
+ def makeUid = { idCnt = idCnt + 1; idCnt }
+
}
diff --git a/src/actors/scala/actors/distributed/TcpService.scala b/src/actors/scala/actors/distributed/TcpService.scala
index f1ea43ad27..a1efa01d9b 100644
--- a/src/actors/scala/actors/distributed/TcpService.scala
+++ b/src/actors/scala/actors/distributed/TcpService.scala
@@ -1,9 +1,21 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
-package nactors.distributed
+// $Id$
+
+package scala.actors.distributed
import java.io.IOException
import java.net.{InetAddress,ServerSocket,Socket,UnknownHostException}
+/**
+ * @author Philipp Haller
+ */
object TcpService {
val random = new java.util.Random(System.currentTimeMillis())
@@ -26,12 +38,26 @@ object TcpService {
}
}
+object TestPorts {
+ def main(args: Array[String]): Unit = {
+ val random = new java.util.Random(System.currentTimeMillis())
+ val socket = new ServerSocket(8000 + random.nextInt(500))
+ Console.println(TcpService.generatePort)
+ }
+}
+
+/**
+ * @author Philipp Haller
+ */
class TcpService(port: Int) extends Thread with Service {
val serializer: JavaSerializer = new JavaSerializer(this)
- private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port)
+ private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port);
def node: TcpNode = internalNode
+ def createPid(actor: RemoteActor): RemotePid =
+ new TcpPid(internalNode, makeUid, kernel, actor)
+
def send(node: Node, data: String): unit = synchronized {
// retrieve worker thread (if any) that already has connection
node match {
@@ -40,7 +66,7 @@ class TcpService(port: Int) extends Thread with Service {
case None =>
// we are not connected, yet
Console.println("We are not connected, yet.");
- val newWorker = connect(tnode); // bad in a sync block
+ val newWorker = connect(tnode); //bad in a sync BLOCK!!!
newWorker transmit data
case Some(worker) => worker transmit data
}
@@ -94,7 +120,7 @@ class TcpService(port: Int) extends Thread with Service {
// connection management
private val connections =
- new scala.collection.mutable.HashMap[TcpNode, TcpServiceWorker]
+ new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker]
def nodes: List[Node] =
throw new Exception ("nodes need to be implemented in TcpService!")
@@ -129,7 +155,7 @@ class TcpService(port: Int) extends Thread with Service {
Console.println("Connected.")
// spawn new worker thread
val worker = new TcpServiceWorker(this, sock)
- worker.sendNode(n);
+ worker.sendNode;
// start worker thread
worker.start()
// register locally (we want to reuse connections which correspond to connected sockets)
@@ -174,7 +200,7 @@ class TcpService(port: Int) extends Thread with Service {
def getRoundTripTimeMillis(node: Node): Long = 0
def nodeDown(mnode: TcpNode): Unit = synchronized {
- //kernel nodeDown mnode
+ kernel nodeDown mnode
connections -= mnode
}
diff --git a/src/actors/scala/actors/distributed/TcpServiceWorker.scala b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
index 36b0cfe14f..2587319fdb 100644
--- a/src/actors/scala/actors/distributed/TcpServiceWorker.scala
+++ b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
@@ -1,9 +1,21 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
-package nactors.distributed
+// $Id$
+
+package scala.actors.distributed
import java.io._
import java.net.Socket
+/**
+ * @author Philipp Haller
+ */
class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
val in = so.getInputStream()
val out = so.getOutputStream()
@@ -17,12 +29,10 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
val log = new Debug("TcpServiceWorker")
log.level = 2
-/*
def transmit(msg: Send): Unit = synchronized {
val data = parent.serializer.serialize(msg)
transmit(data)
}
-*/
def transmit(data: String): Unit = synchronized {
log.info("Transmitting " + data)
@@ -37,8 +47,7 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
dataout.flush()
}
- def sendNode(n: TcpNode) = {
- connectedNode = n
+ def sendNode = {
scala.Console.println("Sending our name " + parent.node)
parent.serializer.writeObject(dataout, parent.node)
}
@@ -60,7 +69,6 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
}
var running = true
-
def halt = synchronized {
so.close() // close socket
running = false // stop
@@ -73,8 +81,8 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
log.info("deserializing...");
//val msg = parent.serializer.deserialize(reader);
val msg = parent.serializer.readObject(datain);
- log.info("Received object: " + msg + " from " + connectedNode);
- parent.kernel.processMsg(connectedNode, msg)
+ log.info("Received object: " + msg);
+ parent.kernel.processMsg(msg)
}
}
}