summaryrefslogblamecommitdiff
path: root/src/actors/scala/actors/remote/TcpService.scala
blob: e40384749bfa0ed6610954216903d61540ee2ce2 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                                          

                                                                          



                                                                          
       
 
 

                           
 
                                            
                                                               
                                                                         
 

                      

                                       




                         
                   
                                                         









                                            

                                                          






                                   

                          

 




                         


                                                           

                                                                                        
 

                                                                 




                                                                     
                                                                






                                                  
                                                              



                                                
                                                                  
                               
                  
                                    



















                                                 

                            


     
                      

                                         
                    
                                                          

                                                           
                                         



                       







                                           
     
   



                           
                                                                
 
                                                                                          


                                 
                                             



                                                    
                               

   
                                                         








                                                 







                              













                                             
                                                  











                                                                       
                             
 
                           




                                                       

                                                   
                       






                                                        
                                             











                                 
                      

                       

                                                       



                              
                                        

                                     
                                      

                                     
                                         

   
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2007, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id$


package scala.actors.remote


import java.lang.{Thread, SecurityException}
import java.io.{DataInputStream, DataOutputStream, IOException}
import java.net.{InetAddress, ServerSocket, Socket, UnknownHostException}

import compat.Platform

import scala.collection.mutable.HashMap

/* Object TcpService.
 *
 * @version 0.9.8
 * @author Philipp Haller
 */
object TcpService {
  val random = new java.util.Random(Platform.currentTime)

  def generatePort: int = {
    var portnum = 0
    try {
      portnum = 8000 + random.nextInt(500)
      val socket = new ServerSocket(portnum)
      socket.close()
    }
    catch {
      case ioe: IOException =>
        // this happens when trying to open a socket twice
        // at the same port
        // try again
        generatePort
      case se: SecurityException =>
        // do nothing
    }
    portnum
  }

  var BufSize: int = 65536
}

/* Class TcpService.
 *
 * @version 0.9.8
 * @author Philipp Haller
 */
class TcpService(port: Int) extends Thread with Service {
  val serializer: JavaSerializer = new JavaSerializer(this)

  private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port)
  def node: Node = internalNode

  private val pendingSends = new HashMap[Node, List[Array[byte]]]

  /**
   * Sends a byte array to another node on the network.
   * If the node is not yet up, up to <code>TcpService.BufSize</code>
   * messages are buffered.
   */
  def send(node: Node, data: Array[byte]): Unit = synchronized {

    def bufferMsg(t: Throwable) = {
      // buffer message, so that it can be re-sent
      // when remote net kernel comes up
      pendingSends.get(node) match {
        case None =>
          pendingSends += node -> (data :: Nil)
        case Some(msgs) if msgs.length < TcpService.BufSize =>
          pendingSends += node -> (data :: msgs)
      }
    }

    // retrieve worker thread (if any) that already has connection
    getConnection(node) match {
      case None =>
        // we are not connected, yet
        try {
          val newWorker = connect(node)
          newWorker transmit data

          // any pending sends?
          pendingSends.get(node) match {
            case None =>
              // do nothing
            case Some(msgs) =>
              msgs foreach {newWorker transmit _}
              pendingSends -= node
          }
        } catch {
          case uhe: UnknownHostException =>
            bufferMsg(uhe)
          case ioe: IOException =>
            bufferMsg(ioe)
          case se: SecurityException =>
            // do nothing
        }
      case Some(worker) =>
        worker transmit data
    }
  }

  override def run() {
    try {
      val socket = new ServerSocket(port)
      while (true) {
        Debug.info(this+": waiting for new connection...")
        val nextClient = socket.accept()
        val worker = new TcpServiceWorker(this, nextClient)
        Debug.info("Started new "+worker)
        worker.readNode
        worker.start()
      }
    } catch {
      case ioe: IOException =>
        Debug.info(this+": caught "+ioe)
      case sec: SecurityException =>
        Debug.info(this+": caught "+sec)
      case e: Exception =>
        Debug.info(this+": caught "+e)
    } finally {
      Debug.info(this+": shutting down...")
    }
  }

  // connection management

  private val connections =
    new scala.collection.mutable.HashMap[Node, TcpServiceWorker]

  private[actors] def addConnection(node: Node, worker: TcpServiceWorker) = synchronized {
    connections += node -> worker
  }

  def getConnection(n: Node) = synchronized {
    connections.get(n)
  }

  def isConnected(n: Node): Boolean = synchronized {
    !connections.get(n).isEmpty
  }

  def connect(n: Node): TcpServiceWorker = synchronized {
    val sock = new Socket(n.address, n.port)
    val worker = new TcpServiceWorker(this, sock)
    worker.sendNode(n)
    worker.start()
    addConnection(n, worker)
    worker
  }

  def disconnectNode(n: Node) = synchronized {
    connections.get(n) match {
      case None => {
        // do nothing
      }
      case Some(worker) => {
        connections -= n
        worker.halt
      }
    }
  }

  def isReachable(node: Node): boolean =
    if (isConnected(node)) true
    else try {
      connect(node)
      return true
    } catch {
      case uhe: UnknownHostException => false
      case ioe: IOException => false
      case se: SecurityException => false
    }

  def nodeDown(mnode: Node): Unit = synchronized {
    connections -= mnode
  }
}


class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
  val in = so.getInputStream()
  val out = so.getOutputStream()

  val datain = new DataInputStream(in)
  val dataout = new DataOutputStream(out)

  var connectedNode: Node = _

  def sendNode(n: Node) = {
    connectedNode = n
    parent.serializer.writeObject(dataout, parent.node)
  }

  def readNode = {
    val node = parent.serializer.readObject(datain)
    node match {
      case n: Node => {
        connectedNode = n
        parent.addConnection(n, this)
      }
    }
  }

  def transmit(data: Array[byte]): Unit = synchronized {
    Debug.info(this+": transmitting data...")
    dataout.writeInt(data.length)
    dataout.write(data)
    dataout.flush()
  }

  var running = true

  def halt = synchronized {
    so.close()
    running = false
  }

  override def run() {
    try {
      while (running) {
        val msg = parent.serializer.readObject(datain);
        parent.kernel.processMsg(connectedNode, msg)
      }
    }
    catch {
      case ioe: IOException =>
        Debug.info(this+": caught "+ioe)
        parent nodeDown connectedNode
      case e: Exception =>
        Debug.info(this+": caught "+e)
        parent nodeDown connectedNode
    }
    Debug.info(this+": shutting down...")
  }
}