summaryrefslogblamecommitdiff
path: root/src/actors/scala/actors/remote/TcpService.scala
blob: 69e5c46c526ee4cb06000fdf103fc95d65aa9a3f (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                                          
                                                                          
                                                                          



                                                                          
 
 

                    
 
 
                                                               
                                            
                                                                                                                    
 
                               
                        
 

                     
                 

                         
                                                                                                                                             
                   
                                 
                                                          
 
                                                     
                           

                           
                  
                                              
                             
                       
                                                      

               
 
                           







                                            

                                                          






                                   
 






                                                                
                                                 








                                                                                  
                          

 

                    
                  

                         
                                                                                                                                             

                                                                          
 

                                                                                        
 
                                                                         
 

                                                       
                                                          

                           
                                                                
 
                                 

                                                  
                                                  
                    
                                         
                                                              
                                           


       
                                                                  
                               
                  
                                    

                                       





                                        
                                                         

                                  

                                 







                                           

                            


     

                          





                                                         



                                     
                      

                                         
                                
                                                                           
                                        






                                                             

             



                                           
                                                             
     
   



                           
                                               
 
                                                                                          
                              

   
                                             



                                                    
                               

   
                                                         








                                                                                                                     
                                                   






                                              
                              
                  
                     
                          

                        


     
                                        









                                             
                                                  




                        


                                                                                       
 
                             
 
                         



                                                       
                  

                                                   
                     

                                     


     
                                                        
                                             






                                 
                             



                   
                      

                       

                                                       



                              
                                        

                                     
                                      

                                     
                                                           

   
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */



package scala.actors
package remote


import java.io.{DataInputStream, DataOutputStream, IOException}
import java.lang.{Thread, SecurityException}
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketTimeoutException, UnknownHostException}

import scala.collection.mutable
import scala.util.Random

/* Object TcpService.
 *
 * @version 0.9.9
 * @author Philipp Haller
 */
@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
object TcpService {
  private val random = new Random
  private val ports = new mutable.HashMap[Int, TcpService]

  def apply(port: Int, cl: ClassLoader): TcpService =
    ports.get(port) match {
      case Some(service) =>
        service
      case None =>
        val service = new TcpService(port, cl)
        ports(port) = service
        service.start()
        Debug.info("created service at "+service.node)
        service
    }

  def generatePort: Int = {
    var portnum = 0
    try {
      portnum = 8000 + random.nextInt(500)
      val socket = new ServerSocket(portnum)
      socket.close()
    }
    catch {
      case ioe: IOException =>
        // this happens when trying to open a socket twice
        // at the same port
        // try again
        generatePort
      case se: SecurityException =>
        // do nothing
    }
    portnum
  }

  private val connectTimeoutMillis = {
    val propName = "scala.actors.tcpSocket.connectTimeoutMillis"
    val defaultTimeoutMillis = 0
    sys.props get propName flatMap {
      timeout =>
        try {
          val to = timeout.toInt
          Debug.info(s"Using socket timeout $to")
          Some(to)
        } catch {
          case e: NumberFormatException =>
            Debug.warning(s"""Could not parse $propName = "$timeout" as an Int""")
            None
        }
    } getOrElse defaultTimeoutMillis
  }

  var BufSize: Int = 65536
}

/* Class TcpService.
 *
 * @version 0.9.10
 * @author Philipp Haller
 */
@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
class TcpService(port: Int, cl: ClassLoader) extends Thread with Service {
  val serializer: JavaSerializer = new JavaSerializer(this, cl)

  private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port)
  def node: Node = internalNode

  private val pendingSends = new mutable.HashMap[Node, List[Array[Byte]]]

  /**
   * Sends a byte array to another node on the network.
   * If the node is not yet up, up to `TcpService.BufSize`
   * messages are buffered.
   */
  def send(node: Node, data: Array[Byte]): Unit = synchronized {

    def bufferMsg(t: Throwable) {
      // buffer message, so that it can be re-sent
      // when remote net kernel comes up
      (pendingSends.get(node): @unchecked) match {
        case None =>
          pendingSends(node) = List(data)
        case Some(msgs) if msgs.length < TcpService.BufSize =>
          pendingSends(node) = data :: msgs
      }
    }

    // retrieve worker thread (if any) that already has connection
    getConnection(node) match {
      case None =>
        // we are not connected, yet
        try {
          val newWorker = connect(node)

          // any pending sends?
          pendingSends.get(node) match {
            case None =>
              // do nothing
            case Some(msgs) =>
              msgs.reverse foreach {newWorker transmit _}
              pendingSends -= node
          }

          newWorker transmit data
        } catch {
          case uhe: UnknownHostException =>
            bufferMsg(uhe)
          case ioe: IOException =>
            bufferMsg(ioe)
          case se: SecurityException =>
            // do nothing
        }
      case Some(worker) =>
        worker transmit data
    }
  }

  def terminate() {
    shouldTerminate = true
    try {
      new Socket(internalNode.address, internalNode.port)
    } catch {
      case ce: java.net.ConnectException =>
        Debug.info(this+": caught "+ce)
    }
  }

  private var shouldTerminate = false

  override def run() {
    try {
      val socket = new ServerSocket(port)
      while (!shouldTerminate) {
        Debug.info(this+": waiting for new connection on port "+port+"...")
        val nextClient = socket.accept()
        if (!shouldTerminate) {
          val worker = new TcpServiceWorker(this, nextClient)
          Debug.info("Started new "+worker)
          worker.readNode
          worker.start()
        } else
          nextClient.close()
      }
    } catch {
      case e: Exception =>
        Debug.info(this+": caught "+e)
    } finally {
      Debug.info(this+": shutting down...")
      connections foreach { case (_, worker) => worker.halt }
    }
  }

  // connection management

  private val connections =
    new mutable.HashMap[Node, TcpServiceWorker]

  private[actors] def addConnection(node: Node, worker: TcpServiceWorker) = synchronized {
    connections(node) = worker
  }

  def getConnection(n: Node) = synchronized {
    connections.get(n)
  }

  def isConnected(n: Node): Boolean = synchronized {
    !connections.get(n).isEmpty
  }

  def connect(n: Node): TcpServiceWorker = synchronized {
    val socket = new Socket()
    val start = System.nanoTime
    try {
      socket.connect(new InetSocketAddress(n.address, n.port), TcpService.connectTimeoutMillis)
    } catch {
      case e: SocketTimeoutException =>
        Debug.warning(f"Timed out connecting to $n after ${(System.nanoTime - start) / math.pow(10, 9)}%.3f seconds")
        throw e
    }
    val worker = new TcpServiceWorker(this, socket)
    worker.sendNode(n)
    worker.start()
    addConnection(n, worker)
    worker
  }

  def disconnectNode(n: Node) = synchronized {
    connections.get(n) match {
      case None =>
        // do nothing
      case Some(worker) =>
        connections -= n
        worker.halt
    }
  }

  def isReachable(node: Node): Boolean =
    if (isConnected(node)) true
    else try {
      connect(node)
      return true
    } catch {
      case uhe: UnknownHostException => false
      case ioe: IOException => false
      case se: SecurityException => false
    }

  def nodeDown(mnode: Node): Unit = synchronized {
    connections -= mnode
  }
}


private[actors] class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
  val datain = new DataInputStream(so.getInputStream)
  val dataout = new DataOutputStream(so.getOutputStream)

  var connectedNode: Node = _

  def sendNode(n: Node) {
    connectedNode = n
    parent.serializer.writeObject(dataout, parent.node)
  }

  def readNode() {
    val node = parent.serializer.readObject(datain)
    node match {
      case n: Node =>
        connectedNode = n
        parent.addConnection(n, this)
    }
  }

  def transmit(data: Array[Byte]): Unit = synchronized {
    Debug.info(this+": transmitting data...")
    dataout.writeInt(data.length)
    dataout.write(data)
    dataout.flush()
  }

  var running = true

  def halt() = synchronized {
    so.close()
    running = false
  }

  override def run() {
    try {
      while (running) {
        val msg = parent.serializer.readObject(datain);
        parent.kernel.processMsg(connectedNode, msg)
      }
    }
    catch {
      case ioe: IOException =>
        Debug.info(this+": caught "+ioe)
        parent nodeDown connectedNode
      case e: Exception =>
        Debug.info(this+": caught "+e)
        parent nodeDown connectedNode
    }
    Debug.info(this+": service terminated at "+parent.node)
  }
}