1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala.concurrent.impl
import scala.concurrent.{Awaitable, ExecutionContext}
import scala.collection.mutable.Stack
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
implicit def executor: ExecutionContext
/** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
* Returns the result of this Future without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
//def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any]))
/** Tests whether this Future has been completed.
*/
final def isCompleted: Boolean = value.isDefined
/** The contained value of this Future. Before this Future is completed
* the value will be None. After completion the value will be Some(Right(t))
* if it contains a valid result, or Some(Left(error)) if it contains
* an exception.
*/
def value: Option[Either[Throwable, T]]
def onComplete[U](func: Either[Throwable, T] => U): this.type
}
object Future {
import java.{ lang => jl }
private val toBoxed = Map[Class[_], Class[_]](
classOf[Boolean] -> classOf[jl.Boolean],
classOf[Byte] -> classOf[jl.Byte],
classOf[Char] -> classOf[jl.Character],
classOf[Short] -> classOf[jl.Short],
classOf[Int] -> classOf[jl.Integer],
classOf[Long] -> classOf[jl.Long],
classOf[Float] -> classOf[jl.Float],
classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
def boxedType(c: Class[_]): Class[_] = {
if (c.isPrimitive) toBoxed(c) else c
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
val promise = new Promise.DefaultPromise[T]()
executor.execute(new Runnable {
def run = {
promise complete {
try {
Right(body)
} catch {
case e => scala.concurrent.resolver(e)
}
}
}
})
promise.future
}
// an optimization for batching futures
// TODO we should replace this with a public queue,
// so that it can be stolen from
// OR: a push to the local task queue should be so cheap that this is
// not even needed, but stealing is still possible
private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
private[impl] def releaseStack(executor: ExecutionContext): Unit =
_taskStack.get match {
case stack if (stack ne null) && stack.nonEmpty =>
val tasks = stack.elems
stack.clear()
_taskStack.remove()
dispatchFuture(executor, () => _taskStack.get.elems = tasks, true)
case null =>
// do nothing - there is no local batching stack anymore
case _ =>
_taskStack.remove()
}
private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
_taskStack.get match {
case stack if (stack ne null) && !force => stack push task
case _ => executor.execute(new Runnable {
def run() {
try {
val taskStack = Stack[() => Unit](task)
_taskStack set taskStack
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case e =>
// TODO catching all and continue isn't good for OOME
executor.reportFailure(e)
}
}
} finally {
_taskStack.remove()
}
}
})
}
}
|