1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
|
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */
package scala.actors
import scala.actors.scheduler.DaemonScheduler
import scala.concurrent.SyncVar
/** A future: a function of arity 0 that yields a value of type `T`.
 *
 *  Applying the future blocks the current actor (`Actor.self`) until the
 *  value is available. Whether the value is already available can be
 *  queried, without blocking, through `isSet`.
 *
 *  @tparam T the type of the future's result
 *  @author Philipp Haller
 */
@deprecated("Use the scala.concurrent.Future instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
abstract class Future[+T] extends Responder[T] with Function0[T] {

  /** Tests, without blocking, whether the future's result is available.
   *
   *  @return `true` if the future's result is available,
   *          `false` otherwise.
   */
  def isSet: Boolean

  /** Returns an input channel on which the future's result can be received.
   *
   *  @return the future's input channel
   */
  def inputChannel: InputChannel[T]

  // Holds the result once there is one; `None` until then.
  // Declared as `Option[Any]` rather than `Option[T]` -- presumably because a
  // mutable field of type `Option[T]` would conflict with the covariance of `T`.
  @volatile private[actors] var fvalue: Option[Any] = None

  // The stored result viewed as a `T`. Only valid once `fvalue` is defined:
  // `Option.get` throws while it is still `None`.
  private[actors] def fvalueTyped: T = fvalue.get.asInstanceOf[T]
}
/** Request message sent to a `FutureActor` (by its `apply` and `respond`).
 *  The actor only starts answering `Eval` after it has stored the future's
 *  value, so receiving the reply means the value is available.
 */
private case object Eval
/** Actor-backed implementation of [[Future]].
 *
 *  Once started, the actor runs `fun` with a fresh `SyncVar`, takes the value
 *  that `fun` stored in it as the future's result and, if the input channel
 *  has been enabled, sends that value to `channel`. From then on it answers
 *  every `Eval` message with a unit reply; a reply to `Eval` therefore signals
 *  that the result is available.
 *
 *  @param fun     computes the result and stores it in the `SyncVar` it is given
 *  @param channel the channel on which the result is delivered once enabled
 *  @tparam T      the type of the future's result
 */
private class FutureActor[T](fun: SyncVar[T] => Unit, channel: Channel[T]) extends Future[T] with DaemonActor {

  // Whether `inputChannel` has been requested, i.e. whether the result must be
  // (or already has been) sent to `channel`.
  var enableChannel = false // guarded by this

  /** `true` once the result has been stored in `fvalue`. */
  def isSet: Boolean = fvalue.isDefined

  /** Returns the result, first waiting for the actor's reply to `Eval`
   *  (a synchronous `!?` send) if the result is not stored yet.
   */
  def apply(): T = {
    if (fvalue.isEmpty) {
      this !? Eval
    }
    fvalueTyped
  }

  /** Passes the result to the continuation `k`.
   *
   *  If the result is already stored, `k` is invoked directly. Otherwise an
   *  `Eval` request is sent asynchronously and `k` is invoked from the
   *  reaction to its reply.
   *
   *  @param k the continuation receiving the future's result
   */
  def respond(k: T => Unit): Unit = {
    if (isSet) k(fvalueTyped)
    else {
      val ft = this !! Eval
      ft.inputChannel.react {
        // any reply to Eval means the value has been stored
        case _ => k(fvalueTyped)
      }
    }
  }

  /** Enables delivery of the result on `channel` (first call only) and
   *  returns the channel.
   *
   *  If the result is already stored when delivery is enabled it is sent
   *  right away; otherwise `act` sends it as soon as it is stored. The lock on
   *  `this` is shared with `act`, so the check-and-send there and here cannot
   *  interleave.
   *
   *  @return the channel on which the result is delivered
   */
  def inputChannel: InputChannel[T] = {
    synchronized {
      if (!enableChannel) {
        if (isSet)
          channel ! fvalueTyped
        enableChannel = true
      }
    }
    channel
  }

  /** Actor body: compute the result, publish it, then serve `Eval` requests. */
  def act(): Unit = {
    val res = new SyncVar[T]

    // First step: let `fun` compute the value and store it in `res`.
    // This is a named local method on purpose. A bare `{ fun(res) }` block that
    // opens on the line directly after `new SyncVar[T]` (no blank line in
    // between) is parsed as the body of an anonymous subclass of SyncVar.
    def compute(): Unit = fun(res)

    // `andThen` comes from the actor library; the second block is meant to run
    // after the first step has completed.
    compute() andThen {
      synchronized {
        val v = res.get
        fvalue = Some(v)
        // deliver only if a client already asked for the input channel
        if (enableChannel)
          channel ! v
      }
      loop {
        react {
          // This is calling ReplyReactor#reply(msg: Any).
          // Was: reply(). Now: reply(()).
          case Eval => reply(())
        }
      }
    }
  }
}
/** Methods that create futures and wait on them.
 *
 *  @author Philipp Haller
 */
@deprecated("Use the object scala.concurrent.Future instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
object Futures {

  /** Arranges for the asynchronous execution of `body`,
   *  returning a future representing the result.
   *
   *  @param  body the computation to be carried out asynchronously
   *  @return      the future representing the result of the
   *               computation
   */
  def future[T](body: => T): Future[T] = {
    val resultChannel = new Channel[T](Actor.self(DaemonScheduler))
    // `body` is by-name: it is evaluated when the actor runs this function.
    val worker = new FutureActor[T](_.set(body), resultChannel)
    worker.start()
    worker
  }

  /** Creates a future that resolves after a given time span.
   *
   *  @param  timespan the time span in ms after which the future resolves
   *  @return          the future
   */
  def alarm(timespan: Long): Future[Unit] = {
    val resultChannel = new Channel[Unit](Actor.self(DaemonScheduler))
    // Executed by the future's actor: store the unit result once
    // `reactWithin` delivers TIMEOUT.
    val resolveOnTimeout = (res: SyncVar[Unit]) => {
      Actor.reactWithin(timespan) {
        case TIMEOUT => res.set(())
      }
    }
    val worker = new FutureActor[Unit](resolveOnTimeout, resultChannel)
    worker.start()
    worker
  }

  /** Waits for the first result returned by one of two
   *  given futures.
   *
   *  @param  ft1 the first future
   *  @param  ft2 the second future
   *  @return     the result of the future that resolves first
   */
  def awaitEither[A, B >: A](ft1: Future[A], ft2: Future[B]): B = {
    // Capitalised on purpose: the names are used as stable identifiers in the
    // patterns below, i.e. they are compared against, not bound.
    val FutCh1 = ft1.inputChannel
    val FutCh2 = ft2.inputChannel
    Actor.receive {
      case FutCh1 ! arg1 => arg1.asInstanceOf[B]
      case FutCh2 ! arg2 => arg2.asInstanceOf[B]
    }
  }

  /** Waits until either all futures are resolved or a given
   *  time span has passed. Results are collected in a list of
   *  options. The result of a future that resolved during the
   *  time span is its value wrapped in `Some`. The result of a
   *  future that did not resolve during the time span is `None`.
   *
   *  Note that some of the futures might already have been awaited,
   *  in which case their value is returned wrapped in `Some`.
   *  Passing a timeout of 0 causes `awaitAll` to return immediately.
   *
   *  The results appear in the same order as the futures in `fts`.
   *
   *  @param  timeout the time span in ms after which waiting is
   *                  aborted
   *  @param  fts     the futures to be awaited
   *  @return         the list of optional future values
   *  @throws java.lang.IllegalArgumentException  if timeout is negative,
   *                  or timeout + `System.currentTimeMillis()` is negative.
   */
  def awaitAll(timeout: Long, fts: Future[Any]*): List[Option[Any]] = {
    // One slot per future, addressed by its position in `fts`; a slot stays
    // `None` unless the corresponding future resolves.
    val results = Array.fill[Option[Any]](fts.length)(None)

    // Futures that are already resolved are read directly; only the others
    // have to be waited for.
    val (resolved, pending) = fts.zipWithIndex.partition { case (ft, _) => ft.isSet }
    resolved.foreach { case (ft, idx) => results(idx) = Some(ft()) }

    // For every pending future: a partial function that recognises the message
    // carrying its value and tags the value with the future's position.
    val partFuns = pending.map { case (ft, idx) =>
      val FutCh = ft.inputChannel // capitalised: used as a stable identifier below
      val singleCase: PartialFunction[Any, (Int, Any)] = {
        case FutCh ! any => (idx, any)
      }
      singleCase
    }

    val thisActor = Actor.self
    val timerTask = new java.util.TimerTask {
      def run(): Unit = { thisActor ! TIMEOUT }
    }
    // Scheduled unconditionally so that an invalid timeout is always rejected.
    Actor.timer.schedule(timerTask, timeout)

    // Receives one message: either TIMEOUT (stop waiting) or the value of one
    // of the `remaining` futures, in which case the value is recorded and the
    // wait continues for the rest.
    def awaitWith(remaining: Seq[PartialFunction[Any, (Int, Any)]]): Unit = {
      val reaction: PartialFunction[Any, Unit] = new PartialFunction[Any, Unit] {
        def isDefinedAt(msg: Any): Boolean = msg match {
          case TIMEOUT => true
          case _       => remaining exists (_ isDefinedAt msg)
        }
        def apply(msg: Any): Unit = msg match {
          case TIMEOUT => // do nothing
          case _ =>
            // `isDefinedAt` above guarantees that a matching function exists
            remaining.find(_ isDefinedAt msg).foreach { pf =>
              val (idx, subres) = pf(msg)
              results(idx) = Some(subres)
              val rest = remaining filter (_ != pf)
              // wait on rest of partial functions
              if (rest.nonEmpty)
                awaitWith(rest)
            }
        }
      }
      Actor.receive(reaction)
    }

    try {
      if (partFuns.nonEmpty)
        awaitWith(partFuns)
    } finally {
      // Cancel the scheduled timer task, also when waiting was aborted by an
      // exception, so that it cannot send TIMEOUT later on.
      // NOTE(review): cancelling cannot retract a TIMEOUT that the task has
      // already sent; such a message would remain in this actor's mailbox.
      timerTask.cancel()
    }

    results.toList
  }
}
|