summaryrefslogblamecommitdiff
path: root/src/actors/scala/actors/remote/NetKernel.scala
blob: 7cb776257e0e56c8bcddc9fd64a7278646cfe864 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                                          
                                                                          
                                                                          



                                                                          
       
 

                           
                                                  
                              
 
                                                                                                  
 
                                                                   
                     
 

                                            
   
                  
                         
   






                                                 

                                                         
                                                 
                                                                                   

   

                                                              
 
                                                                          

                           

   
                                                                         

                                                              
                               


                      

   






                                                                      

   



                                                                                                 

   
                                                     
                                  
                                   











                                                                                    

                                                                      
                                                                  
                                            
                                            
                         




                                                            
                                                                                
                                                       
                                                             



                                               
 

                              
                                             


         







                                                           
 
 
                                                                             

         






                                                                  
                                                    

             


                                             
                                              
 




















                                                    

           


                                              
 












                                                              
 








                                                           
       
     
   
 
 
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2008, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id$

package scala.actors.remote

import scala.collection.mutable.{HashMap, HashSet}
import scala.actors.Actor.loop

case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol)

case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
case object Terminate

case class Locator(node: Node, name: Symbol)

/**
 * @version 0.9.10
 * @author Philipp Haller
 */
class NetKernel(service: Service) {

  def sendToNode(node: Node, msg: AnyRef) = {
    val bytes = service.serializer.serialize(msg)
    service.send(node, bytes)
  }

  def namedSend(senderLoc: Locator, receiverLoc: Locator,
                msg: AnyRef, session: Symbol) {
    val bytes = service.serializer.serialize(msg)
    sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session))
  }

  private val actors = new HashMap[Symbol, OutputChannel[Any]]
  private val names = new HashMap[OutputChannel[Any], Symbol]

  def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized {
    actors += Pair(name, a)
    names += Pair(a, name)
  }

  def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match {
    case None =>
      val freshName = FreshNameCreator.newName("remotesender")
      register(freshName, from)
      freshName
    case Some(name) =>
      name
  }

  def send(node: Node, name: Symbol, msg: AnyRef): Unit =
    send(node, name, msg, 'nosession)

  def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
    val senderLoc = Locator(service.node, getOrCreateName(Actor.self))
    val receiverLoc = Locator(node, name)
    namedSend(senderLoc, receiverLoc, msg, session)
  }

  def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
    val senderLoc = Locator(service.node, getOrCreateName(from))
    val receiverLoc = Locator(node, name)
    namedSend(senderLoc, receiverLoc, msg, session)
  }

  def createProxy(node: Node, sym: Symbol): Actor = {
    val p = Proxy(node, sym, this)
    proxies += Pair((node, sym), p)
    p
  }

  val proxies = new HashMap[(Node, Symbol), Actor]

  def getOrCreateProxy(senderNode: Node, senderName: Symbol): Actor = synchronized {
    proxies.get((senderNode, senderName)) match {
      case Some(senderProxy) => senderProxy
      case None => createProxy(senderNode, senderName)
    }
  }

  def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
    msg match {
      case cmd@NamedSend(senderLoc, receiverLoc, data, session) =>
        Debug.info(this+": processing "+cmd)
        actors.get(receiverLoc.name) match {
          case Some(a) =>
            try {
              Debug.info(this+": receiver is "+a)
              val msg = service.serializer.deserialize(data)
              Debug.info(this+": deserialized msg is "+msg)

              val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
              Debug.info(this+": created "+senderProxy)
              senderProxy.send(SendTo(a, msg, session), null)
            } catch {
              case e: Exception =>
                Debug.error(this+": caught "+e)
            }

          case None =>
            // message is lost
            Debug.info(this+": lost message")
        }
    }
  }

  def terminate() {
    // tell all proxies to terminate
    proxies.values foreach { p => p.send(Terminate, null) }

    // tell service to terminate
    service.terminate()
  }
}

case class Proxy(node: Node, name: Symbol, kernel: NetKernel) extends Actor {
  start()

  @transient
  private var channelMap = new HashMap[Symbol, OutputChannel[Any]]

  @transient
  private var sessionMap = new HashMap[Channel[Any], Symbol]

  def act() {
    Debug.info(this+": waiting to process commands")
    loop {
      react {
        // Request from remote proxy.
        // `this` is local proxy.
        case cmd@SendTo(out, msg, session) =>
          Debug.info(this+": processing "+cmd)

          // is this an active session?
          channelMap.get(session) match {
            case None =>
              // create a new reply channel...
              val replyCh = new Channel[Any](this)

              // ...that maps to session
              sessionMap += Pair(replyCh, session)

              // local send to actor
              val a = out.asInstanceOf[Actor]
              a.send(msg, replyCh)

            case Some(replyCh) =>
              replyCh ! msg
              // TODO:
              // remove `replyCh` from mapping
              // to avoid memory leak (always safe?)
              // or: use WeakHashMap
              // however, it's the value (channel)
              // that should be weak!
          }

        case cmd@Terminate =>
          Debug.info(this+": processing "+cmd)
          exit()

        // local proxy receives response to
        // reply channel
        case ch ! resp =>
          // lookup session ID
          sessionMap.get(ch) match {
            case Some(sid) =>
              val msg = resp.asInstanceOf[AnyRef]
              // send back response
              kernel.forward(sender, node, name, msg, sid)

            case None =>
              Debug.info(this+": cannot find session for "+ch)
          }

        // remote proxy receives request
        case msg: AnyRef =>
          // create fresh session ID...
          val sid = FreshNameCreator.newName(node+"@"+name)

          // ...that maps to reply channel
          channelMap += Pair(sid, sender)

          kernel.forward(sender, node, name, msg, sid)
      }
    }
  }

}