diff options
Diffstat (limited to 'src/actors/scala/actors/remote/TcpService.scala')
-rw-r--r-- | src/actors/scala/actors/remote/TcpService.scala | 292 |
1 files changed, 0 insertions, 292 deletions
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala deleted file mode 100644 index 69e5c46c52..0000000000 --- a/src/actors/scala/actors/remote/TcpService.scala +++ /dev/null @@ -1,292 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - - -package scala.actors -package remote - - -import java.io.{DataInputStream, DataOutputStream, IOException} -import java.lang.{Thread, SecurityException} -import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketTimeoutException, UnknownHostException} - -import scala.collection.mutable -import scala.util.Random - -/* Object TcpService. - * - * @version 0.9.9 - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -object TcpService { - private val random = new Random - private val ports = new mutable.HashMap[Int, TcpService] - - def apply(port: Int, cl: ClassLoader): TcpService = - ports.get(port) match { - case Some(service) => - service - case None => - val service = new TcpService(port, cl) - ports(port) = service - service.start() - Debug.info("created service at "+service.node) - service - } - - def generatePort: Int = { - var portnum = 0 - try { - portnum = 8000 + random.nextInt(500) - val socket = new ServerSocket(portnum) - socket.close() - } - catch { - case ioe: IOException => - // this happens when trying to open a socket twice - // at the same port - // try again - generatePort - case se: SecurityException => - // do nothing - } - portnum - } - - private val connectTimeoutMillis = { - val propName = "scala.actors.tcpSocket.connectTimeoutMillis" - val defaultTimeoutMillis = 0 - sys.props get propName flatMap { - timeout => - try { - val to = timeout.toInt - Debug.info(s"Using socket timeout $to") - Some(to) - } catch { - case e: NumberFormatException => - Debug.warning(s"""Could not parse $propName = "$timeout" as an Int""") - None - } - } getOrElse defaultTimeoutMillis - } - - var BufSize: Int = 65536 -} - -/* Class TcpService. - * - * @version 0.9.10 - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { - val serializer: JavaSerializer = new JavaSerializer(this, cl) - - private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port) - def node: Node = internalNode - - private val pendingSends = new mutable.HashMap[Node, List[Array[Byte]]] - - /** - * Sends a byte array to another node on the network. - * If the node is not yet up, up to `TcpService.BufSize` - * messages are buffered. - */ - def send(node: Node, data: Array[Byte]): Unit = synchronized { - - def bufferMsg(t: Throwable) { - // buffer message, so that it can be re-sent - // when remote net kernel comes up - (pendingSends.get(node): @unchecked) match { - case None => - pendingSends(node) = List(data) - case Some(msgs) if msgs.length < TcpService.BufSize => - pendingSends(node) = data :: msgs - } - } - - // retrieve worker thread (if any) that already has connection - getConnection(node) match { - case None => - // we are not connected, yet - try { - val newWorker = connect(node) - - // any pending sends? - pendingSends.get(node) match { - case None => - // do nothing - case Some(msgs) => - msgs.reverse foreach {newWorker transmit _} - pendingSends -= node - } - - newWorker transmit data - } catch { - case uhe: UnknownHostException => - bufferMsg(uhe) - case ioe: IOException => - bufferMsg(ioe) - case se: SecurityException => - // do nothing - } - case Some(worker) => - worker transmit data - } - } - - def terminate() { - shouldTerminate = true - try { - new Socket(internalNode.address, internalNode.port) - } catch { - case ce: java.net.ConnectException => - Debug.info(this+": caught "+ce) - } - } - - private var shouldTerminate = false - - override def run() { - try { - val socket = new ServerSocket(port) - while (!shouldTerminate) { - Debug.info(this+": waiting for new connection on port "+port+"...") - val nextClient = socket.accept() - if (!shouldTerminate) { - val worker = new TcpServiceWorker(this, nextClient) - Debug.info("Started new "+worker) - worker.readNode - worker.start() - } else - nextClient.close() - } - } catch { - case e: Exception => - Debug.info(this+": caught "+e) - } finally { - Debug.info(this+": shutting down...") - connections foreach { case (_, worker) => worker.halt } - } - } - - // connection management - - private val connections = - new mutable.HashMap[Node, TcpServiceWorker] - - private[actors] def addConnection(node: Node, worker: TcpServiceWorker) = synchronized { - connections(node) = worker - } - - def getConnection(n: Node) = synchronized { - connections.get(n) - } - - def isConnected(n: Node): Boolean = synchronized { - !connections.get(n).isEmpty - } - - def connect(n: Node): TcpServiceWorker = synchronized { - val socket = new Socket() - val start = System.nanoTime - try { - socket.connect(new InetSocketAddress(n.address, n.port), TcpService.connectTimeoutMillis) - } catch { - case e: SocketTimeoutException => - Debug.warning(f"Timed out connecting to $n after ${(System.nanoTime - start) / math.pow(10, 9)}%.3f seconds") - throw e - } - val worker = new TcpServiceWorker(this, socket) - worker.sendNode(n) - worker.start() - addConnection(n, worker) - worker - } - - def disconnectNode(n: Node) = synchronized { - connections.get(n) match { - case None => - // do nothing - case Some(worker) => - connections -= n - worker.halt - } - } - - def isReachable(node: Node): Boolean = - if (isConnected(node)) true - else try { - connect(node) - return true - } catch { - case uhe: UnknownHostException => false - case ioe: IOException => false - case se: SecurityException => false - } - - def nodeDown(mnode: Node): Unit = synchronized { - connections -= mnode - } -} - - -private[actors] class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { - val datain = new DataInputStream(so.getInputStream) - val dataout = new DataOutputStream(so.getOutputStream) - - var connectedNode: Node = _ - - def sendNode(n: Node) { - connectedNode = n - parent.serializer.writeObject(dataout, parent.node) - } - - def readNode() { - val node = parent.serializer.readObject(datain) - node match { - case n: Node => - connectedNode = n - parent.addConnection(n, this) - } - } - - def transmit(data: Array[Byte]): Unit = synchronized { - Debug.info(this+": transmitting data...") - dataout.writeInt(data.length) - dataout.write(data) - dataout.flush() - } - - var running = true - - def halt() = synchronized { - so.close() - running = false - } - - override def run() { - try { - while (running) { - val msg = parent.serializer.readObject(datain); - parent.kernel.processMsg(connectedNode, msg) - } - } - catch { - case ioe: IOException => - Debug.info(this+": caught "+ioe) - parent nodeDown connectedNode - case e: Exception => - Debug.info(this+": caught "+e) - parent nodeDown connectedNode - } - Debug.info(this+": service terminated at "+parent.node) - } -} |