summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/remote
diff options
context:
space:
mode:
Diffstat (limited to 'src/actors/scala/actors/remote')
-rw-r--r--src/actors/scala/actors/remote/FreshNameCreator.scala36
-rw-r--r--src/actors/scala/actors/remote/JavaSerializer.scala63
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala147
-rw-r--r--src/actors/scala/actors/remote/Proxy.scala190
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala132
-rw-r--r--src/actors/scala/actors/remote/Serializer.scala58
-rw-r--r--src/actors/scala/actors/remote/Service.scala24
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala292
8 files changed, 0 insertions, 942 deletions
diff --git a/src/actors/scala/actors/remote/FreshNameCreator.scala b/src/actors/scala/actors/remote/FreshNameCreator.scala
deleted file mode 100644
index f7cf29387e..0000000000
--- a/src/actors/scala/actors/remote/FreshNameCreator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-package scala.actors
-package remote
-
-object FreshNameCreator {
-
- protected var counter = 0
- protected val counters = new scala.collection.mutable.HashMap[String, Int]
-
- /**
- * Create a fresh name with the given prefix. It is guaranteed
- * that the returned name has never been returned by a previous
- * call to this function (provided the prefix does not end in a digit).
- */
- def newName(prefix: String): Symbol = {
- val count = counters.get(prefix) match {
- case Some(last) => last + 1
- case None => 0
- }
- counters.update(prefix, count)
- Symbol(prefix + count)
- }
-
- def newName(): Symbol = {
- counter += 1
- Symbol("$" + counter + "$")
- }
-}
diff --git a/src/actors/scala/actors/remote/JavaSerializer.scala b/src/actors/scala/actors/remote/JavaSerializer.scala
deleted file mode 100644
index 7549bbf429..0000000000
--- a/src/actors/scala/actors/remote/JavaSerializer.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-package scala.actors
-package remote
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
- ObjectInputStream, ObjectOutputStream, InputStream,
- ObjectStreamClass}
-
-/**
- * @author Guy Oliver
- */
-private[remote] class CustomObjectInputStream(in: InputStream, cl: ClassLoader)
-extends ObjectInputStream(in) {
- override def resolveClass(cd: ObjectStreamClass): Class[_] =
- try {
- cl.loadClass(cd.getName())
- } catch {
- case cnf: ClassNotFoundException =>
- super.resolveClass(cd)
- }
- override def resolveProxyClass(interfaces: Array[String]): Class[_] =
- try {
- val ifaces = interfaces map { iface => cl.loadClass(iface) }
- java.lang.reflect.Proxy.getProxyClass(cl, ifaces: _*)
- } catch {
- case e: ClassNotFoundException =>
- super.resolveProxyClass(interfaces)
- }
-}
-
-/**
- * @author Philipp Haller
- */
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-class JavaSerializer(serv: Service, cl: ClassLoader) extends Serializer(serv) {
- def serialize(o: AnyRef): Array[Byte] = {
- val bos = new ByteArrayOutputStream()
- val out = new ObjectOutputStream(bos)
- out.writeObject(o)
- out.flush()
- bos.toByteArray()
- }
-
- def deserialize(bytes: Array[Byte]): AnyRef = {
- val bis = new ByteArrayInputStream(bytes)
-
- // use custom stream only if cl != null
- val in = if (cl != null)
- new CustomObjectInputStream(bis, cl)
- else
- new ObjectInputStream(bis)
-
- in.readObject()
- }
-}
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
deleted file mode 100644
index 57d7af6d26..0000000000
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-package scala.actors
-package remote
-
-import scala.collection.mutable
-
-case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol)
-
-case class RemoteApply0(senderLoc: Locator, receiverLoc: Locator, rfun: Function2[AbstractActor, Proxy, Unit])
-case class LocalApply0(rfun: Function2[AbstractActor, Proxy, Unit], a: AbstractActor)
-
-case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
-case object Terminate
-
-case class Locator(node: Node, name: Symbol)
-
-/**
- * @version 0.9.17
- * @author Philipp Haller
- */
-private[remote] class NetKernel(service: Service) {
-
- def sendToNode(node: Node, msg: AnyRef) = {
- val bytes = service.serializer.serialize(msg)
- service.send(node, bytes)
- }
-
- def namedSend(senderLoc: Locator, receiverLoc: Locator,
- msg: AnyRef, session: Symbol) {
- val bytes = service.serializer.serialize(msg)
- sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session))
- }
-
- private val actors = new mutable.HashMap[Symbol, OutputChannel[Any]]
- private val names = new mutable.HashMap[OutputChannel[Any], Symbol]
-
- def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized {
- actors(name) = a
- names(a) = name
- }
-
- def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match {
- case None =>
- val freshName = FreshNameCreator.newName("remotesender")
- register(freshName, from)
- freshName
- case Some(name) =>
- name
- }
-
- def send(node: Node, name: Symbol, msg: AnyRef): Unit =
- send(node, name, msg, 'nosession)
-
- def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
- val senderLoc = Locator(service.node, getOrCreateName(Actor.self(Scheduler)))
- val receiverLoc = Locator(node, name)
- namedSend(senderLoc, receiverLoc, msg, session)
- }
-
- def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
- val senderLoc = Locator(service.node, getOrCreateName(from))
- val receiverLoc = Locator(node, name)
- namedSend(senderLoc, receiverLoc, msg, session)
- }
-
- def remoteApply(node: Node, name: Symbol, from: OutputChannel[Any], rfun: Function2[AbstractActor, Proxy, Unit]) {
- val senderLoc = Locator(service.node, getOrCreateName(from))
- val receiverLoc = Locator(node, name)
- sendToNode(receiverLoc.node, RemoteApply0(senderLoc, receiverLoc, rfun))
- }
-
- def createProxy(node: Node, sym: Symbol): Proxy = {
- val p = new Proxy(node, sym, this)
- proxies((node, sym)) = p
- p
- }
-
- val proxies = new mutable.HashMap[(Node, Symbol), Proxy]
-
- def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy =
- proxies.synchronized {
- proxies.get((senderNode, senderName)) match {
- case Some(senderProxy) => senderProxy
- case None => createProxy(senderNode, senderName)
- }
- }
-
- /* Register proxy if no other proxy has been registered.
- */
- def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit =
- proxies.synchronized {
- proxies.get((senderNode, senderName)) match {
- case Some(senderProxy) => // do nothing
- case None => proxies((senderNode, senderName)) = p
- }
- }
-
- def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
- msg match {
- case cmd@RemoteApply0(senderLoc, receiverLoc, rfun) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiverLoc.name) match {
- case Some(a) =>
- val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- senderProxy.send(LocalApply0(rfun, a.asInstanceOf[AbstractActor]), null)
-
- case None =>
- // message is lost
- Debug.info(this+": lost message")
- }
-
- case cmd@NamedSend(senderLoc, receiverLoc, data, session) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiverLoc.name) match {
- case Some(a) =>
- try {
- val msg = service.serializer.deserialize(data)
- val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- senderProxy.send(SendTo(a, msg, session), null)
- } catch {
- case e: Exception =>
- Debug.error(this+": caught "+e)
- }
-
- case None =>
- // message is lost
- Debug.info(this+": lost message")
- }
- }
- }
-
- def terminate() {
- // tell all proxies to terminate
- proxies.values foreach { _.send(Terminate, null) }
-
- // tell service to terminate
- service.terminate()
- }
-}
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala
deleted file mode 100644
index 2cb03544f2..0000000000
--- a/src/actors/scala/actors/remote/Proxy.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-package scala.actors
-package remote
-
-import scala.collection.mutable
-
-/**
- * @author Philipp Haller
- */
-private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor with Serializable {
- import java.io.{IOException, ObjectOutputStream, ObjectInputStream}
-
- type Future[+P] = scala.actors.Future[P]
-
- @transient
- private[remote] var del: Actor = null
- startDelegate()
-
- @throws(classOf[IOException])
- private def writeObject(out: ObjectOutputStream) {
- out.defaultWriteObject()
- }
-
- @throws(classOf[ClassNotFoundException]) @throws(classOf[IOException])
- private def readObject(in: ObjectInputStream) {
- in.defaultReadObject()
- setupKernel()
- startDelegate()
- }
-
- private def startDelegate() {
- del = new DelegateActor(this, node, name, kernel)
- del.start()
- }
-
- private def setupKernel() {
- kernel = RemoteActor.someNetKernel
- kernel.registerProxy(node, name, this)
- }
-
- def !(msg: Any): Unit =
- del ! msg
-
- def send(msg: Any, replyCh: OutputChannel[Any]): Unit =
- del.send(msg, replyCh)
-
- def forward(msg: Any): Unit =
- del.forward(msg)
-
- def receiver: Actor =
- del
-
- def !?(msg: Any): Any =
- del !? msg
-
- def !?(msec: Long, msg: Any): Option[Any] =
- del !? (msec, msg)
-
- def !!(msg: Any): Future[Any] =
- del !! msg
-
- def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] =
- del !! (msg, f)
-
- def linkTo(to: AbstractActor): Unit =
- del ! Apply0(new LinkToFun)
-
- def unlinkFrom(from: AbstractActor): Unit =
- del ! Apply0(new UnlinkFromFun)
-
- def exit(from: AbstractActor, reason: AnyRef): Unit =
- del ! Apply0(new ExitFun(reason))
-
- override def toString() =
- name+"@"+node
-}
-
-// Proxy is private[remote], but these classes are public and use it in a public
-// method signature. That makes the only method they have non-overridable.
-// So I made them final, which seems appropriate anyway.
-
-final class LinkToFun extends Function2[AbstractActor, Proxy, Unit] with Serializable {
- def apply(target: AbstractActor, creator: Proxy) {
- target.linkTo(creator)
- }
- override def toString =
- "<LinkToFun>"
-}
-
-final class UnlinkFromFun extends Function2[AbstractActor, Proxy, Unit] with Serializable {
- def apply(target: AbstractActor, creator: Proxy) {
- target.unlinkFrom(creator)
- }
- override def toString =
- "<UnlinkFromFun>"
-}
-
-final class ExitFun(reason: AnyRef) extends Function2[AbstractActor, Proxy, Unit] with Serializable {
- def apply(target: AbstractActor, creator: Proxy) {
- target.exit(creator, reason)
- }
- override def toString =
- "<ExitFun>("+reason.toString+")"
-}
-
-private[remote] case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit])
-
-/**
- * @author Philipp Haller
- */
-private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor {
- var channelMap = new mutable.HashMap[Symbol, OutputChannel[Any]]
- var sessionMap = new mutable.HashMap[OutputChannel[_], Symbol]
-
- def act() {
- Actor.loop {
- react {
- case cmd@Apply0(rfun) =>
- kernel.remoteApply(node, name, sender, rfun)
-
- case cmd@LocalApply0(rfun, target) =>
- rfun(target, creator)
-
- // Request from remote proxy.
- // `this` is local proxy.
- case cmd@SendTo(out, msg, session) =>
- if (session.name == "nosession") {
- // local send
- out.send(msg, this)
- } else {
- // is this an active session?
- channelMap.get(session) match {
- case None =>
- // create a new reply channel...
- val replyCh = new Channel[Any](this)
- // ...that maps to session
- sessionMap(replyCh) = session
- // local send
- out.send(msg, replyCh)
-
- // finishes request-reply cycle
- case Some(replyCh) =>
- channelMap -= session
- replyCh ! msg
- }
- }
-
- case cmd@Terminate =>
- exit()
-
- // local proxy receives response to
- // reply channel
- case ch ! resp =>
- // lookup session ID
- sessionMap.get(ch) match {
- case Some(sid) =>
- sessionMap -= ch
- val msg = resp.asInstanceOf[AnyRef]
- // send back response
- kernel.forward(sender, node, name, msg, sid)
-
- case None =>
- Debug.info(this+": cannot find session for "+ch)
- }
-
- // remote proxy receives request
- case msg: AnyRef =>
- // find out whether it's a synchronous send
- if (sender.getClass.toString.contains("Channel")) {
- // create fresh session ID...
- val fresh = FreshNameCreator.newName(node+"@"+name)
- // ...that maps to reply channel
- channelMap(fresh) = sender
- kernel.forward(sender, node, name, msg, fresh)
- } else {
- kernel.forward(sender, node, name, msg, 'nosession)
- }
- }
- }
- }
-
-}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
deleted file mode 100644
index 2daf9ceb43..0000000000
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-
-package scala.actors
-package remote
-
-
-/**
- * This object provides methods for creating, registering, and
- * selecting remotely accessible actors.
- *
- * A remote actor is typically created like this:
- * {{{
- * actor {
- * alive(9010)
- * register('myName, self)
- *
- * // behavior
- * }
- * }}}
- * It can be accessed by an actor running on a (possibly)
- * different node by selecting it in the following way:
- * {{{
- * actor {
- * // ...
- * val c = select(Node("127.0.0.1", 9010), 'myName)
- * c ! msg
- * // ...
- * }
- * }}}
- *
- * @author Philipp Haller
- */
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-object RemoteActor {
-
- private val kernels = new scala.collection.mutable.HashMap[InternalActor, NetKernel]
-
- /* If set to <code>null</code> (default), the default class loader
- * of <code>java.io.ObjectInputStream</code> is used for deserializing
- * objects sent as messages.
- */
- private var cl: ClassLoader = null
-
- def classLoader: ClassLoader = cl
- def classLoader_=(x: ClassLoader) { cl = x }
-
- /**
- * Makes <code>self</code> remotely accessible on TCP port
- * <code>port</code>.
- */
- def alive(port: Int): Unit = synchronized {
- createNetKernelOnPort(port)
- }
-
- private def createNetKernelOnPort(port: Int): NetKernel = {
- val serv = TcpService(port, cl)
- val kern = serv.kernel
- val s = Actor.self(Scheduler)
- kernels(s) = kern
-
- s.onTerminate {
- Debug.info("alive actor "+s+" terminated")
- // remove mapping for `s`
- kernels -= s
- // terminate `kern` when it does
- // not appear as value any more
- if (!kernels.valuesIterator.contains(kern)) {
- Debug.info("terminating "+kern)
- // terminate NetKernel
- kern.terminate()
- }
- }
-
- kern
- }
-
- /**
- * Registers <code>a</code> under <code>name</code> on this
- * node.
- */
- def register(name: Symbol, a: Actor): Unit = synchronized {
- val kernel = kernels.get(Actor.self(Scheduler)) match {
- case None =>
- val serv = TcpService(TcpService.generatePort, cl)
- kernels(Actor.self(Scheduler)) = serv.kernel
- serv.kernel
- case Some(k) =>
- k
- }
- kernel.register(name, a)
- }
-
- private def selfKernel = kernels.get(Actor.self(Scheduler)) match {
- case None =>
- // establish remotely accessible
- // return path (sender)
- createNetKernelOnPort(TcpService.generatePort)
- case Some(k) =>
- k
- }
-
- /**
- * Returns (a proxy for) the actor registered under
- * <code>name</code> on <code>node</code>.
- */
- def select(node: Node, sym: Symbol): AbstractActor = synchronized {
- selfKernel.getOrCreateProxy(node, sym)
- }
-
- private[remote] def someNetKernel: NetKernel =
- kernels.valuesIterator.next
-}
-
-
-/**
- * This class represents a machine node on a TCP network.
- *
- * @param address the host name, or <code>null</code> for the loopback address.
- * @param port the port number.
- *
- * @author Philipp Haller
- */
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-case class Node(address: String, port: Int)
diff --git a/src/actors/scala/actors/remote/Serializer.scala b/src/actors/scala/actors/remote/Serializer.scala
deleted file mode 100644
index 7be4aa6583..0000000000
--- a/src/actors/scala/actors/remote/Serializer.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-
-package scala.actors
-package remote
-
-
-import java.lang.ClassNotFoundException
-
-import java.io.{DataInputStream, DataOutputStream, EOFException, IOException}
-
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-abstract class Serializer(val service: Service) {
- def serialize(o: AnyRef): Array[Byte]
- def deserialize(a: Array[Byte]): AnyRef
-
- @throws(classOf[IOException])
- private def readBytes(inputStream: DataInputStream): Array[Byte] = {
- try {
- val length = inputStream.readInt()
- val bytes = new Array[Byte](length)
- inputStream.readFully(bytes, 0, length)
- bytes
- }
- catch {
- case npe: NullPointerException =>
- throw new EOFException("Connection closed.")
- }
- }
-
- @throws(classOf[IOException]) @throws(classOf[ClassNotFoundException])
- def readObject(inputStream: DataInputStream): AnyRef = {
- val bytes = readBytes(inputStream)
- deserialize(bytes)
- }
-
- @throws(classOf[IOException])
- private def writeBytes(outputStream: DataOutputStream, bytes: Array[Byte]) {
- val length = bytes.length;
- // original length
- outputStream.writeInt(length)
- outputStream.write(bytes, 0, length)
- outputStream.flush()
- }
-
- @throws(classOf[IOException])
- def writeObject(outputStream: DataOutputStream, obj: AnyRef) {
- val bytes = serialize(obj)
- writeBytes(outputStream, bytes)
- }
-}
diff --git a/src/actors/scala/actors/remote/Service.scala b/src/actors/scala/actors/remote/Service.scala
deleted file mode 100644
index d102df1970..0000000000
--- a/src/actors/scala/actors/remote/Service.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-package scala.actors
-package remote
-
-/**
- * @version 0.9.10
- * @author Philipp Haller
- */
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-trait Service {
- val kernel = new NetKernel(this)
- val serializer: Serializer
- def node: Node
- def send(node: Node, data: Array[Byte]): Unit
- def terminate(): Unit
-}
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
deleted file mode 100644
index 69e5c46c52..0000000000
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ /dev/null
@@ -1,292 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-
-package scala.actors
-package remote
-
-
-import java.io.{DataInputStream, DataOutputStream, IOException}
-import java.lang.{Thread, SecurityException}
-import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketTimeoutException, UnknownHostException}
-
-import scala.collection.mutable
-import scala.util.Random
-
-/* Object TcpService.
- *
- * @version 0.9.9
- * @author Philipp Haller
- */
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-object TcpService {
- private val random = new Random
- private val ports = new mutable.HashMap[Int, TcpService]
-
- def apply(port: Int, cl: ClassLoader): TcpService =
- ports.get(port) match {
- case Some(service) =>
- service
- case None =>
- val service = new TcpService(port, cl)
- ports(port) = service
- service.start()
- Debug.info("created service at "+service.node)
- service
- }
-
- def generatePort: Int = {
- var portnum = 0
- try {
- portnum = 8000 + random.nextInt(500)
- val socket = new ServerSocket(portnum)
- socket.close()
- }
- catch {
- case ioe: IOException =>
- // this happens when trying to open a socket twice
- // at the same port
- // try again
- generatePort
- case se: SecurityException =>
- // do nothing
- }
- portnum
- }
-
- private val connectTimeoutMillis = {
- val propName = "scala.actors.tcpSocket.connectTimeoutMillis"
- val defaultTimeoutMillis = 0
- sys.props get propName flatMap {
- timeout =>
- try {
- val to = timeout.toInt
- Debug.info(s"Using socket timeout $to")
- Some(to)
- } catch {
- case e: NumberFormatException =>
- Debug.warning(s"""Could not parse $propName = "$timeout" as an Int""")
- None
- }
- } getOrElse defaultTimeoutMillis
- }
-
- var BufSize: Int = 65536
-}
-
-/* Class TcpService.
- *
- * @version 0.9.10
- * @author Philipp Haller
- */
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-class TcpService(port: Int, cl: ClassLoader) extends Thread with Service {
- val serializer: JavaSerializer = new JavaSerializer(this, cl)
-
- private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port)
- def node: Node = internalNode
-
- private val pendingSends = new mutable.HashMap[Node, List[Array[Byte]]]
-
- /**
- * Sends a byte array to another node on the network.
- * If the node is not yet up, up to `TcpService.BufSize`
- * messages are buffered.
- */
- def send(node: Node, data: Array[Byte]): Unit = synchronized {
-
- def bufferMsg(t: Throwable) {
- // buffer message, so that it can be re-sent
- // when remote net kernel comes up
- (pendingSends.get(node): @unchecked) match {
- case None =>
- pendingSends(node) = List(data)
- case Some(msgs) if msgs.length < TcpService.BufSize =>
- pendingSends(node) = data :: msgs
- }
- }
-
- // retrieve worker thread (if any) that already has connection
- getConnection(node) match {
- case None =>
- // we are not connected, yet
- try {
- val newWorker = connect(node)
-
- // any pending sends?
- pendingSends.get(node) match {
- case None =>
- // do nothing
- case Some(msgs) =>
- msgs.reverse foreach {newWorker transmit _}
- pendingSends -= node
- }
-
- newWorker transmit data
- } catch {
- case uhe: UnknownHostException =>
- bufferMsg(uhe)
- case ioe: IOException =>
- bufferMsg(ioe)
- case se: SecurityException =>
- // do nothing
- }
- case Some(worker) =>
- worker transmit data
- }
- }
-
- def terminate() {
- shouldTerminate = true
- try {
- new Socket(internalNode.address, internalNode.port)
- } catch {
- case ce: java.net.ConnectException =>
- Debug.info(this+": caught "+ce)
- }
- }
-
- private var shouldTerminate = false
-
- override def run() {
- try {
- val socket = new ServerSocket(port)
- while (!shouldTerminate) {
- Debug.info(this+": waiting for new connection on port "+port+"...")
- val nextClient = socket.accept()
- if (!shouldTerminate) {
- val worker = new TcpServiceWorker(this, nextClient)
- Debug.info("Started new "+worker)
- worker.readNode
- worker.start()
- } else
- nextClient.close()
- }
- } catch {
- case e: Exception =>
- Debug.info(this+": caught "+e)
- } finally {
- Debug.info(this+": shutting down...")
- connections foreach { case (_, worker) => worker.halt }
- }
- }
-
- // connection management
-
- private val connections =
- new mutable.HashMap[Node, TcpServiceWorker]
-
- private[actors] def addConnection(node: Node, worker: TcpServiceWorker) = synchronized {
- connections(node) = worker
- }
-
- def getConnection(n: Node) = synchronized {
- connections.get(n)
- }
-
- def isConnected(n: Node): Boolean = synchronized {
- !connections.get(n).isEmpty
- }
-
- def connect(n: Node): TcpServiceWorker = synchronized {
- val socket = new Socket()
- val start = System.nanoTime
- try {
- socket.connect(new InetSocketAddress(n.address, n.port), TcpService.connectTimeoutMillis)
- } catch {
- case e: SocketTimeoutException =>
- Debug.warning(f"Timed out connecting to $n after ${(System.nanoTime - start) / math.pow(10, 9)}%.3f seconds")
- throw e
- }
- val worker = new TcpServiceWorker(this, socket)
- worker.sendNode(n)
- worker.start()
- addConnection(n, worker)
- worker
- }
-
- def disconnectNode(n: Node) = synchronized {
- connections.get(n) match {
- case None =>
- // do nothing
- case Some(worker) =>
- connections -= n
- worker.halt
- }
- }
-
- def isReachable(node: Node): Boolean =
- if (isConnected(node)) true
- else try {
- connect(node)
- return true
- } catch {
- case uhe: UnknownHostException => false
- case ioe: IOException => false
- case se: SecurityException => false
- }
-
- def nodeDown(mnode: Node): Unit = synchronized {
- connections -= mnode
- }
-}
-
-
-private[actors] class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
- val datain = new DataInputStream(so.getInputStream)
- val dataout = new DataOutputStream(so.getOutputStream)
-
- var connectedNode: Node = _
-
- def sendNode(n: Node) {
- connectedNode = n
- parent.serializer.writeObject(dataout, parent.node)
- }
-
- def readNode() {
- val node = parent.serializer.readObject(datain)
- node match {
- case n: Node =>
- connectedNode = n
- parent.addConnection(n, this)
- }
- }
-
- def transmit(data: Array[Byte]): Unit = synchronized {
- Debug.info(this+": transmitting data...")
- dataout.writeInt(data.length)
- dataout.write(data)
- dataout.flush()
- }
-
- var running = true
-
- def halt() = synchronized {
- so.close()
- running = false
- }
-
- override def run() {
- try {
- while (running) {
- val msg = parent.serializer.readObject(datain);
- parent.kernel.processMsg(connectedNode, msg)
- }
- }
- catch {
- case ioe: IOException =>
- Debug.info(this+": caught "+ioe)
- parent nodeDown connectedNode
- case e: Exception =>
- Debug.info(this+": caught "+e)
- parent nodeDown connectedNode
- }
- Debug.info(this+": service terminated at "+parent.node)
- }
-}