diff options
Diffstat (limited to 'src/actors/scala/actors/remote/Proxy.scala')
-rw-r--r-- | src/actors/scala/actors/remote/Proxy.scala | 190 |
1 files changed, 0 insertions, 190 deletions
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala deleted file mode 100644 index 2cb03544f2..0000000000 --- a/src/actors/scala/actors/remote/Proxy.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package remote - -import scala.collection.mutable - -/** - * @author Philipp Haller - */ -private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor with Serializable { - import java.io.{IOException, ObjectOutputStream, ObjectInputStream} - - type Future[+P] = scala.actors.Future[P] - - @transient - private[remote] var del: Actor = null - startDelegate() - - @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream) { - out.defaultWriteObject() - } - - @throws(classOf[ClassNotFoundException]) @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream) { - in.defaultReadObject() - setupKernel() - startDelegate() - } - - private def startDelegate() { - del = new DelegateActor(this, node, name, kernel) - del.start() - } - - private def setupKernel() { - kernel = RemoteActor.someNetKernel - kernel.registerProxy(node, name, this) - } - - def !(msg: Any): Unit = - del ! msg - - def send(msg: Any, replyCh: OutputChannel[Any]): Unit = - del.send(msg, replyCh) - - def forward(msg: Any): Unit = - del.forward(msg) - - def receiver: Actor = - del - - def !?(msg: Any): Any = - del !? msg - - def !?(msec: Long, msg: Any): Option[Any] = - del !? (msec, msg) - - def !!(msg: Any): Future[Any] = - del !! msg - - def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = - del !! (msg, f) - - def linkTo(to: AbstractActor): Unit = - del ! Apply0(new LinkToFun) - - def unlinkFrom(from: AbstractActor): Unit = - del ! Apply0(new UnlinkFromFun) - - def exit(from: AbstractActor, reason: AnyRef): Unit = - del ! Apply0(new ExitFun(reason)) - - override def toString() = - name+"@"+node -} - -// Proxy is private[remote], but these classes are public and use it in a public -// method signature. That makes the only method they have non-overridable. -// So I made them final, which seems appropriate anyway. - -final class LinkToFun extends Function2[AbstractActor, Proxy, Unit] with Serializable { - def apply(target: AbstractActor, creator: Proxy) { - target.linkTo(creator) - } - override def toString = - "<LinkToFun>" -} - -final class UnlinkFromFun extends Function2[AbstractActor, Proxy, Unit] with Serializable { - def apply(target: AbstractActor, creator: Proxy) { - target.unlinkFrom(creator) - } - override def toString = - "<UnlinkFromFun>" -} - -final class ExitFun(reason: AnyRef) extends Function2[AbstractActor, Proxy, Unit] with Serializable { - def apply(target: AbstractActor, creator: Proxy) { - target.exit(creator, reason) - } - override def toString = - "<ExitFun>("+reason.toString+")" -} - -private[remote] case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit]) - -/** - * @author Philipp Haller - */ -private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor { - var channelMap = new mutable.HashMap[Symbol, OutputChannel[Any]] - var sessionMap = new mutable.HashMap[OutputChannel[_], Symbol] - - def act() { - Actor.loop { - react { - case cmd@Apply0(rfun) => - kernel.remoteApply(node, name, sender, rfun) - - case cmd@LocalApply0(rfun, target) => - rfun(target, creator) - - // Request from remote proxy. - // `this` is local proxy. - case cmd@SendTo(out, msg, session) => - if (session.name == "nosession") { - // local send - out.send(msg, this) - } else { - // is this an active session? - channelMap.get(session) match { - case None => - // create a new reply channel... - val replyCh = new Channel[Any](this) - // ...that maps to session - sessionMap(replyCh) = session - // local send - out.send(msg, replyCh) - - // finishes request-reply cycle - case Some(replyCh) => - channelMap -= session - replyCh ! msg - } - } - - case cmd@Terminate => - exit() - - // local proxy receives response to - // reply channel - case ch ! resp => - // lookup session ID - sessionMap.get(ch) match { - case Some(sid) => - sessionMap -= ch - val msg = resp.asInstanceOf[AnyRef] - // send back response - kernel.forward(sender, node, name, msg, sid) - - case None => - Debug.info(this+": cannot find session for "+ch) - } - - // remote proxy receives request - case msg: AnyRef => - // find out whether it's a synchronous send - if (sender.getClass.toString.contains("Channel")) { - // create fresh session ID... - val fresh = FreshNameCreator.newName(node+"@"+name) - // ...that maps to reply channel - channelMap(fresh) = sender - kernel.forward(sender, node, name, msg, fresh) - } else { - kernel.forward(sender, node, name, msg, 'nosession) - } - } - } - } - -} |