summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/remote/NetKernel.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/actors/scala/actors/remote/NetKernel.scala')
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala147
1 files changed, 0 insertions, 147 deletions
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
deleted file mode 100644
index 57d7af6d26..0000000000
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-package scala.actors
-package remote
-
-import scala.collection.mutable
-
-case class NamedSend(senderLoc: Locator, receiverLoc: Locator, data: Array[Byte], session: Symbol)
-
-case class RemoteApply0(senderLoc: Locator, receiverLoc: Locator, rfun: Function2[AbstractActor, Proxy, Unit])
-case class LocalApply0(rfun: Function2[AbstractActor, Proxy, Unit], a: AbstractActor)
-
-case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)
-case object Terminate
-
-case class Locator(node: Node, name: Symbol)
-
-/**
- * @version 0.9.17
- * @author Philipp Haller
- */
-private[remote] class NetKernel(service: Service) {
-
- def sendToNode(node: Node, msg: AnyRef) = {
- val bytes = service.serializer.serialize(msg)
- service.send(node, bytes)
- }
-
- def namedSend(senderLoc: Locator, receiverLoc: Locator,
- msg: AnyRef, session: Symbol) {
- val bytes = service.serializer.serialize(msg)
- sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session))
- }
-
- private val actors = new mutable.HashMap[Symbol, OutputChannel[Any]]
- private val names = new mutable.HashMap[OutputChannel[Any], Symbol]
-
- def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized {
- actors(name) = a
- names(a) = name
- }
-
- def getOrCreateName(from: OutputChannel[Any]) = names.get(from) match {
- case None =>
- val freshName = FreshNameCreator.newName("remotesender")
- register(freshName, from)
- freshName
- case Some(name) =>
- name
- }
-
- def send(node: Node, name: Symbol, msg: AnyRef): Unit =
- send(node, name, msg, 'nosession)
-
- def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
- val senderLoc = Locator(service.node, getOrCreateName(Actor.self(Scheduler)))
- val receiverLoc = Locator(node, name)
- namedSend(senderLoc, receiverLoc, msg, session)
- }
-
- def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol) {
- val senderLoc = Locator(service.node, getOrCreateName(from))
- val receiverLoc = Locator(node, name)
- namedSend(senderLoc, receiverLoc, msg, session)
- }
-
- def remoteApply(node: Node, name: Symbol, from: OutputChannel[Any], rfun: Function2[AbstractActor, Proxy, Unit]) {
- val senderLoc = Locator(service.node, getOrCreateName(from))
- val receiverLoc = Locator(node, name)
- sendToNode(receiverLoc.node, RemoteApply0(senderLoc, receiverLoc, rfun))
- }
-
- def createProxy(node: Node, sym: Symbol): Proxy = {
- val p = new Proxy(node, sym, this)
- proxies((node, sym)) = p
- p
- }
-
- val proxies = new mutable.HashMap[(Node, Symbol), Proxy]
-
- def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy =
- proxies.synchronized {
- proxies.get((senderNode, senderName)) match {
- case Some(senderProxy) => senderProxy
- case None => createProxy(senderNode, senderName)
- }
- }
-
- /* Register proxy if no other proxy has been registered.
- */
- def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit =
- proxies.synchronized {
- proxies.get((senderNode, senderName)) match {
- case Some(senderProxy) => // do nothing
- case None => proxies((senderNode, senderName)) = p
- }
- }
-
- def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
- msg match {
- case cmd@RemoteApply0(senderLoc, receiverLoc, rfun) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiverLoc.name) match {
- case Some(a) =>
- val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- senderProxy.send(LocalApply0(rfun, a.asInstanceOf[AbstractActor]), null)
-
- case None =>
- // message is lost
- Debug.info(this+": lost message")
- }
-
- case cmd@NamedSend(senderLoc, receiverLoc, data, session) =>
- Debug.info(this+": processing "+cmd)
- actors.get(receiverLoc.name) match {
- case Some(a) =>
- try {
- val msg = service.serializer.deserialize(data)
- val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
- senderProxy.send(SendTo(a, msg, session), null)
- } catch {
- case e: Exception =>
- Debug.error(this+": caught "+e)
- }
-
- case None =>
- // message is lost
- Debug.info(this+": lost message")
- }
- }
- }
-
- def terminate() {
- // tell all proxies to terminate
- proxies.values foreach { _.send(Terminate, null) }
-
- // tell service to terminate
- service.terminate()
- }
-}