summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/ReactChannel.scala
blob: 7e34681fb6c6e284245209365792826543a6189b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.actors

/**
 * @author Philipp Haller
 */
private[actors] class ReactChannel[Msg](receiver: InternalReplyReactor) extends InputChannel[Msg] {

  private case class SendToReactor(channel: ReactChannel[Msg], msg: Msg)

  /**
   * Sends a message to this <code>ReactChannel</code>.
   *
   * @param  msg the message to be sent
   */
  def !(msg: Msg) {
    receiver ! SendToReactor(this, msg)
  }

  /**
   * Sends a message to this `ReactChannel` (asynchronous) supplying
   * explicit reply destination.
   *
   * @param  msg     the message to send
   * @param  replyTo the reply destination
   */
  def send(msg: Msg, replyTo: OutputChannel[Any]) {
    receiver.send(SendToReactor(this, msg), replyTo)
  }

  /**
   * Forwards `msg` to `'''this'''` keeping the last sender as sender
   * instead of `self`.
   */
  def forward(msg: Msg) {
    receiver forward SendToReactor(this, msg)
  }

  /**
   * Receives a message from this `ReactChannel`.
   *
   * This method ''never'' returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   *
   * @param  f    a partial function with message patterns and actions
   */
  def react(f: PartialFunction[Msg, Unit]): Nothing = {
    val C = this
    receiver.react {
      case SendToReactor(C, msg) if (f.isDefinedAt(msg.asInstanceOf[Msg])) =>
        f(msg.asInstanceOf[Msg])
    }
  }

  /**
   * Receives a message from this `ReactChannel` within a certain time span.
   *
   * This method ''never'' returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function with message patterns and actions
   */
  def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
    val C = this
    val recvActor = receiver.asInstanceOf[Actor]
    recvActor.reactWithin(msec) {
      case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) =>
        f(msg.asInstanceOf[Msg])
      case TIMEOUT => f(TIMEOUT)
    }
  }

  /**
   * Receives a message from this `ReactChannel`.
   *
   * @param  f    a partial function with message patterns and actions
   * @return      result of processing the received value
   */
  def receive[R](f: PartialFunction[Msg, R]): R = {
    val C = this
    val recvActor = receiver.asInstanceOf[Actor]
    recvActor.receive {
      case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) =>
        f(msg.asInstanceOf[Msg])
    }
  }

  /**
   * Receives a message from this `ReactChannel` within a certain time span.
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function with message patterns and actions
   * @return      result of processing the received value
   */
  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
    val C = this
    val recvActor = receiver.asInstanceOf[Actor]
    recvActor.receiveWithin(msec) {
      case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) =>
        f(msg.asInstanceOf[Msg])
      case TIMEOUT => f(TIMEOUT)
    }
  }

  /**
   * Receives the next message from this `ReactChannel`.
   */
  def ? : Msg = receive {
    case x => x
  }

}