diff options
Diffstat (limited to 'src/actors/scala/actors/remote')
-rw-r--r-- | src/actors/scala/actors/remote/FreshNameCreator.scala | 36 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/JavaSerializer.scala | 63 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/NetKernel.scala | 147 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/Proxy.scala | 190 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/RemoteActor.scala | 132 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/Serializer.scala | 58 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/Service.scala | 24 | ||||
-rw-r--r-- | src/actors/scala/actors/remote/TcpService.scala | 292 |
8 files changed, 0 insertions, 942 deletions
diff --git a/src/actors/scala/actors/remote/FreshNameCreator.scala b/src/actors/scala/actors/remote/FreshNameCreator.scala deleted file mode 100644 index f7cf29387e..0000000000 --- a/src/actors/scala/actors/remote/FreshNameCreator.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package remote - -object FreshNameCreator { - - protected var counter = 0 - protected val counters = new scala.collection.mutable.HashMap[String, Int] - - /** - * Create a fresh name with the given prefix. It is guaranteed - * that the returned name has never been returned by a previous - * call to this function (provided the prefix does not end in a digit). - */ - def newName(prefix: String): Symbol = { - val count = counters.get(prefix) match { - case Some(last) => last + 1 - case None => 0 - } - counters.update(prefix, count) - Symbol(prefix + count) - } - - def newName(): Symbol = { - counter += 1 - Symbol("$" + counter + "$") - } -} diff --git a/src/actors/scala/actors/remote/JavaSerializer.scala b/src/actors/scala/actors/remote/JavaSerializer.scala deleted file mode 100644 index 7549bbf429..0000000000 --- a/src/actors/scala/actors/remote/JavaSerializer.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package remote - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, - ObjectInputStream, ObjectOutputStream, InputStream, - ObjectStreamClass} - -/** - * @author Guy Oliver - */ -private[remote] class CustomObjectInputStream(in: InputStream, cl: ClassLoader) -extends ObjectInputStream(in) { - override def resolveClass(cd: ObjectStreamClass): Class[_] = - try { - cl.loadClass(cd.getName()) - } catch { - case cnf: ClassNotFoundException => - super.resolveClass(cd) - } - override def resolveProxyClass(interfaces: Array[String]): Class[_] = - try { - val ifaces = interfaces map { iface => cl.loadClass(iface) } - java.lang.reflect.Proxy.getProxyClass(cl, ifaces: _*) - } catch { - case e: ClassNotFoundException => - super.resolveProxyClass(interfaces) - } -} - -/** - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class JavaSerializer(serv: Service, cl: ClassLoader) extends Serializer(serv) { - def serialize(o: AnyRef): Array[Byte] = { - val bos = new ByteArrayOutputStream() - val out = new ObjectOutputStream(bos) - out.writeObject(o) - out.flush() - bos.toByteArray() - } - - def deserialize(bytes: Array[Byte]): AnyRef = { - val bis = new ByteArrayInputStream(bytes) - - // use custom stream only if cl != null - val in = if (cl != null) - new CustomObjectInputStream(bis, cl) - else - new ObjectInputStream(bis) - - in.readObject() - } -} diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala deleted file mode 100644 index 57d7af6d26..0000000000 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ /dev/null @@ -1,147 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package remote - -import scala.collection.mutable - -case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol) - -case class RemoteApply0(senderLoc: Locator, receiverLoc: Locator, rfun: Function2[AbstractActor, Proxy, Unit]) -case class LocalApply0(rfun: Function2[AbstractActor, Proxy, Unit], a: AbstractActor) - -case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol) -case object Terminate - -case class Locator(node: Node, name: Symbol) - -/** - * @version 0.9.17 - * @author Philipp Haller - */ -private[remote] class NetKernel(service: Service) { - - def sendToNode(node: Node, msg: AnyRef) = { - val bytes = service.serializer.serialize(msg) - service.send(node, bytes) - } - - def namedSend(senderLoc: Locator, receiverLoc: Locator, - msg: AnyRef, session: Symbol) { - val bytes = service.serializer.serialize(msg) - sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session)) - } - - private val actors = new mutable.HashMap[Symbol, OutputChannel[Any]] - private val names = new mutable.HashMap[OutputChannel[Any], Symbol] - - def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized { - actors(name) = a - names(a) = name - } - - def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match { - case None => - val freshName = FreshNameCreator.newName("remotesender") - register(freshName, from) - freshName - case Some(name) => - name - } - - def send(node: Node, name: Symbol, msg: AnyRef): Unit = - send(node, name, msg, 'nosession) - - def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) { - val senderLoc = Locator(service.node, getOrCreateName(Actor.self(Scheduler))) - val receiverLoc = Locator(node, name) - namedSend(senderLoc, receiverLoc, msg, session) - } - - def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) { - val senderLoc = Locator(service.node, getOrCreateName(from)) - val receiverLoc = Locator(node, name) - namedSend(senderLoc, receiverLoc, msg, session) - } - - def remoteApply(node: Node, name: Symbol, from: OutputChannel[Any], rfun: Function2[AbstractActor, Proxy, Unit]) { - val senderLoc = Locator(service.node, getOrCreateName(from)) - val receiverLoc = Locator(node, name) - sendToNode(receiverLoc.node, RemoteApply0(senderLoc, receiverLoc, rfun)) - } - - def createProxy(node: Node, sym: Symbol): Proxy = { - val p = new Proxy(node, sym, this) - proxies((node, sym)) = p - p - } - - val proxies = new mutable.HashMap[(Node, Symbol), Proxy] - - def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy = - proxies.synchronized { - proxies.get((senderNode, senderName)) match { - case Some(senderProxy) => senderProxy - case None => createProxy(senderNode, senderName) - } - } - - /* Register proxy if no other proxy has been registered. - */ - def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit = - proxies.synchronized { - proxies.get((senderNode, senderName)) match { - case Some(senderProxy) => // do nothing - case None => proxies((senderNode, senderName)) = p - } - } - - def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized { - msg match { - case cmd@RemoteApply0(senderLoc, receiverLoc, rfun) => - Debug.info(this+": processing "+cmd) - actors.get(receiverLoc.name) match { - case Some(a) => - val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) - senderProxy.send(LocalApply0(rfun, a.asInstanceOf[AbstractActor]), null) - - case None => - // message is lost - Debug.info(this+": lost message") - } - - case cmd@NamedSend(senderLoc, receiverLoc, data, session) => - Debug.info(this+": processing "+cmd) - actors.get(receiverLoc.name) match { - case Some(a) => - try { - val msg = service.serializer.deserialize(data) - val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name) - senderProxy.send(SendTo(a, msg, session), null) - } catch { - case e: Exception => - Debug.error(this+": caught "+e) - } - - case None => - // message is lost - Debug.info(this+": lost message") - } - } - } - - def terminate() { - // tell all proxies to terminate - proxies.values foreach { _.send(Terminate, null) } - - // tell service to terminate - service.terminate() - } -} diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala deleted file mode 100644 index 2cb03544f2..0000000000 --- a/src/actors/scala/actors/remote/Proxy.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package remote - -import scala.collection.mutable - -/** - * @author Philipp Haller - */ -private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor with Serializable { - import java.io.{IOException, ObjectOutputStream, ObjectInputStream} - - type Future[+P] = scala.actors.Future[P] - - @transient - private[remote] var del: Actor = null - startDelegate() - - @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream) { - out.defaultWriteObject() - } - - @throws(classOf[ClassNotFoundException]) @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream) { - in.defaultReadObject() - setupKernel() - startDelegate() - } - - private def startDelegate() { - del = new DelegateActor(this, node, name, kernel) - del.start() - } - - private def setupKernel() { - kernel = RemoteActor.someNetKernel - kernel.registerProxy(node, name, this) - } - - def !(msg: Any): Unit = - del ! msg - - def send(msg: Any, replyCh: OutputChannel[Any]): Unit = - del.send(msg, replyCh) - - def forward(msg: Any): Unit = - del.forward(msg) - - def receiver: Actor = - del - - def !?(msg: Any): Any = - del !? msg - - def !?(msec: Long, msg: Any): Option[Any] = - del !? (msec, msg) - - def !!(msg: Any): Future[Any] = - del !! msg - - def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = - del !! (msg, f) - - def linkTo(to: AbstractActor): Unit = - del ! Apply0(new LinkToFun) - - def unlinkFrom(from: AbstractActor): Unit = - del ! Apply0(new UnlinkFromFun) - - def exit(from: AbstractActor, reason: AnyRef): Unit = - del ! Apply0(new ExitFun(reason)) - - override def toString() = - name+"@"+node -} - -// Proxy is private[remote], but these classes are public and use it in a public -// method signature. That makes the only method they have non-overridable. -// So I made them final, which seems appropriate anyway. - -final class LinkToFun extends Function2[AbstractActor, Proxy, Unit] with Serializable { - def apply(target: AbstractActor, creator: Proxy) { - target.linkTo(creator) - } - override def toString = - "<LinkToFun>" -} - -final class UnlinkFromFun extends Function2[AbstractActor, Proxy, Unit] with Serializable { - def apply(target: AbstractActor, creator: Proxy) { - target.unlinkFrom(creator) - } - override def toString = - "<UnlinkFromFun>" -} - -final class ExitFun(reason: AnyRef) extends Function2[AbstractActor, Proxy, Unit] with Serializable { - def apply(target: AbstractActor, creator: Proxy) { - target.exit(creator, reason) - } - override def toString = - "<ExitFun>("+reason.toString+")" -} - -private[remote] case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit]) - -/** - * @author Philipp Haller - */ -private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor { - var channelMap = new mutable.HashMap[Symbol, OutputChannel[Any]] - var sessionMap = new mutable.HashMap[OutputChannel[_], Symbol] - - def act() { - Actor.loop { - react { - case cmd@Apply0(rfun) => - kernel.remoteApply(node, name, sender, rfun) - - case cmd@LocalApply0(rfun, target) => - rfun(target, creator) - - // Request from remote proxy. - // `this` is local proxy. - case cmd@SendTo(out, msg, session) => - if (session.name == "nosession") { - // local send - out.send(msg, this) - } else { - // is this an active session? - channelMap.get(session) match { - case None => - // create a new reply channel... - val replyCh = new Channel[Any](this) - // ...that maps to session - sessionMap(replyCh) = session - // local send - out.send(msg, replyCh) - - // finishes request-reply cycle - case Some(replyCh) => - channelMap -= session - replyCh ! msg - } - } - - case cmd@Terminate => - exit() - - // local proxy receives response to - // reply channel - case ch ! resp => - // lookup session ID - sessionMap.get(ch) match { - case Some(sid) => - sessionMap -= ch - val msg = resp.asInstanceOf[AnyRef] - // send back response - kernel.forward(sender, node, name, msg, sid) - - case None => - Debug.info(this+": cannot find session for "+ch) - } - - // remote proxy receives request - case msg: AnyRef => - // find out whether it's a synchronous send - if (sender.getClass.toString.contains("Channel")) { - // create fresh session ID... - val fresh = FreshNameCreator.newName(node+"@"+name) - // ...that maps to reply channel - channelMap(fresh) = sender - kernel.forward(sender, node, name, msg, fresh) - } else { - kernel.forward(sender, node, name, msg, 'nosession) - } - } - } - } - -} diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala deleted file mode 100644 index 2daf9ceb43..0000000000 --- a/src/actors/scala/actors/remote/RemoteActor.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - - -package scala.actors -package remote - - -/** - * This object provides methods for creating, registering, and - * selecting remotely accessible actors. - * - * A remote actor is typically created like this: - * {{{ - * actor { - * alive(9010) - * register('myName, self) - * - * // behavior - * } - * }}} - * It can be accessed by an actor running on a (possibly) - * different node by selecting it in the following way: - * {{{ - * actor { - * // ... - * val c = select(Node("127.0.0.1", 9010), 'myName) - * c ! msg - * // ... - * } - * }}} - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -object RemoteActor { - - private val kernels = new scala.collection.mutable.HashMap[InternalActor, NetKernel] - - /* If set to <code>null</code> (default), the default class loader - * of <code>java.io.ObjectInputStream</code> is used for deserializing - * objects sent as messages. - */ - private var cl: ClassLoader = null - - def classLoader: ClassLoader = cl - def classLoader_=(x: ClassLoader) { cl = x } - - /** - * Makes <code>self</code> remotely accessible on TCP port - * <code>port</code>. - */ - def alive(port: Int): Unit = synchronized { - createNetKernelOnPort(port) - } - - private def createNetKernelOnPort(port: Int): NetKernel = { - val serv = TcpService(port, cl) - val kern = serv.kernel - val s = Actor.self(Scheduler) - kernels(s) = kern - - s.onTerminate { - Debug.info("alive actor "+s+" terminated") - // remove mapping for `s` - kernels -= s - // terminate `kern` when it does - // not appear as value any more - if (!kernels.valuesIterator.contains(kern)) { - Debug.info("terminating "+kern) - // terminate NetKernel - kern.terminate() - } - } - - kern - } - - /** - * Registers <code>a</code> under <code>name</code> on this - * node. - */ - def register(name: Symbol, a: Actor): Unit = synchronized { - val kernel = kernels.get(Actor.self(Scheduler)) match { - case None => - val serv = TcpService(TcpService.generatePort, cl) - kernels(Actor.self(Scheduler)) = serv.kernel - serv.kernel - case Some(k) => - k - } - kernel.register(name, a) - } - - private def selfKernel = kernels.get(Actor.self(Scheduler)) match { - case None => - // establish remotely accessible - // return path (sender) - createNetKernelOnPort(TcpService.generatePort) - case Some(k) => - k - } - - /** - * Returns (a proxy for) the actor registered under - * <code>name</code> on <code>node</code>. - */ - def select(node: Node, sym: Symbol): AbstractActor = synchronized { - selfKernel.getOrCreateProxy(node, sym) - } - - private[remote] def someNetKernel: NetKernel = - kernels.valuesIterator.next -} - - -/** - * This class represents a machine node on a TCP network. - * - * @param address the host name, or <code>null</code> for the loopback address. - * @param port the port number. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -case class Node(address: String, port: Int) diff --git a/src/actors/scala/actors/remote/Serializer.scala b/src/actors/scala/actors/remote/Serializer.scala deleted file mode 100644 index 7be4aa6583..0000000000 --- a/src/actors/scala/actors/remote/Serializer.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - - -package scala.actors -package remote - - -import java.lang.ClassNotFoundException - -import java.io.{DataInputStream, DataOutputStream, EOFException, IOException} - -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -abstract class Serializer(val service: Service) { - def serialize(o: AnyRef): Array[Byte] - def deserialize(a: Array[Byte]): AnyRef - - @throws(classOf[IOException]) - private def readBytes(inputStream: DataInputStream): Array[Byte] = { - try { - val length = inputStream.readInt() - val bytes = new Array[Byte](length) - inputStream.readFully(bytes, 0, length) - bytes - } - catch { - case npe: NullPointerException => - throw new EOFException("Connection closed.") - } - } - - @throws(classOf[IOException]) @throws(classOf[ClassNotFoundException]) - def readObject(inputStream: DataInputStream): AnyRef = { - val bytes = readBytes(inputStream) - deserialize(bytes) - } - - @throws(classOf[IOException]) - private def writeBytes(outputStream: DataOutputStream, bytes: Array[Byte]) { - val length = bytes.length; - // original length - outputStream.writeInt(length) - outputStream.write(bytes, 0, length) - outputStream.flush() - } - - @throws(classOf[IOException]) - def writeObject(outputStream: DataOutputStream, obj: AnyRef) { - val bytes = serialize(obj) - writeBytes(outputStream, bytes) - } -} diff --git a/src/actors/scala/actors/remote/Service.scala b/src/actors/scala/actors/remote/Service.scala deleted file mode 100644 index d102df1970..0000000000 --- a/src/actors/scala/actors/remote/Service.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package remote - -/** - * @version 0.9.10 - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -trait Service { - val kernel = new NetKernel(this) - val serializer: Serializer - def node: Node - def send(node: Node, data: Array[Byte]): Unit - def terminate(): Unit -} diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala deleted file mode 100644 index 69e5c46c52..0000000000 --- a/src/actors/scala/actors/remote/TcpService.scala +++ /dev/null @@ -1,292 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - - -package scala.actors -package remote - - -import java.io.{DataInputStream, DataOutputStream, IOException} -import java.lang.{Thread, SecurityException} -import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketTimeoutException, UnknownHostException} - -import scala.collection.mutable -import scala.util.Random - -/* Object TcpService. - * - * @version 0.9.9 - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -object TcpService { - private val random = new Random - private val ports = new mutable.HashMap[Int, TcpService] - - def apply(port: Int, cl: ClassLoader): TcpService = - ports.get(port) match { - case Some(service) => - service - case None => - val service = new TcpService(port, cl) - ports(port) = service - service.start() - Debug.info("created service at "+service.node) - service - } - - def generatePort: Int = { - var portnum = 0 - try { - portnum = 8000 + random.nextInt(500) - val socket = new ServerSocket(portnum) - socket.close() - } - catch { - case ioe: IOException => - // this happens when trying to open a socket twice - // at the same port - // try again - generatePort - case se: SecurityException => - // do nothing - } - portnum - } - - private val connectTimeoutMillis = { - val propName = "scala.actors.tcpSocket.connectTimeoutMillis" - val defaultTimeoutMillis = 0 - sys.props get propName flatMap { - timeout => - try { - val to = timeout.toInt - Debug.info(s"Using socket timeout $to") - Some(to) - } catch { - case e: NumberFormatException => - Debug.warning(s"""Could not parse $propName = "$timeout" as an Int""") - None - } - } getOrElse defaultTimeoutMillis - } - - var BufSize: Int = 65536 -} - -/* Class TcpService. - * - * @version 0.9.10 - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { - val serializer: JavaSerializer = new JavaSerializer(this, cl) - - private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port) - def node: Node = internalNode - - private val pendingSends = new mutable.HashMap[Node, List[Array[Byte]]] - - /** - * Sends a byte array to another node on the network. - * If the node is not yet up, up to `TcpService.BufSize` - * messages are buffered. - */ - def send(node: Node, data: Array[Byte]): Unit = synchronized { - - def bufferMsg(t: Throwable) { - // buffer message, so that it can be re-sent - // when remote net kernel comes up - (pendingSends.get(node): @unchecked) match { - case None => - pendingSends(node) = List(data) - case Some(msgs) if msgs.length < TcpService.BufSize => - pendingSends(node) = data :: msgs - } - } - - // retrieve worker thread (if any) that already has connection - getConnection(node) match { - case None => - // we are not connected, yet - try { - val newWorker = connect(node) - - // any pending sends? - pendingSends.get(node) match { - case None => - // do nothing - case Some(msgs) => - msgs.reverse foreach {newWorker transmit _} - pendingSends -= node - } - - newWorker transmit data - } catch { - case uhe: UnknownHostException => - bufferMsg(uhe) - case ioe: IOException => - bufferMsg(ioe) - case se: SecurityException => - // do nothing - } - case Some(worker) => - worker transmit data - } - } - - def terminate() { - shouldTerminate = true - try { - new Socket(internalNode.address, internalNode.port) - } catch { - case ce: java.net.ConnectException => - Debug.info(this+": caught "+ce) - } - } - - private var shouldTerminate = false - - override def run() { - try { - val socket = new ServerSocket(port) - while (!shouldTerminate) { - Debug.info(this+": waiting for new connection on port "+port+"...") - val nextClient = socket.accept() - if (!shouldTerminate) { - val worker = new TcpServiceWorker(this, nextClient) - Debug.info("Started new "+worker) - worker.readNode - worker.start() - } else - nextClient.close() - } - } catch { - case e: Exception => - Debug.info(this+": caught "+e) - } finally { - Debug.info(this+": shutting down...") - connections foreach { case (_, worker) => worker.halt } - } - } - - // connection management - - private val connections = - new mutable.HashMap[Node, TcpServiceWorker] - - private[actors] def addConnection(node: Node, worker: TcpServiceWorker) = synchronized { - connections(node) = worker - } - - def getConnection(n: Node) = synchronized { - connections.get(n) - } - - def isConnected(n: Node): Boolean = synchronized { - !connections.get(n).isEmpty - } - - def connect(n: Node): TcpServiceWorker = synchronized { - val socket = new Socket() - val start = System.nanoTime - try { - socket.connect(new InetSocketAddress(n.address, n.port), TcpService.connectTimeoutMillis) - } catch { - case e: SocketTimeoutException => - Debug.warning(f"Timed out connecting to $n after ${(System.nanoTime - start) / math.pow(10, 9)}%.3f seconds") - throw e - } - val worker = new TcpServiceWorker(this, socket) - worker.sendNode(n) - worker.start() - addConnection(n, worker) - worker - } - - def disconnectNode(n: Node) = synchronized { - connections.get(n) match { - case None => - // do nothing - case Some(worker) => - connections -= n - worker.halt - } - } - - def isReachable(node: Node): Boolean = - if (isConnected(node)) true - else try { - connect(node) - return true - } catch { - case uhe: UnknownHostException => false - case ioe: IOException => false - case se: SecurityException => false - } - - def nodeDown(mnode: Node): Unit = synchronized { - connections -= mnode - } -} - - -private[actors] class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { - val datain = new DataInputStream(so.getInputStream) - val dataout = new DataOutputStream(so.getOutputStream) - - var connectedNode: Node = _ - - def sendNode(n: Node) { - connectedNode = n - parent.serializer.writeObject(dataout, parent.node) - } - - def readNode() { - val node = parent.serializer.readObject(datain) - node match { - case n: Node => - connectedNode = n - parent.addConnection(n, this) - } - } - - def transmit(data: Array[Byte]): Unit = synchronized { - Debug.info(this+": transmitting data...") - dataout.writeInt(data.length) - dataout.write(data) - dataout.flush() - } - - var running = true - - def halt() = synchronized { - so.close() - running = false - } - - override def run() { - try { - while (running) { - val msg = parent.serializer.readObject(datain); - parent.kernel.processMsg(connectedNode, msg) - } - } - catch { - case ioe: IOException => - Debug.info(this+": caught "+ioe) - parent nodeDown connectedNode - case e: Exception => - Debug.info(this+": caught "+e) - parent nodeDown connectedNode - } - Debug.info(this+": service terminated at "+parent.node) - } -} |