summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/InternalReplyReactor.scala
blob: c744984fd8e7b733219df454706c051f9b2287c1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package scala.actors

import java.util.{TimerTask}

/** 
 * Extends the [[scala.actors.Reactor]]
 *    trait with methods to reply to the sender of a message.
 *    Sending a message to a <code>ReplyReactor</code> implicitly
 *    passes a reference to the sender together with the message.
 *
 *  @author Philipp Haller
 *
 *  @define actor `ReplyReactor`
 */
@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
trait InternalReplyReactor extends Reactor[Any] with ReactorCanReply {

  /* A list of the current senders. The head of the list is
   * the sender of the message that was received last.
   */
  @volatile
  private[actors] var senders: List[OutputChannel[Any]] = List()

  /* This option holds a TimerTask when the actor waits in a
   * reactWithin. The TimerTask is cancelled when the actor
   * resumes.
   *
   * guarded by this
   */
  private[actors] var onTimeout: Option[TimerTask] = None

  /**
   * Returns the $actor which sent the last received message.
   */
  protected[actors] def internalSender: OutputChannel[Any] = senders.head

  /**
   * Replies with <code>msg</code> to the sender.
   */
  protected[actors] def reply(msg: Any) {
    internalSender ! msg
  }

  override def !(msg: Any) {
    send(msg, Actor.rawSelf(scheduler))
  }

  override def forward(msg: Any) {
    send(msg, Actor.sender)
  }

  private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
    synchronized {
      if (!onTimeout.isEmpty) {
        onTimeout.get.cancel()
        onTimeout = None
      }
    }
    senders = List(item._2)
    super.resumeReceiver(item, handler, onSameThread)
  }

  private[actors] override def searchMailbox(startMbox: MQueue[Any],
                                             handler: PartialFunction[Any, Any],
                                             resumeOnSameThread: Boolean) {
    var tmpMbox = startMbox
    var done = false
    while (!done) {
      val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
        senders = List(replyTo)
        handler.isDefinedAt(msg)
      })
      if (tmpMbox ne mailbox)
        tmpMbox.foreach((m, s) => mailbox.append(m, s))
      if (null eq qel) {
        synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            tmpMbox = new MQueue[Any]("Temp")
            drainSendBuffer(tmpMbox)
            // keep going
          } else {
            waitingFor = handler
            // see Reactor.searchMailbox
            throw Actor.suspendException
          }
        }
      } else {
        resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
        done = true
      }
    }
  }

  private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
    new ReplyReactorTask(this, fun, handler, msg)

  protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = {
    assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
    super.react(handler)
  }
   
  
  /**
   * Receives a message from this $actor's mailbox within a certain
   * time span.
   *
   * This method never returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   *
   * @param  msec     the time span before timeout
   * @param  handler  a partial function with message patterns and actions
   */
  protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
    assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")

    synchronized { drainSendBuffer(mailbox) }

    // first, remove spurious TIMEOUT message from mailbox if any
    mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)

    while (true) {
      val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
        senders = List(replyTo)
        handler isDefinedAt m
      })
      if (null eq qel) {
        synchronized {
          // in mean time new messages might have arrived
          if (!sendBuffer.isEmpty) {
            drainSendBuffer(mailbox)
            // keep going
          } else if (msec == 0L) {
            // throws Actor.suspendException
            resumeReceiver((TIMEOUT, this), handler, false)
          } else {
            waitingFor = handler
            val thisActor = this
            onTimeout = Some(new TimerTask {
              def run() { thisActor.send(TIMEOUT, thisActor) }
            })
            Actor.timer.schedule(onTimeout.get, msec)
            throw Actor.suspendException
          }
        }
      } else
        resumeReceiver((qel.msg, qel.session), handler, false)
    }
    throw Actor.suspendException
  }

  override def getState: Actor.State.Value = synchronized {
    if (waitingFor ne Reactor.waitingForNone) {
      if (onTimeout.isEmpty)
        Actor.State.Suspended
      else
        Actor.State.TimedSuspended
    } else
      _state
  }

}