summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/Future.scala
blob: 11602f52a234fc8b4caca55a179e6d5cab663e18 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.actors

import scala.actors.scheduler.DaemonScheduler
import scala.concurrent.SyncVar

/** A function of arity 0, returning a value of type `T` that,
 *  when applied, blocks the current actor (`Actor.self`)
 *  until the future's value is available.
 *
 *  A future can be queried to find out whether its value
 *  is already available without blocking.
 *
 *  @author Philipp Haller
 */
@deprecated("Use the scala.concurrent.Future instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
abstract class Future[+T] extends Responder[T] with Function0[T] {

  @volatile
  private[actors] var fvalue: Option[Any] = None
  private[actors] def fvalueTyped = fvalue.get.asInstanceOf[T]

  /** Tests whether the future's result is available.
   *
   *  @return `true`  if the future's result is available,
   *          `false` otherwise.
   */
  def isSet: Boolean

  /** Returns an input channel that can be used to receive the future's result.
   *
   *  @return the future's input channel
   */
  def inputChannel: InputChannel[T]

}

private case object Eval

private class FutureActor[T](fun: SyncVar[T] => Unit, channel: Channel[T]) extends Future[T] with DaemonActor {

  var enableChannel = false // guarded by this

  def isSet = !fvalue.isEmpty

  def apply(): T = {
    if (fvalue.isEmpty) {
      this !? Eval
    }
    fvalueTyped
  }

  def respond(k: T => Unit) {
    if (isSet) k(fvalueTyped)
    else {
      val ft = this !! Eval
      ft.inputChannel.react {
        case _ => k(fvalueTyped)
      }
    }
  }

  def inputChannel: InputChannel[T] = {
    synchronized {
      if (!enableChannel) {
        if (isSet)
          channel ! fvalueTyped
        enableChannel = true
      }
    }
    channel
  }

  def act() {
    val res = new SyncVar[T]

    {
      fun(res)
    } andThen {

      synchronized {
        val v = res.get
        fvalue =  Some(v)
        if (enableChannel)
          channel ! v
      }

      loop {
        react {
          // This is calling ReplyReactor#reply(msg: Any).
          // Was: reply().  Now: reply(()).
          case Eval => reply(())
        }
      }
    }
  }
}

/** Methods that operate on futures.
 *
 *  @author Philipp Haller
 */
@deprecated("Use the object scala.concurrent.Future instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
object Futures {

  /** Arranges for the asynchronous execution of `body`,
   *  returning a future representing the result.
   *
   *  @param  body the computation to be carried out asynchronously
   *  @return      the future representing the result of the
   *               computation
   */
  def future[T](body: => T): Future[T] = {
    val c = new Channel[T](Actor.self(DaemonScheduler))
    val a = new FutureActor[T](_.set(body), c)
    a.start()
    a
  }

  /** Creates a future that resolves after a given time span.
   *
   *  @param  timespan the time span in ms after which the future resolves
   *  @return          the future
   */
  def alarm(timespan: Long): Future[Unit] = {
    val c = new Channel[Unit](Actor.self(DaemonScheduler))
    val fun = (res: SyncVar[Unit]) => {
      Actor.reactWithin(timespan) {
        case TIMEOUT => res.set({})
      }
    }
    val a = new FutureActor[Unit](fun, c)
    a.start()
    a
  }

  /** Waits for the first result returned by one of two
   *  given futures.
   *
   *  @param  ft1 the first future
   *  @param  ft2 the second future
   *  @return the result of the future that resolves first
   */
  def awaitEither[A, B >: A](ft1: Future[A], ft2: Future[B]): B = {
    val FutCh1 = ft1.inputChannel
    val FutCh2 = ft2.inputChannel
    Actor.receive {
      case FutCh1 ! arg1 => arg1.asInstanceOf[B]
      case FutCh2 ! arg2 => arg2.asInstanceOf[B]
    }
  }

  /** Waits until either all futures are resolved or a given
   *  time span has passed. Results are collected in a list of
   *  options. The result of a future that resolved during the
   *  time span is its value wrapped in `Some`. The result of a
   *  future that did not resolve during the time span is `None`.
   *
   *  Note that some of the futures might already have been awaited,
   *  in which case their value is returned wrapped in `Some`.
   *  Passing a timeout of 0 causes `awaitAll` to return immediately.
   *
   *  @param  timeout the time span in ms after which waiting is
   *                  aborted
   *  @param  fts     the futures to be awaited
   *  @return         the list of optional future values
   *  @throws java.lang.IllegalArgumentException  if timeout is negative,
   *                  or timeout + `System.currentTimeMillis()` is negative.
   */
  def awaitAll(timeout: Long, fts: Future[Any]*): List[Option[Any]] = {
    val resultsMap: scala.collection.mutable.Map[Int, Option[Any]] = new scala.collection.mutable.HashMap[Int, Option[Any]]

    var cnt = 0
    val mappedFts = fts.map(ft =>
      ({cnt+=1; cnt-1}, ft))

    val unsetFts = mappedFts.filter((p: Tuple2[Int, Future[Any]]) => {
      if (p._2.isSet) { resultsMap(p._1) = Some(p._2()); false }
      else { resultsMap(p._1) = None; true }
    })

    val partFuns = unsetFts.map((p: Tuple2[Int, Future[Any]]) => {
      val FutCh = p._2.inputChannel
      val singleCase: PartialFunction[Any, Tuple2[Int, Any]] = {
        case FutCh ! any => (p._1, any)
      }
      singleCase
    })

    val thisActor = Actor.self
    val timerTask = new java.util.TimerTask {
      def run() { thisActor ! TIMEOUT }
    }
    Actor.timer.schedule(timerTask, timeout)

    def awaitWith(partFuns: Seq[PartialFunction[Any, Tuple2[Int, Any]]]) {
      val reaction: PartialFunction[Any, Unit] = new PartialFunction[Any, Unit] {
        def isDefinedAt(msg: Any) = msg match {
          case TIMEOUT => true
          case _ => partFuns exists (_ isDefinedAt msg)
        }
        def apply(msg: Any): Unit = msg match {
          case TIMEOUT => // do nothing
          case _ => {
            val pfOpt = partFuns find (_ isDefinedAt msg)
            val pf = pfOpt.get // succeeds always
            val (idx, subres) = pf(msg)
            resultsMap(idx) = Some(subres)

            val partFunsRest = partFuns filter (_ != pf)
            // wait on rest of partial functions
            if (partFunsRest.length > 0)
              awaitWith(partFunsRest)
          }
        }
      }
      Actor.receive(reaction)
    }

    if (partFuns.length > 0)
      awaitWith(partFuns)

    var results: List[Option[Any]] = Nil
    val size = resultsMap.size
    for (i <- 0 until size) {
      results = resultsMap(size - i - 1) :: results
    }

    // cancel scheduled timer task
    timerTask.cancel()

    results
  }

}