summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/remote/Proxy.scala
blob: 2cb03544f29ce5a8bd75c540e1dad0b86680e92d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.actors
package remote

import scala.collection.mutable

/**
 * Local stand-in for the actor registered under `name` on `node`.
 *
 * All messaging operations are handed to `del`, a [[DelegateActor]] that is
 * created and started by `startDelegate()`. Both `kernel` and `del` are
 * `@transient`; `readObject` re-establishes them after deserialization.
 *
 * @author Philipp Haller
 */
private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor with Serializable {
  import java.io.{IOException, ObjectOutputStream, ObjectInputStream}

  type Future[+P] = scala.actors.Future[P]

  // The delegate that actually handles messages. It is assigned by
  // startDelegate(), which is invoked during construction right after this
  // declaration, and again from readObject.
  @transient
  private[remote] var del: Actor = null
  startDelegate()

  /** Serialization hook: writes the non-transient state using the default mechanism. */
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
  }

  /**
   * Deserialization hook: restores the default state, then obtains a kernel
   * and starts a new delegate, since neither of them is serialized.
   */
  @throws(classOf[ClassNotFoundException]) @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    setupKernel()
    startDelegate()
  }

  /** Creates a new [[DelegateActor]] for this proxy, stores it in `del` and starts it. */
  private def startDelegate(): Unit = {
    del = new DelegateActor(this, node, name, kernel)
    del.start()
  }

  /** Takes a kernel from `RemoteActor.someNetKernel` and registers this proxy with it. */
  private def setupKernel(): Unit = {
    kernel = RemoteActor.someNetKernel
    kernel.registerProxy(node, name, this)
  }

  /** Sends `msg` to the delegate. */
  def !(msg: Any): Unit = del.!(msg)

  /** Sends `msg` to the delegate, passing `replyCh` as the reply channel. */
  def send(msg: Any, replyCh: OutputChannel[Any]): Unit = del.send(msg, replyCh)

  /** Forwards `msg` to the delegate. */
  def forward(msg: Any): Unit = del.forward(msg)

  /** The receiving actor of this proxy, i.e. the delegate. */
  def receiver: Actor = del

  /** Delegates to `del !? msg`. */
  def !?(msg: Any): Any = del.!?(msg)

  /** Delegates to `del !? (msec, msg)`. */
  def !?(msec: Long, msg: Any): Option[Any] = del.!?(msec, msg)

  /** Delegates to `del !! msg`. */
  def !!(msg: Any): Future[Any] = del.!!(msg)

  /** Delegates to `del !! (msg, f)`. */
  def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = del.!!(msg, f)

  /**
   * Sends the delegate an `Apply0` carrying a [[LinkToFun]].
   * Note that the `to` argument itself is not referenced here.
   */
  def linkTo(to: AbstractActor): Unit = del.!(Apply0(new LinkToFun))

  /**
   * Sends the delegate an `Apply0` carrying an [[UnlinkFromFun]].
   * Note that the `from` argument itself is not referenced here.
   */
  def unlinkFrom(from: AbstractActor): Unit = del.!(Apply0(new UnlinkFromFun))

  /**
   * Sends the delegate an `Apply0` carrying an [[ExitFun]] built from `reason`.
   * Note that the `from` argument itself is not referenced here.
   */
  def exit(from: AbstractActor, reason: AnyRef): Unit = del.!(Apply0(new ExitFun(reason)))

  /** Renders this proxy as `name@node`. */
  override def toString(): String = name + "@" + node
}

// Proxy is private[remote], but these classes are public and use it in a public
// method signature.  That makes the only method they have non-overridable.
// So I made them final, which seems appropriate anyway.

/**
 * Serializable function object that links `target` to the `creator` proxy.
 * `Proxy.linkTo` sends an instance to its delegate wrapped in `Apply0`.
 */
final class LinkToFun extends Function2[AbstractActor, Proxy, Unit] with Serializable {
  /** Calls `target.linkTo(creator)`. */
  def apply(target: AbstractActor, creator: Proxy): Unit =
    target.linkTo(creator)

  override def toString: String = "<LinkToFun>"
}

/**
 * Serializable function object that unlinks `target` from the `creator` proxy.
 * `Proxy.unlinkFrom` sends an instance to its delegate wrapped in `Apply0`.
 */
final class UnlinkFromFun extends Function2[AbstractActor, Proxy, Unit] with Serializable {
  /** Calls `target.unlinkFrom(creator)`. */
  def apply(target: AbstractActor, creator: Proxy): Unit =
    target.unlinkFrom(creator)

  override def toString: String = "<UnlinkFromFun>"
}

/**
 * Serializable function object that calls `exit` on `target` with the
 * `creator` proxy as the originating actor.
 * `Proxy.exit` sends an instance to its delegate wrapped in `Apply0`.
 *
 * @param reason the exit reason passed on to `target.exit`
 */
final class ExitFun(reason: AnyRef) extends Function2[AbstractActor, Proxy, Unit] with Serializable {
  /** Calls `target.exit(creator, reason)`. */
  def apply(target: AbstractActor, creator: Proxy): Unit =
    target.exit(creator, reason)

  // Plain concatenation instead of `reason.toString`, so that a null reason is
  // rendered as "null" rather than throwing a NullPointerException.
  override def toString: String =
    "<ExitFun>(" + reason + ")"
}

/**
 * Message carrying a function object for the delegate actor.
 * `Proxy.linkTo`, `Proxy.unlinkFrom` and `Proxy.exit` send it to the delegate,
 * which hands `rfun` to `kernel.remoteApply`.
 *
 * @param rfun the function taking the target actor and the creating proxy
 */
private[remote] case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit])

/**
 * Actor that does the message handling on behalf of a [[Proxy]].
 *
 * It reacts in a loop to control messages (`Apply0`, `LocalApply0`, `SendTo`,
 * `Terminate`), to replies arriving on reply channels it created itself, and
 * to any other message, which it forwards through `kernel`.
 *
 * @author Philipp Haller
 */
private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor {
  // Session ID -> channel that is waiting for the response of that session.
  var channelMap = new mutable.HashMap[Symbol, OutputChannel[Any]]
  // Reply channel created by this actor -> session ID it belongs to.
  var sessionMap = new mutable.HashMap[OutputChannel[_], Symbol]

  def act(): Unit =
    Actor.loop {
      react {
        // Hand the function to the kernel for application on the remote side.
        case Apply0(rfun) =>
          kernel.remoteApply(node, name, sender, rfun)

        // Apply the function right here, with the creating proxy as second argument.
        case LocalApply0(rfun, target) =>
          rfun(target, creator)

        // Request from remote proxy.
        // `this` is local proxy.
        case SendTo(out, msg, session) =>
          if (session.name != "nosession") {
            // Look up the session and drop it from the map in one step.
            channelMap.remove(session) match {
              // Known session: this message finishes the request-reply cycle.
              case Some(pending) =>
                pending ! msg

              // Unknown session: create a reply channel tied to the session,
              // then deliver locally with that channel as the reply target.
              case None =>
                val replyCh = new Channel[Any](this)
                sessionMap(replyCh) = session
                out.send(msg, replyCh)
            }
          } else {
            // No session involved: plain local send.
            out.send(msg, this)
          }

        case Terminate =>
          exit()

        // A response arrived on one of the reply channels created above.
        case ch ! resp =>
          // Look up the session ID and drop the mapping in one step.
          sessionMap.remove(ch) match {
            case None =>
              Debug.info(this + ": cannot find session for " + ch)

            // Send the response back under the session it belongs to.
            case Some(sid) =>
              kernel.forward(sender, node, name, resp.asInstanceOf[AnyRef], sid)
          }

        // Any other message is a request to be forwarded.
        case msg: AnyRef =>
          // The send is treated as synchronous when the sender's class name
          // contains "Channel".
          // NOTE(review): this is a string test on the class name, not a type test.
          val session: Symbol =
            if (sender.getClass.toString.contains("Channel")) {
              // Create a fresh session ID that maps to the reply channel.
              val fresh = FreshNameCreator.newName(node + "@" + name)
              channelMap(fresh) = sender
              fresh
            } else {
              'nosession
            }
          kernel.forward(sender, node, name, msg, session)
      }
    }

}