summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-09-25 16:23:32 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-09-25 16:23:32 +0000
commite898539e939143b22c5a20c6c41c520fb6eb531e (patch)
tree467df87b0db2e9b570d89c8a6d39533c4d6ad301 /src/actors
parentce2affc166c29d34616bcfef53c8aaa9a95749a9 (diff)
downloadscala-e898539e939143b22c5a20c6c41c520fb6eb531e.tar.gz
scala-e898539e939143b22c5a20c6c41c520fb6eb531e.tar.bz2
scala-e898539e939143b22c5a20c6c41c520fb6eb531e.zip
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/distributed/FreshNameCreator.scala28
-rw-r--r--src/actors/scala/actors/distributed/JavaSerializer.scala16
-rw-r--r--src/actors/scala/actors/distributed/NetKernel.scala608
-rw-r--r--src/actors/scala/actors/distributed/Node.scala15
-rw-r--r--src/actors/scala/actors/distributed/Serializer.scala20
-rw-r--r--src/actors/scala/actors/distributed/Service.scala84
-rw-r--r--src/actors/scala/actors/distributed/TcpService.scala38
-rw-r--r--src/actors/scala/actors/distributed/TcpServiceWorker.scala24
8 files changed, 103 insertions, 730 deletions
diff --git a/src/actors/scala/actors/distributed/FreshNameCreator.scala b/src/actors/scala/actors/distributed/FreshNameCreator.scala
new file mode 100644
index 0000000000..ecf6e3f78a
--- /dev/null
+++ b/src/actors/scala/actors/distributed/FreshNameCreator.scala
@@ -0,0 +1,28 @@
+package nactors.distributed
+
+import scala.collection.mutable.HashMap
+
+object FreshNameCreator {
+
+ protected var counter = 0
+ protected val counters = new HashMap[String,int]
+
+ /**
+ * Create a fresh name with the given prefix. It is guaranteed
+ * that the returned name has never been returned by a previous
+ * call to this function (provided the prefix does not end in a digit).
+ */
+ def newName(prefix: String): Symbol = {
+ val count = counters.get(prefix) match {
+ case Some(last) => last + 1
+ case None => 0
+ }
+ counters.update(prefix, count)
+ new Symbol(prefix + count)
+ }
+
+ def newName(): Symbol = {
+ counter = counter + 1
+ new Symbol("$" + counter + "$")
+ }
+}
diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala
index 7c2651293a..a2ef350cd6 100644
--- a/src/actors/scala/actors/distributed/JavaSerializer.scala
+++ b/src/actors/scala/actors/distributed/JavaSerializer.scala
@@ -1,14 +1,5 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-// $Id$
-
-package scala.actors.distributed
+package nactors.distributed
import java.io._
@@ -18,8 +9,8 @@ import scala.io.BytePickle.SPU
class JavaSerializer(serv: Service) extends Serializer(serv) {
val debug = true
- def log (s: String) =
- if (debug) scala.Console.println("JAVASerializer: " + s)
+ def log(s: String) =
+ if (debug) Console.println("JavaSerializer: " + s)
def serialize(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -35,6 +26,5 @@ class JavaSerializer(serv: Service) extends Serializer(serv) {
in.readObject()
}
- def pid: SPU[RemotePid] = null
def addRep(name: String, repCons: Serializer => AnyRef): Unit = {}
}
diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala
index 481ce94833..93b3b33c24 100644
--- a/src/actors/scala/actors/distributed/NetKernel.scala
+++ b/src/actors/scala/actors/distributed/NetKernel.scala
@@ -1,597 +1,71 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-// $Id$
-
-package scala.actors.distributed
+package nactors.distributed
import java.io.{IOException,StringReader,StringWriter}
import java.lang.SecurityException
import java.net.UnknownHostException
-import java.util.logging.{ConsoleHandler,Level,Logger}
-
import scala.collection.mutable.{HashMap,HashSet}
-import scala.actors.multi.{Actor,ExcHandlerDesc}
-case class RA(a: RemoteActor)
+case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
-object NetKernel {
- var kernel: NetKernel = null
-}
-
-/**
- * @author Philipp Haller
- */
class NetKernel(service: Service) {
- NetKernel.kernel = this
-
- // contains constructors
- private val ptable =
- new HashMap[String, () => RemoteActor]
-
- // maps local ids to scala.actors
- private val rtable =
- new HashMap[int, RemoteActor]
-
- // maps scala.actors to their RemotePid
- private val pidTable =
- new HashMap[RemoteActor, RemotePid]
-
- private var running = true;
-
- val logLevel = Level.FINE
- val l = Logger.getLogger("NetKernel")
- l.setLevel(logLevel)
- val consHand = new ConsoleHandler
- consHand.setLevel(logLevel)
- l.addHandler(consHand)
-
- //start // start NetKernel
-
- /** only called if destDesc is local. */
- def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
- destDesc.p match {
- case rpid: RemotePid =>
- (rtable get rpid.localId) match {
- case Some(actor) =>
- actor.handleExc(destDesc, e)
- case None =>
- error("exc desc refers to non-registered actor")
- }
- }
-
- def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) =
- // locality check (handler local to this node?)
- destDesc.p match {
- case rpid: RemotePid =>
- if (rpid.node == this.node)
- handleExc(destDesc, e)
- else
- sendToNode(rpid.node, ForwardExc(destDesc, e))
- }
def sendToNode(node: Node, msg: AnyRef) = {
- //val sw = new StringWriter
- val bytes = service.serializer.serialize(msg /*, sw*/)
+ val bytes = service.serializer.serialize(msg)
service.send(node, bytes)
- //service.send(node, sw.toString())
}
- def addConstructor(key: String, value: () => RemoteActor) =
- ptable.update(key, value);
-
- def node: Node = service.node
-
- def nodes: List[Node] = service.nodes
-
- def pidOf(actor: RemoteActor): RemotePid = synchronized {
- pidTable.get(actor) match {
- case None => error("malformed pid table in " + this)
- case Some(pid) => pid
- }
- }
-
- def disconnectNode(n: Node) = synchronized {
- service.disconnectNode(n)
- }
-
- def getLocalRef(locId: Int): RemoteActor =
- rtable.get(locId) match {
- case None =>
- error("" + locId + " is not registered at " + this)
- case Some(remoteActor: RemoteActor) =>
- remoteActor
- }
-
- def localSend(localId: Int, msg: Any): Unit = synchronized {
- rtable.get(localId) match {
- case None =>
- error("" + localId + " is not registered at " + this)
- case Some(remoteActor: RemoteActor) =>
- //Console.println("local send to " + remoteActor)
- remoteActor send msg
- }
- }
-
- def localSend(pid: RemotePid, msg: Any): Unit =
- localSend(pid.localId, msg)
-
- def remoteSend(pid: RemotePid, msg: Any) = synchronized {
- //Console.println("NetKernel: Remote msg delivery to " + pid)
- msg match {
- case m: AnyRef =>
- service.remoteSend(pid, m)
- }
+ def namedSend(node: Node, senderName: Symbol, receiver: Symbol, msg: AnyRef): Unit = {
+ val bytes = service.serializer.serialize(msg)
+ sendToNode(node, NamedSend(senderName, receiver, bytes))
}
- def namedSend(name: Name, msg: AnyRef): Unit =
- if (name.node == this.node) {
- // look-up name
- nameTable.get(name.sym) match {
- case None =>
- // message is lost
- //Console.println("lost message " + msg + " because " + name + " not registered.")
- case Some(localId) => localSend(localId, msg)
+ def send(node: Node, name: Symbol, msg: AnyRef): Unit = {
+ val senderName = names.get(Actor.self) match {
+ case None => {
+ val freshName = FreshNameCreator.newName("remotesender")
+ register(freshName, Actor.self)
+ freshName
}
+ case Some(name) => name
}
- else {
- // remote send
-
- // serialize msg
- ///val sw = new StringWriter
- val bytes = service.serializer.serialize(msg/*, sw*/)
-
- sendToNode(name.node, NamedSend(name.sym, bytes))
- //sendToNode(name.node, NamedSend(name.sym, sw.toString()))
- }
-
- val nameTable = new HashMap[Symbol, Int]
-
- def registerName(name: Symbol, pid: RemotePid): Unit = synchronized {
- nameTable += name -> pid.localId
- }
-
- def registerName(name: Symbol, a: RemoteActor): Unit = synchronized {
- val pid = register(a)
- registerName(name, pid)
- a.start
+ namedSend(node, senderName, name, msg)
}
- /*override def run: unit = receive {
- case ForwardExc(destDesc, e) =>
- // TODO
- case Spawn(reply: RemotePid, pname) =>
- val newPid = spawn(pname)
- // need to send back the Pid
- remoteSend(reply, newPid)
- run
-
- case SpawnObject(reply: RemotePid, data: Array[byte]) =>
- //val sr = new StringReader(data)
- //service.serializer.deserialize(sr) match {
-
- service.serializer.deserialize(data) match {
- case RA(actor) =>
- val newPid = register(actor)
- //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
- actor.start
- // need to send back the Pid
- remoteSend(reply, newPid)
- }
- run
-
- case NamedSend(sym: Symbol, data) =>
- // look-up name
- nameTable.get(sym) match {
- case None =>
- // message is lost
- Console.println("lost message " + data + " because " + sym + " not registered.")
- case Some(localId) =>
- // deserialize data
- //val sr = new StringReader(data)
- //val msg = service.serializer.deserialize(sr)
-
- val msg = service.serializer.deserialize(data)
- localSend(localId, msg)
- }
- run
-
- case Send(pid: RemotePid, data) =>
- // deserialize data
- //val sr = new StringReader(data)
- //val msg = service.serializer.deserialize(sr)
- val msg = service.serializer.deserialize(data)
-
- Console.println("locally send " + msg + " to " + pid)
- localSend(pid, msg)
- run
-
- case Link(from:RemotePid, to:RemotePid) =>
- // assume from is local
- linkFromLocal(from, to)
- run
-
- case UnLink(from:RemotePid, to:RemotePid) =>
- // assume from is local
- unlinkFromLocal(from, to)
- run
-
- case Exit1(from:RemotePid, to:RemotePid, reason) =>
- // check if "to" traps exit signals
- // if so send a message
- if (trapExits.contains(to.localId))
- // convert signal into message
- localSend(to, Exit1(from, to, reason))
- else
- if (reason.name.equals("normal")) {
- // ignore signal
- }
- else
- exit(from, to, reason)
- run
- }*/
-
- // TODO
- /*def isReachable(remoteNode: Node): boolean = {
- val pingMsg = new Ping(node)
- val sw = new StringWriter
- service.serializer.serialize(pingMsg, sw)
- service.send(remoteNode, sw.toString())
- }*/
-
- def processMsg(msg: AnyRef): Unit = synchronized {
+ def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
msg match {
- case Spawn(reply: RemotePid, pname) =>
- val newPid = spawn(pname)
- // need to send back the Pid
- remoteSend(reply, newPid)
-
- case SpawnObject(reply: RemotePid, data) =>
- //val sr = new StringReader(data)
- //service.serializer.deserialize(sr) match {
-
- service.serializer.deserialize(data) match {
- case RA(actor) =>
- val newPid = register(actor)
- //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
- actor.start
- // need to send back the Pid
- remoteSend(reply, newPid)
- }
-
- case NamedSend(sym: Symbol, data) =>
- // look-up name
- nameTable.get(sym) match {
- case None =>
- // message is lost
- //Console.println("lost message " + msg + " because " + sym + " not registered.")
- case Some(localId) =>
- // deserialize data
- //val sr = new StringReader(data)
- //val msg = service.serializer.deserialize(sr)
+ case NamedSend(senderName, receiver, data) =>
+ actors.get(receiver) match {
+ case Some(a) => {
+ Debug.info("processing message from " + senderName + " on " + senderNode)
val msg = service.serializer.deserialize(data)
- localSend(localId, msg)
- }
-
- case Send(pid: RemotePid, data) =>
- // deserialize data
- //val sr = new StringReader(data)
- //val msg = service.serializer.deserialize(sr)
- val msg = service.serializer.deserialize(data)
- //Console.println("locally send " + msg + " to " + pid)
- localSend(pid, msg)
-
- case Link(from:RemotePid, to:RemotePid) =>
- // assume from is local
- linkFromLocal(from, to)
-
- case UnLink(from:RemotePid, to:RemotePid) =>
- // assume from is local
- unlinkFromLocal(from, to)
-
- case Exit1(from:RemotePid, to:RemotePid, reason) =>
- // check if "to" traps exit signals
- // if so send a message
- if (trapExits.contains(to.localId)) {
- // convert signal into message
- // TODO: simpler message (w/o to) for actor!
- localSend(to, Exit1(from, to, reason))
- }
- else {
- if (reason.name.equals("normal")) {
- // ignore signal
- }
- else
- exit(from, to, reason)
- }
- }
- }
-
- /* Registers an instance of a remote actor inside this NetKernel.
- */
- def register(newProc: RemoteActor): RemotePid = synchronized {
- newProc.kernel = this
- val newPid = service.createPid(newProc)
- rtable += newPid.localId -> newProc
- pidTable += newProc -> newPid
- newPid
- }
-
- // local spawn
- def spawn(pname: String): RemotePid = synchronized {
- // get constructor out of table
- (ptable.get(pname)) match {
- case None =>
- //error("No constructor found. Cannot start process.")
- //null
-
- val newProc = Class.forName(pname).newInstance().asInstanceOf[RemoteActor]
- // create Pid for remote communication and register process
- val newPid = register(newProc)
- //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
- newProc.start
- newPid
-
- case Some(cons) =>
- val newProc = cons()
- // create Pid for remote communication and register process
- val newPid = register(newProc)
- //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
- newProc.start
- newPid
- }
- }
-
- // local spawn
- def spawn(name: String, arg: RemotePid): RemotePid = synchronized {
- val newPid = spawn(name)
- localSend(newPid, arg)
- newPid
- }
-
- // assume this.node != node
- def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): Unit = {
- val ra = RA(a)
- //val rsw = new StringWriter
- //service.serializer.serialize(ra, rsw)
- val bytes = service.serializer.serialize(ra)
- //sendToNode(node, SpawnObject(replyTo, rsw.toString()))
- sendToNode(node, SpawnObject(replyTo, bytes))
- }
-
-
- def registerSerializer(index: String, rep: Serializer => AnyRef) = {
- service.serializer.addRep(index, rep)
-
- // send registering requests to remote nodes
- }
-
- // remote spawn
- def spawn(replyTo: RemotePid, node: Node, name: String): RemotePid = synchronized {
- // check if actor is to be spawned locally
- if (node == this.node) {
- val newPid = spawn(name)
- newPid
- }
- else {
- sendToNode(node, Spawn(replyTo, name))
- null // pid needs to be sent back
- }
- }
-
- /* Spawns a new actor (locally), executing "fun".
- */
- def spawn(fun: RemoteActor => Unit): RemotePid = synchronized {
- val newProc = new RemoteActor {
- override def run: unit =
- fun(this);
- }
-
- // create Pid for remote communication and register process
- val newPid = register(newProc)
- //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
- newProc.start
- newPid
- }
-
- /* Spawns a new actor (locally), executing "fun".
- */
- def spawnLink(pid: RemotePid, fun: RemoteActor => unit): RemotePid = synchronized {
- val newProc = new RemoteActor {
- override def run: unit =
- fun(this);
- }
-
- // create Pid for remote communication and register process
- val newPid = register(newProc)
- //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
-
- // link new process to pid (assume pid is local)
- link(pid, newPid)
-
- newProc.start
- newPid
- }
-
- // maps local ids to their linked pids
- private val links = new HashMap[int,HashSet[RemotePid]];
-
- // which of the local processes traps exit signals?
- private val trapExits = new HashSet[int];
-
- def processFlag(pid: RemotePid, flag: Symbol, set: Boolean) = synchronized {
- if (flag.name.equals("trapExit")) {
- if (trapExits.contains(pid.localId) && !set)
- trapExits -= pid.localId
- else if (!trapExits.contains(pid.localId) && set)
- trapExits += pid.localId
- }
- }
-
- // assume from.node == this.node
- private def unlinkFromLocal(from: RemotePid, to: RemotePid): Unit =
- links.get(from.localId) match {
- case None =>
- // has no links -> ignore
- case Some(set) =>
- set -= to
- if (set.size == 0) links -= from.localId
- };
-
- /*
- unlinks bi-directional link
- assume from.node == this.node
- */
- def unlink(from: RemotePid, to: RemotePid): Unit = synchronized {
- unlinkFromLocal(from, to)
- if (to.node == this.node)
- unlinkFromLocal(to, from)
- else
- // (2) send message to NetKernel of "to" to unlink a
- // uni-directional link from "to" to "from"
- sendToNode(to.node, UnLink(to, from))
- }
-
- // assume from.node == this.node
- private def linkFromLocal(from: RemotePid, to: RemotePid): Unit =
- // TODO: send Exit to from if to is invalid
- links.get(from.localId) match {
- case None =>
- // from has no links, yet
- val linksTo = new HashSet[RemotePid]
- linksTo += to
- links += from.localId -> linksTo
- case Some(set) =>
- set += to
- };
-
- /*
- creates bi-directional link
- assume from.node == this.node
- */
- def link(from: RemotePid, to: RemotePid): unit = synchronized {
- // (1) create locally a uni-directional link
- linkFromLocal(from, to)
- if (to.node == this.node)
- linkFromLocal(to, from)
- else
- // (2) send message to NetKernel of "to" to create a
- // uni-directional link from "to" to "from"
- sendToNode(to.node, Link(to, from))
- }
-
- // Assume "to" is local.
- def exit(from: RemotePid, to: RemotePid, reason: Symbol): unit = {
- // remove link
- unlinkFromLocal(to, from)
- exit(to, reason)
- }
-
- val exitMarks = new HashSet[RemotePid]
-
- /*
- If reason is unequal to 'normal then
- this will cause all linked processes to
- (transitively) terminate abnormally.
-
- Assume pid is local.
- */
- def exit(pid: RemotePid, reason: Symbol): Unit = synchronized {
- if (!(exitMarks contains pid)) {
- exitMarks += pid // mark pid as exiting
- //Console.println("" + pid + " is exiting (" + reason + ").")
-
- // first look-up remote actor in rtable
- val actor = rtable(pid.localId)
- // remove from table of running processes
- rtable -= pid.localId
- // remove from pid table
- pidTable -= actor
-
- // send exit signals to linked processes
- links.get(pid.localId) match {
- case None =>
- //Console.println("no linked processes")
-
- case Some(set) => // set of remote pids that we want to terminate
- //Console.println("sending exit signals to linked processes")
-
- val iter = set.elements
- while (iter.hasNext) {
- val linkedPid = iter.next
-
- unlinkFromLocal(pid, linkedPid)
-
- if (linkedPid.node == this.node) {
- unlinkFromLocal(linkedPid, pid)
-
- if (trapExits.contains(linkedPid.localId))
- localSend(linkedPid, Exit1(pid, linkedPid, reason))
- else if (!reason.name.equals("normal"))
- exit(linkedPid, reason)
+ val senderProxy = new Reactor {
+ override def run() = { a ! msg }
+ override def !(msg: Any): Unit = {
+ msg match {
+ case refmsg: AnyRef => {
+ Debug.info("sending " + msg + " to " + senderName + " on " + senderNode)
+ namedSend(senderNode, receiver, senderName, refmsg)
+ }
+ }
+ }
+ override def !?(msg: Any): Any =
+ error("!? not implemented for remote actors.")
}
- else
- sendToNode(linkedPid.node,
- Exit1(pid, linkedPid, reason))
+ senderProxy.start()
}
- exitMarks -= pid
- }
- }
- }
-
- private val monNodes =
- new HashMap[Node,HashMap[RemotePid,Int]]
-
- def monitorNode(client: RemotePid, mnode: Node, cond: Boolean) = synchronized {
- monNodes.get(mnode) match {
- case None =>
- // nobody is monitoring this node
- if (cond) {
- val map = new HashMap[RemotePid,int]
- map += client -> 1
- monNodes += mnode -> map
+ case None => // message is lost
+ Console.println("" + receiver + " not registered in NetKernel.")
}
- case Some(map) =>
- map.update(client, map(client) + (if (cond) 1 else -1))
- }
-
- // if no connection exists:
- // try connecting, if it fails deliver nodedown msg
- if (cond && !service.isConnected(mnode)) {
- try {
- service.connect(mnode)
- }
- catch {
- case uhe: UnknownHostException =>
- nodeDown(mnode)
- case ioe: IOException =>
- nodeDown(mnode)
- case se: SecurityException =>
- nodeDown(mnode)
- }
}
}
- def nodeDown(mnode: Node) =
- // send NodeDown msg to registered RemotePids
- monNodes.get(mnode) match {
- case None =>
- // nobody is monitoring this node
- case Some(map) =>
- // iterate over keys (RemotePids of interested clients)
- val iter = map.keys
- while (iter.hasNext) {
- val client = iter.next
- for (val i <- List.range(0, map(client))) {
- // send nodedown msg
- client ! Pair(NodeDown(), mnode)
- }
- }
- }
+ private val actors = new HashMap[Symbol, Actor]
+ private val names = new HashMap[Actor, Symbol]
+ private[nactors] def register(name: Symbol, a: Actor): Unit = synchronized {
+ Debug.info("registering " + a + " as " + name)
+ actors += name -> a
+ names += a -> name
+ }
}
diff --git a/src/actors/scala/actors/distributed/Node.scala b/src/actors/scala/actors/distributed/Node.scala
index 09ecd5dc95..d70e9779d4 100644
--- a/src/actors/scala/actors/distributed/Node.scala
+++ b/src/actors/scala/actors/distributed/Node.scala
@@ -1,16 +1,7 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-// $Id$
-
-package scala.actors.distributed
+package nactors.distributed
[serializable] abstract class Node;
-[serializable] case class TcpNode(address: String, port: Int) extends Node
-[serializable] case class JXTANode(name: String) extends Node
+[serializable] case class TcpNode(address: String, port: Int) extends Node;
+[serializable] case class JxtaNode(group: String) extends Node;
diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala
index bdab8be7d1..527aad83dd 100644
--- a/src/actors/scala/actors/distributed/Serializer.scala
+++ b/src/actors/scala/actors/distributed/Serializer.scala
@@ -1,22 +1,10 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-// $Id$
-
-package scala.actors.distributed
+package nactors.distributed
import java.io.{DataInputStream,DataOutputStream,EOFException}
import scala.io.BytePickle.SPU
-/**
- * @author Philipp Haller
- */
-abstract class Serializer(s: Service) {
+abstract class Serializer(val service: Service) {
def serialize(o: AnyRef/*, w: Writer*/): Array[byte]
def deserialize(a: Array[byte]/*r: Reader*/): AnyRef
@@ -54,8 +42,4 @@ abstract class Serializer(s: Service) {
val bytes = serialize(obj)
writeBytes(outputStream, bytes)
}
-
- def pid: SPU[RemotePid]
- def service = s
- def addRep(name: String, repCons: Serializer => AnyRef): unit
}
diff --git a/src/actors/scala/actors/distributed/Service.scala b/src/actors/scala/actors/distributed/Service.scala
index f5164751b3..e404d3a73d 100644
--- a/src/actors/scala/actors/distributed/Service.scala
+++ b/src/actors/scala/actors/distributed/Service.scala
@@ -1,83 +1,23 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-// $Id$
-
-package scala.actors.distributed
+package nactors.distributed
import java.io.StringWriter
-/**
- * @author Philipp Haller
- */
trait Service {
- val serializer: Serializer
- def node: Node
- def createPid(actor: RemoteActor): RemotePid
- def send(node: Node, data: Array[byte]): Unit
- def connect(node: Node): Unit // non blocking.
- def disconnectNode(node: Node): Unit
- def isConnected(node: Node): Boolean
-
- //blocking. timeout depends on Implementation.
- def isReachable(node: Node): Boolean
-
- def getRoundTripTimeMillis(node: Node): Long //blocking
-
- def nodes:List[Node]
-
-// implemented parts:
-
private val kern = new NetKernel(this)
def kernel = kern
- def spawn(name: String): RemotePid =
- kern spawn name
-
- def spawn(name: String, arg: RemotePid): RemotePid =
- kern.spawn(name, arg)
-
-
- //suggested addition by seb
- def spawn(fun: RemoteActor => unit): RemotePid =
- kernel.spawn(fun);
- def spawn(a:RemoteActor):RemotePid = {
- //Console.println("Service:spawn(RemoteActor)")
- val pid = kernel.register(a)
- //Console.println("RemoteActor("+a+") registered in kernel")
- a.start
- //Console.println("RemoteActor("+a+") started")
- pid
- }
-
- def send(pid: RemotePid, msg: AnyRef): unit = synchronized {
- if (pid.node == this.node)
- kernel.localSend(pid, msg)
- else
- kernel.remoteSend(pid, msg)
- }
-
- def remoteSend(pid: RemotePid, msg: AnyRef): unit = synchronized {
- //Console.println("Service: Sending " + msg + " to " + pid)
- // lets try to serialize the message
- //val sw = new StringWriter
- //serializer.serialize(msg, sw)
- val bytes = serializer.serialize(msg)
- //val sendMsg = Send(pid, sw.toString())
- val sendMsg = Send(pid, bytes)
- //val sw2 = new StringWriter
- //serializer.serialize(sendMsg, sw2)
- //send(pid.node, sw2.toString())
- val bytes2 = serializer.serialize(sendMsg)
- send(pid.node, bytes2)
- }
+ val serializer: Serializer
+ def node: Node
- private var idCnt = 0
- def makeUid = { idCnt = idCnt + 1; idCnt }
+ def send(node: Node, data: Array[byte]): Unit
+/*
+ def connect(node: Node): Unit // non-blocking
+ def disconnectNode(node: Node): Unit
+ def isConnected(node: Node): Boolean
+ def isReachable(node: Node): Boolean // blocking
+ def getRoundTripTimeMillis(node: Node): Long // blocking
+ def nodes: List[Node]
+*/
}
diff --git a/src/actors/scala/actors/distributed/TcpService.scala b/src/actors/scala/actors/distributed/TcpService.scala
index a1efa01d9b..f1ea43ad27 100644
--- a/src/actors/scala/actors/distributed/TcpService.scala
+++ b/src/actors/scala/actors/distributed/TcpService.scala
@@ -1,21 +1,9 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-// $Id$
-
-package scala.actors.distributed
+package nactors.distributed
import java.io.IOException
import java.net.{InetAddress,ServerSocket,Socket,UnknownHostException}
-/**
- * @author Philipp Haller
- */
object TcpService {
val random = new java.util.Random(System.currentTimeMillis())
@@ -38,26 +26,12 @@ object TcpService {
}
}
-object TestPorts {
- def main(args: Array[String]): Unit = {
- val random = new java.util.Random(System.currentTimeMillis())
- val socket = new ServerSocket(8000 + random.nextInt(500))
- Console.println(TcpService.generatePort)
- }
-}
-
-/**
- * @author Philipp Haller
- */
class TcpService(port: Int) extends Thread with Service {
val serializer: JavaSerializer = new JavaSerializer(this)
- private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port);
+ private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port)
def node: TcpNode = internalNode
- def createPid(actor: RemoteActor): RemotePid =
- new TcpPid(internalNode, makeUid, kernel, actor)
-
def send(node: Node, data: String): unit = synchronized {
// retrieve worker thread (if any) that already has connection
node match {
@@ -66,7 +40,7 @@ class TcpService(port: Int) extends Thread with Service {
case None =>
// we are not connected, yet
Console.println("We are not connected, yet.");
- val newWorker = connect(tnode); //bad in a sync BLOCK!!!
+ val newWorker = connect(tnode); // bad in a sync block
newWorker transmit data
case Some(worker) => worker transmit data
}
@@ -120,7 +94,7 @@ class TcpService(port: Int) extends Thread with Service {
// connection management
private val connections =
- new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker]
+ new scala.collection.mutable.HashMap[TcpNode, TcpServiceWorker]
def nodes: List[Node] =
throw new Exception ("nodes need to be implemented in TcpService!")
@@ -155,7 +129,7 @@ class TcpService(port: Int) extends Thread with Service {
Console.println("Connected.")
// spawn new worker thread
val worker = new TcpServiceWorker(this, sock)
- worker.sendNode;
+ worker.sendNode(n);
// start worker thread
worker.start()
// register locally (we want to reuse connections which correspond to connected sockets)
@@ -200,7 +174,7 @@ class TcpService(port: Int) extends Thread with Service {
def getRoundTripTimeMillis(node: Node): Long = 0
def nodeDown(mnode: TcpNode): Unit = synchronized {
- kernel nodeDown mnode
+ //kernel nodeDown mnode
connections -= mnode
}
diff --git a/src/actors/scala/actors/distributed/TcpServiceWorker.scala b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
index 2587319fdb..36b0cfe14f 100644
--- a/src/actors/scala/actors/distributed/TcpServiceWorker.scala
+++ b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
@@ -1,21 +1,9 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-// $Id$
-
-package scala.actors.distributed
+package nactors.distributed
import java.io._
import java.net.Socket
-/**
- * @author Philipp Haller
- */
class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
val in = so.getInputStream()
val out = so.getOutputStream()
@@ -29,10 +17,12 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
val log = new Debug("TcpServiceWorker")
log.level = 2
+/*
def transmit(msg: Send): Unit = synchronized {
val data = parent.serializer.serialize(msg)
transmit(data)
}
+*/
def transmit(data: String): Unit = synchronized {
log.info("Transmitting " + data)
@@ -47,7 +37,8 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
dataout.flush()
}
- def sendNode = {
+ def sendNode(n: TcpNode) = {
+ connectedNode = n
scala.Console.println("Sending our name " + parent.node)
parent.serializer.writeObject(dataout, parent.node)
}
@@ -69,6 +60,7 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
}
var running = true
+
def halt = synchronized {
so.close() // close socket
running = false // stop
@@ -81,8 +73,8 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
log.info("deserializing...");
//val msg = parent.serializer.deserialize(reader);
val msg = parent.serializer.readObject(datain);
- log.info("Received object: " + msg);
- parent.kernel.processMsg(msg)
+ log.info("Received object: " + msg + " from " + connectedNode);
+ parent.kernel.processMsg(connectedNode, msg)
}
}
}