diff options
8 files changed, 103 insertions, 730 deletions
diff --git a/src/actors/scala/actors/distributed/FreshNameCreator.scala b/src/actors/scala/actors/distributed/FreshNameCreator.scala new file mode 100644 index 0000000000..ecf6e3f78a --- /dev/null +++ b/src/actors/scala/actors/distributed/FreshNameCreator.scala @@ -0,0 +1,28 @@ +package nactors.distributed + +import scala.collection.mutable.HashMap + +object FreshNameCreator { + + protected var counter = 0 + protected val counters = new HashMap[String,int] + + /** + * Create a fresh name with the given prefix. It is guaranteed + * that the returned name has never been returned by a previous + * call to this function (provided the prefix does not end in a digit). + */ + def newName(prefix: String): Symbol = { + val count = counters.get(prefix) match { + case Some(last) => last + 1 + case None => 0 + } + counters.update(prefix, count) + new Symbol(prefix + count) + } + + def newName(): Symbol = { + counter = counter + 1 + new Symbol("$" + counter + "$") + } +} diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala index 7c2651293a..a2ef350cd6 100644 --- a/src/actors/scala/actors/distributed/JavaSerializer.scala +++ b/src/actors/scala/actors/distributed/JavaSerializer.scala @@ -1,14 +1,5 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ -// $Id$ - -package scala.actors.distributed +package nactors.distributed import java.io._ @@ -18,8 +9,8 @@ import scala.io.BytePickle.SPU class JavaSerializer(serv: Service) extends Serializer(serv) { val debug = true - def log (s: String) = - if (debug) scala.Console.println("JAVASerializer: " + s) + def log(s: String) = + if (debug) Console.println("JavaSerializer: " + s) def serialize(o: AnyRef): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -35,6 +26,5 @@ class JavaSerializer(serv: Service) extends Serializer(serv) { in.readObject() } - def pid: SPU[RemotePid] = null def addRep(name: String, repCons: Serializer => AnyRef): Unit = {} } diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala index 481ce94833..93b3b33c24 100644 --- a/src/actors/scala/actors/distributed/NetKernel.scala +++ b/src/actors/scala/actors/distributed/NetKernel.scala @@ -1,597 +1,71 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ -// $Id$ - -package scala.actors.distributed +package nactors.distributed import java.io.{IOException,StringReader,StringWriter} import java.lang.SecurityException import java.net.UnknownHostException -import java.util.logging.{ConsoleHandler,Level,Logger} - import scala.collection.mutable.{HashMap,HashSet} -import scala.actors.multi.{Actor,ExcHandlerDesc} -case class RA(a: RemoteActor) +case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte]) -object NetKernel { - var kernel: NetKernel = null -} - -/** - * @author Philipp Haller - */ class NetKernel(service: Service) { - NetKernel.kernel = this - - // contains constructors - private val ptable = - new HashMap[String, () => RemoteActor] - - // maps local ids to scala.actors - private val rtable = - new HashMap[int, RemoteActor] - - // maps scala.actors to their RemotePid - private val pidTable = - new HashMap[RemoteActor, RemotePid] - - private var running = true; - - val logLevel = Level.FINE - val l = Logger.getLogger("NetKernel") - l.setLevel(logLevel) - val consHand = new ConsoleHandler - consHand.setLevel(logLevel) - l.addHandler(consHand) - - //start // start NetKernel - - /** only called if destDesc is local. */ - def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = - destDesc.p match { - case rpid: RemotePid => - (rtable get rpid.localId) match { - case Some(actor) => - actor.handleExc(destDesc, e) - case None => - error("exc desc refers to non-registered actor") - } - } - - def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = - // locality check (handler local to this node?) - destDesc.p match { - case rpid: RemotePid => - if (rpid.node == this.node) - handleExc(destDesc, e) - else - sendToNode(rpid.node, ForwardExc(destDesc, e)) - } def sendToNode(node: Node, msg: AnyRef) = { - //val sw = new StringWriter - val bytes = service.serializer.serialize(msg /*, sw*/) + val bytes = service.serializer.serialize(msg) service.send(node, bytes) - //service.send(node, sw.toString()) } - def addConstructor(key: String, value: () => RemoteActor) = - ptable.update(key, value); - - def node: Node = service.node - - def nodes: List[Node] = service.nodes - - def pidOf(actor: RemoteActor): RemotePid = synchronized { - pidTable.get(actor) match { - case None => error("malformed pid table in " + this) - case Some(pid) => pid - } - } - - def disconnectNode(n: Node) = synchronized { - service.disconnectNode(n) - } - - def getLocalRef(locId: Int): RemoteActor = - rtable.get(locId) match { - case None => - error("" + locId + " is not registered at " + this) - case Some(remoteActor: RemoteActor) => - remoteActor - } - - def localSend(localId: Int, msg: Any): Unit = synchronized { - rtable.get(localId) match { - case None => - error("" + localId + " is not registered at " + this) - case Some(remoteActor: RemoteActor) => - //Console.println("local send to " + remoteActor) - remoteActor send msg - } - } - - def localSend(pid: RemotePid, msg: Any): Unit = - localSend(pid.localId, msg) - - def remoteSend(pid: RemotePid, msg: Any) = synchronized { - //Console.println("NetKernel: Remote msg delivery to " + pid) - msg match { - case m: AnyRef => - service.remoteSend(pid, m) - } + def namedSend(node: Node, senderName: Symbol, receiver: Symbol, msg: AnyRef): Unit = { + val bytes = service.serializer.serialize(msg) + sendToNode(node, NamedSend(senderName, receiver, bytes)) } - def namedSend(name: Name, msg: AnyRef): Unit = - if (name.node == this.node) { - // look-up name - nameTable.get(name.sym) match { - case None => - // message is lost - //Console.println("lost message " + msg + " because " + name + " not registered.") - case Some(localId) => localSend(localId, msg) + def send(node: Node, name: Symbol, msg: AnyRef): Unit = { + val senderName = names.get(Actor.self) match { + case None => { + val freshName = FreshNameCreator.newName("remotesender") + register(freshName, Actor.self) + freshName } + case Some(name) => name } - else { - // remote send - - // serialize msg - ///val sw = new StringWriter - val bytes = service.serializer.serialize(msg/*, sw*/) - - sendToNode(name.node, NamedSend(name.sym, bytes)) - //sendToNode(name.node, NamedSend(name.sym, sw.toString())) - } - - val nameTable = new HashMap[Symbol, Int] - - def registerName(name: Symbol, pid: RemotePid): Unit = synchronized { - nameTable += name -> pid.localId - } - - def registerName(name: Symbol, a: RemoteActor): Unit = synchronized { - val pid = register(a) - registerName(name, pid) - a.start + namedSend(node, senderName, name, msg) } - /*override def run: unit = receive { - case ForwardExc(destDesc, e) => - // TODO - case Spawn(reply: RemotePid, pname) => - val newPid = spawn(pname) - // need to send back the Pid - remoteSend(reply, newPid) - run - - case SpawnObject(reply: RemotePid, data: Array[byte]) => - //val sr = new StringReader(data) - //service.serializer.deserialize(sr) match { - - service.serializer.deserialize(data) match { - case RA(actor) => - val newPid = register(actor) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - actor.start - // need to send back the Pid - remoteSend(reply, newPid) - } - run - - case NamedSend(sym: Symbol, data) => - // look-up name - nameTable.get(sym) match { - case None => - // message is lost - Console.println("lost message " + data + " because " + sym + " not registered.") - case Some(localId) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) - - val msg = service.serializer.deserialize(data) - localSend(localId, msg) - } - run - - case Send(pid: RemotePid, data) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) - val msg = service.serializer.deserialize(data) - - Console.println("locally send " + msg + " to " + pid) - localSend(pid, msg) - run - - case Link(from:RemotePid, to:RemotePid) => - // assume from is local - linkFromLocal(from, to) - run - - case UnLink(from:RemotePid, to:RemotePid) => - // assume from is local - unlinkFromLocal(from, to) - run - - case Exit1(from:RemotePid, to:RemotePid, reason) => - // check if "to" traps exit signals - // if so send a message - if (trapExits.contains(to.localId)) - // convert signal into message - localSend(to, Exit1(from, to, reason)) - else - if (reason.name.equals("normal")) { - // ignore signal - } - else - exit(from, to, reason) - run - }*/ - - // TODO - /*def isReachable(remoteNode: Node): boolean = { - val pingMsg = new Ping(node) - val sw = new StringWriter - service.serializer.serialize(pingMsg, sw) - service.send(remoteNode, sw.toString()) - }*/ - - def processMsg(msg: AnyRef): Unit = synchronized { + def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { msg match { - case Spawn(reply: RemotePid, pname) => - val newPid = spawn(pname) - // need to send back the Pid - remoteSend(reply, newPid) - - case SpawnObject(reply: RemotePid, data) => - //val sr = new StringReader(data) - //service.serializer.deserialize(sr) match { - - service.serializer.deserialize(data) match { - case RA(actor) => - val newPid = register(actor) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - actor.start - // need to send back the Pid - remoteSend(reply, newPid) - } - - case NamedSend(sym: Symbol, data) => - // look-up name - nameTable.get(sym) match { - case None => - // message is lost - //Console.println("lost message " + msg + " because " + sym + " not registered.") - case Some(localId) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) + case NamedSend(senderName, receiver, data) => + actors.get(receiver) match { + case Some(a) => { + Debug.info("processing message from " + senderName + " on " + senderNode) val msg = service.serializer.deserialize(data) - localSend(localId, msg) - } - - case Send(pid: RemotePid, data) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) - val msg = service.serializer.deserialize(data) - //Console.println("locally send " + msg + " to " + pid) - localSend(pid, msg) - - case Link(from:RemotePid, to:RemotePid) => - // assume from is local - linkFromLocal(from, to) - - case UnLink(from:RemotePid, to:RemotePid) => - // assume from is local - unlinkFromLocal(from, to) - - case Exit1(from:RemotePid, to:RemotePid, reason) => - // check if "to" traps exit signals - // if so send a message - if (trapExits.contains(to.localId)) { - // convert signal into message - // TODO: simpler message (w/o to) for actor! - localSend(to, Exit1(from, to, reason)) - } - else { - if (reason.name.equals("normal")) { - // ignore signal - } - else - exit(from, to, reason) - } - } - } - - /* Registers an instance of a remote actor inside this NetKernel. - */ - def register(newProc: RemoteActor): RemotePid = synchronized { - newProc.kernel = this - val newPid = service.createPid(newProc) - rtable += newPid.localId -> newProc - pidTable += newProc -> newPid - newPid - } - - // local spawn - def spawn(pname: String): RemotePid = synchronized { - // get constructor out of table - (ptable.get(pname)) match { - case None => - //error("No constructor found. Cannot start process.") - //null - - val newProc = Class.forName(pname).newInstance().asInstanceOf[RemoteActor] - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - newProc.start - newPid - - case Some(cons) => - val newProc = cons() - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - newProc.start - newPid - } - } - - // local spawn - def spawn(name: String, arg: RemotePid): RemotePid = synchronized { - val newPid = spawn(name) - localSend(newPid, arg) - newPid - } - - // assume this.node != node - def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): Unit = { - val ra = RA(a) - //val rsw = new StringWriter - //service.serializer.serialize(ra, rsw) - val bytes = service.serializer.serialize(ra) - //sendToNode(node, SpawnObject(replyTo, rsw.toString())) - sendToNode(node, SpawnObject(replyTo, bytes)) - } - - - def registerSerializer(index: String, rep: Serializer => AnyRef) = { - service.serializer.addRep(index, rep) - - // send registering requests to remote nodes - } - - // remote spawn - def spawn(replyTo: RemotePid, node: Node, name: String): RemotePid = synchronized { - // check if actor is to be spawned locally - if (node == this.node) { - val newPid = spawn(name) - newPid - } - else { - sendToNode(node, Spawn(replyTo, name)) - null // pid needs to be sent back - } - } - - /* Spawns a new actor (locally), executing "fun". - */ - def spawn(fun: RemoteActor => Unit): RemotePid = synchronized { - val newProc = new RemoteActor { - override def run: unit = - fun(this); - } - - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - newProc.start - newPid - } - - /* Spawns a new actor (locally), executing "fun". - */ - def spawnLink(pid: RemotePid, fun: RemoteActor => unit): RemotePid = synchronized { - val newProc = new RemoteActor { - override def run: unit = - fun(this); - } - - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - - // link new process to pid (assume pid is local) - link(pid, newPid) - - newProc.start - newPid - } - - // maps local ids to their linked pids - private val links = new HashMap[int,HashSet[RemotePid]]; - - // which of the local processes traps exit signals? - private val trapExits = new HashSet[int]; - - def processFlag(pid: RemotePid, flag: Symbol, set: Boolean) = synchronized { - if (flag.name.equals("trapExit")) { - if (trapExits.contains(pid.localId) && !set) - trapExits -= pid.localId - else if (!trapExits.contains(pid.localId) && set) - trapExits += pid.localId - } - } - - // assume from.node == this.node - private def unlinkFromLocal(from: RemotePid, to: RemotePid): Unit = - links.get(from.localId) match { - case None => - // has no links -> ignore - case Some(set) => - set -= to - if (set.size == 0) links -= from.localId - }; - - /* - unlinks bi-directional link - assume from.node == this.node - */ - def unlink(from: RemotePid, to: RemotePid): Unit = synchronized { - unlinkFromLocal(from, to) - if (to.node == this.node) - unlinkFromLocal(to, from) - else - // (2) send message to NetKernel of "to" to unlink a - // uni-directional link from "to" to "from" - sendToNode(to.node, UnLink(to, from)) - } - - // assume from.node == this.node - private def linkFromLocal(from: RemotePid, to: RemotePid): Unit = - // TODO: send Exit to from if to is invalid - links.get(from.localId) match { - case None => - // from has no links, yet - val linksTo = new HashSet[RemotePid] - linksTo += to - links += from.localId -> linksTo - case Some(set) => - set += to - }; - - /* - creates bi-directional link - assume from.node == this.node - */ - def link(from: RemotePid, to: RemotePid): unit = synchronized { - // (1) create locally a uni-directional link - linkFromLocal(from, to) - if (to.node == this.node) - linkFromLocal(to, from) - else - // (2) send message to NetKernel of "to" to create a - // uni-directional link from "to" to "from" - sendToNode(to.node, Link(to, from)) - } - - // Assume "to" is local. - def exit(from: RemotePid, to: RemotePid, reason: Symbol): unit = { - // remove link - unlinkFromLocal(to, from) - exit(to, reason) - } - - val exitMarks = new HashSet[RemotePid] - - /* - If reason is unequal to 'normal then - this will cause all linked processes to - (transitively) terminate abnormally. - - Assume pid is local. - */ - def exit(pid: RemotePid, reason: Symbol): Unit = synchronized { - if (!(exitMarks contains pid)) { - exitMarks += pid // mark pid as exiting - //Console.println("" + pid + " is exiting (" + reason + ").") - - // first look-up remote actor in rtable - val actor = rtable(pid.localId) - // remove from table of running processes - rtable -= pid.localId - // remove from pid table - pidTable -= actor - - // send exit signals to linked processes - links.get(pid.localId) match { - case None => - //Console.println("no linked processes") - - case Some(set) => // set of remote pids that we want to terminate - //Console.println("sending exit signals to linked processes") - - val iter = set.elements - while (iter.hasNext) { - val linkedPid = iter.next - - unlinkFromLocal(pid, linkedPid) - - if (linkedPid.node == this.node) { - unlinkFromLocal(linkedPid, pid) - - if (trapExits.contains(linkedPid.localId)) - localSend(linkedPid, Exit1(pid, linkedPid, reason)) - else if (!reason.name.equals("normal")) - exit(linkedPid, reason) + val senderProxy = new Reactor { + override def run() = { a ! msg } + override def !(msg: Any): Unit = { + msg match { + case refmsg: AnyRef => { + Debug.info("sending " + msg + " to " + senderName + " on " + senderNode) + namedSend(senderNode, receiver, senderName, refmsg) + } + } + } + override def !?(msg: Any): Any = + error("!? not implemented for remote actors.") } - else - sendToNode(linkedPid.node, - Exit1(pid, linkedPid, reason)) + senderProxy.start() } - exitMarks -= pid - } - } - } - - private val monNodes = - new HashMap[Node,HashMap[RemotePid,Int]] - - def monitorNode(client: RemotePid, mnode: Node, cond: Boolean) = synchronized { - monNodes.get(mnode) match { - case None => - // nobody is monitoring this node - if (cond) { - val map = new HashMap[RemotePid,int] - map += client -> 1 - monNodes += mnode -> map + case None => // message is lost + Console.println("" + receiver + " not registered in NetKernel.") } - case Some(map) => - map.update(client, map(client) + (if (cond) 1 else -1)) - } - - // if no connection exists: - // try connecting, if it fails deliver nodedown msg - if (cond && !service.isConnected(mnode)) { - try { - service.connect(mnode) - } - catch { - case uhe: UnknownHostException => - nodeDown(mnode) - case ioe: IOException => - nodeDown(mnode) - case se: SecurityException => - nodeDown(mnode) - } } } - def nodeDown(mnode: Node) = - // send NodeDown msg to registered RemotePids - monNodes.get(mnode) match { - case None => - // nobody is monitoring this node - case Some(map) => - // iterate over keys (RemotePids of interested clients) - val iter = map.keys - while (iter.hasNext) { - val client = iter.next - for (val i <- List.range(0, map(client))) { - // send nodedown msg - client ! Pair(NodeDown(), mnode) - } - } - } + private val actors = new HashMap[Symbol, Actor] + private val names = new HashMap[Actor, Symbol] + private[nactors] def register(name: Symbol, a: Actor): Unit = synchronized { + Debug.info("registering " + a + " as " + name) + actors += name -> a + names += a -> name + } } diff --git a/src/actors/scala/actors/distributed/Node.scala b/src/actors/scala/actors/distributed/Node.scala index 09ecd5dc95..d70e9779d4 100644 --- a/src/actors/scala/actors/distributed/Node.scala +++ b/src/actors/scala/actors/distributed/Node.scala @@ -1,16 +1,7 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ -// $Id$ - -package scala.actors.distributed +package nactors.distributed [serializable] abstract class Node; -[serializable] case class TcpNode(address: String, port: Int) extends Node -[serializable] case class JXTANode(name: String) extends Node +[serializable] case class TcpNode(address: String, port: Int) extends Node; +[serializable] case class JxtaNode(group: String) extends Node; diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala index bdab8be7d1..527aad83dd 100644 --- a/src/actors/scala/actors/distributed/Serializer.scala +++ b/src/actors/scala/actors/distributed/Serializer.scala @@ -1,22 +1,10 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ -// $Id$ - -package scala.actors.distributed +package nactors.distributed import java.io.{DataInputStream,DataOutputStream,EOFException} import scala.io.BytePickle.SPU -/** - * @author Philipp Haller - */ -abstract class Serializer(s: Service) { +abstract class Serializer(val service: Service) { def serialize(o: AnyRef/*, w: Writer*/): Array[byte] def deserialize(a: Array[byte]/*r: Reader*/): AnyRef @@ -54,8 +42,4 @@ abstract class Serializer(s: Service) { val bytes = serialize(obj) writeBytes(outputStream, bytes) } - - def pid: SPU[RemotePid] - def service = s - def addRep(name: String, repCons: Serializer => AnyRef): unit } diff --git a/src/actors/scala/actors/distributed/Service.scala b/src/actors/scala/actors/distributed/Service.scala index f5164751b3..e404d3a73d 100644 --- a/src/actors/scala/actors/distributed/Service.scala +++ b/src/actors/scala/actors/distributed/Service.scala @@ -1,83 +1,23 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ -// $Id$ - -package scala.actors.distributed +package nactors.distributed import java.io.StringWriter -/** - * @author Philipp Haller - */ trait Service { - val serializer: Serializer - def node: Node - def createPid(actor: RemoteActor): RemotePid - def send(node: Node, data: Array[byte]): Unit - def connect(node: Node): Unit // non blocking. - def disconnectNode(node: Node): Unit - def isConnected(node: Node): Boolean - - //blocking. timeout depends on Implementation. - def isReachable(node: Node): Boolean - - def getRoundTripTimeMillis(node: Node): Long //blocking - - def nodes:List[Node] - -// implemented parts: - private val kern = new NetKernel(this) def kernel = kern - def spawn(name: String): RemotePid = - kern spawn name - - def spawn(name: String, arg: RemotePid): RemotePid = - kern.spawn(name, arg) - - - //suggested addition by seb - def spawn(fun: RemoteActor => unit): RemotePid = - kernel.spawn(fun); - def spawn(a:RemoteActor):RemotePid = { - //Console.println("Service:spawn(RemoteActor)") - val pid = kernel.register(a) - //Console.println("RemoteActor("+a+") registered in kernel") - a.start - //Console.println("RemoteActor("+a+") started") - pid - } - - def send(pid: RemotePid, msg: AnyRef): unit = synchronized { - if (pid.node == this.node) - kernel.localSend(pid, msg) - else - kernel.remoteSend(pid, msg) - } - - def remoteSend(pid: RemotePid, msg: AnyRef): unit = synchronized { - //Console.println("Service: Sending " + msg + " to " + pid) - // lets try to serialize the message - //val sw = new StringWriter - //serializer.serialize(msg, sw) - val bytes = serializer.serialize(msg) - //val sendMsg = Send(pid, sw.toString()) - val sendMsg = Send(pid, bytes) - //val sw2 = new StringWriter - //serializer.serialize(sendMsg, sw2) - //send(pid.node, sw2.toString()) - val bytes2 = serializer.serialize(sendMsg) - send(pid.node, bytes2) - } + val serializer: Serializer + def node: Node - private var idCnt = 0 - def makeUid = { idCnt = idCnt + 1; idCnt } + def send(node: Node, data: Array[byte]): Unit +/* + def connect(node: Node): Unit // non-blocking + def disconnectNode(node: Node): Unit + def isConnected(node: Node): Boolean + def isReachable(node: Node): Boolean // blocking + def getRoundTripTimeMillis(node: Node): Long // blocking + def nodes: List[Node] +*/ } diff --git a/src/actors/scala/actors/distributed/TcpService.scala b/src/actors/scala/actors/distributed/TcpService.scala index a1efa01d9b..f1ea43ad27 100644 --- a/src/actors/scala/actors/distributed/TcpService.scala +++ b/src/actors/scala/actors/distributed/TcpService.scala @@ -1,21 +1,9 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ -// $Id$ - -package scala.actors.distributed +package nactors.distributed import java.io.IOException import java.net.{InetAddress,ServerSocket,Socket,UnknownHostException} -/** - * @author Philipp Haller - */ object TcpService { val random = new java.util.Random(System.currentTimeMillis()) @@ -38,26 +26,12 @@ object TcpService { } } -object TestPorts { - def main(args: Array[String]): Unit = { - val random = new java.util.Random(System.currentTimeMillis()) - val socket = new ServerSocket(8000 + random.nextInt(500)) - Console.println(TcpService.generatePort) - } -} - -/** - * @author Philipp Haller - */ class TcpService(port: Int) extends Thread with Service { val serializer: JavaSerializer = new JavaSerializer(this) - private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port); + private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port) def node: TcpNode = internalNode - def createPid(actor: RemoteActor): RemotePid = - new TcpPid(internalNode, makeUid, kernel, actor) - def send(node: Node, data: String): unit = synchronized { // retrieve worker thread (if any) that already has connection node match { @@ -66,7 +40,7 @@ class TcpService(port: Int) extends Thread with Service { case None => // we are not connected, yet Console.println("We are not connected, yet."); - val newWorker = connect(tnode); //bad in a sync BLOCK!!! + val newWorker = connect(tnode); // bad in a sync block newWorker transmit data case Some(worker) => worker transmit data } @@ -120,7 +94,7 @@ class TcpService(port: Int) extends Thread with Service { // connection management private val connections = - new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker] + new scala.collection.mutable.HashMap[TcpNode, TcpServiceWorker] def nodes: List[Node] = throw new Exception ("nodes need to be implemented in TcpService!") @@ -155,7 +129,7 @@ class TcpService(port: Int) extends Thread with Service { Console.println("Connected.") // spawn new worker thread val worker = new TcpServiceWorker(this, sock) - worker.sendNode; + worker.sendNode(n); // start worker thread worker.start() // register locally (we want to reuse connections which correspond to connected sockets) @@ -200,7 +174,7 @@ class TcpService(port: Int) extends Thread with Service { def getRoundTripTimeMillis(node: Node): Long = 0 def nodeDown(mnode: TcpNode): Unit = synchronized { - kernel nodeDown mnode + //kernel nodeDown mnode connections -= mnode } diff --git a/src/actors/scala/actors/distributed/TcpServiceWorker.scala b/src/actors/scala/actors/distributed/TcpServiceWorker.scala index 2587319fdb..36b0cfe14f 100644 --- a/src/actors/scala/actors/distributed/TcpServiceWorker.scala +++ b/src/actors/scala/actors/distributed/TcpServiceWorker.scala @@ -1,21 +1,9 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ -// $Id$ - -package scala.actors.distributed +package nactors.distributed import java.io._ import java.net.Socket -/** - * @author Philipp Haller - */ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { val in = so.getInputStream() val out = so.getOutputStream() @@ -29,10 +17,12 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { val log = new Debug("TcpServiceWorker") log.level = 2 +/* def transmit(msg: Send): Unit = synchronized { val data = parent.serializer.serialize(msg) transmit(data) } +*/ def transmit(data: String): Unit = synchronized { log.info("Transmitting " + data) @@ -47,7 +37,8 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { dataout.flush() } - def sendNode = { + def sendNode(n: TcpNode) = { + connectedNode = n scala.Console.println("Sending our name " + parent.node) parent.serializer.writeObject(dataout, parent.node) } @@ -69,6 +60,7 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { } var running = true + def halt = synchronized { so.close() // close socket running = false // stop @@ -81,8 +73,8 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { log.info("deserializing..."); //val msg = parent.serializer.deserialize(reader); val msg = parent.serializer.readObject(datain); - log.info("Received object: " + msg); - parent.kernel.processMsg(msg) + log.info("Received object: " + msg + " from " + connectedNode); + parent.kernel.processMsg(connectedNode, msg) } } } |