summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/impl/Future.scala
blob: 6a3487adde53646523ce57a7650dedf4a5e37cc7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.concurrent.impl



import scala.concurrent.util.Duration
import scala.concurrent.{Awaitable, ExecutionContext, CanAwait}
import scala.collection.mutable.Stack


private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {

}

private[concurrent] object Future {
  import java.{ lang => jl }

  private val toBoxed = Map[Class[_], Class[_]](
    classOf[Boolean] -> classOf[jl.Boolean],
    classOf[Byte]    -> classOf[jl.Byte],
    classOf[Char]    -> classOf[jl.Character],
    classOf[Short]   -> classOf[jl.Short],
    classOf[Int]     -> classOf[jl.Integer],
    classOf[Long]    -> classOf[jl.Long],
    classOf[Float]   -> classOf[jl.Float],
    classOf[Double]  -> classOf[jl.Double],
    classOf[Unit]    -> classOf[scala.runtime.BoxedUnit]
  )

  /** Wraps a block of code into an awaitable object. */
  private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] {
    def ready(atMost: Duration)(implicit permit: CanAwait) = {
      body
      this
    }
    def result(atMost: Duration)(implicit permit: CanAwait) = body
  }
  
  def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c

  private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T)
    extends Task {
    val promise = new Promise.DefaultPromise[T]()

    protected override def task() = {
      promise complete {
        try Right(body) catch {
          case NonFatal(e) =>
            // Commenting out reporting for now, since it produces too much output in the tests
            //executor.reportFailure(e)
            Left(e)
        }
      }
    }
  }

  def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
    val task = new PromiseCompletingTask(executor, body)
    task.dispatch()

    task.promise.future
  }

  private[impl] val throwableId: Throwable => Throwable = identity _

  // an optimization for batching futures
  // TODO we should replace this with a public queue,
  // so that it can be stolen from
  // OR: a push to the local task queue should be so cheap that this is
  // not even needed, but stealing is still possible

  private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext)

  private val _taskStack = new ThreadLocal[TaskStack]()

  private[impl] trait Task extends Runnable {
    def executor: ExecutionContext

    // run the original callback (no dispatch)
    protected def task(): Unit

    // we implement Runnable to avoid creating
    // an extra object. run() runs ourselves with
    // a TaskStack pushed, and then runs any
    // other tasks that show up in the stack.
    final override def run() = {
      try {
        val taskStack = TaskStack(Stack[Task](this), executor)
        _taskStack set taskStack
        while (taskStack.stack.nonEmpty) {
          val next = taskStack.stack.pop()
          require(next.executor eq executor)
          try next.task() catch { case NonFatal(e) => executor reportFailure e }
        }
      } finally {
        _taskStack.remove()
      }
    }

    // send the task to the running executor.execute() via
    // _taskStack, or start a new executor.execute()
    def dispatch(force: Boolean = false): Unit =
      _taskStack.get match {
        case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this
        case _ => executor.execute(this)
      }
  }

  private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task])
    extends Task {
    protected override def task() = {
      _taskStack.get.stack.elems = elems
    }
  }

  private[impl] def releaseStack(executor: ExecutionContext): Unit =
    _taskStack.get match {
      case stack if (stack ne null) && stack.stack.nonEmpty =>
        val tasks = stack.stack.elems
        stack.stack.clear()
        _taskStack.remove()
        val release = new ReleaseTask(executor, tasks)
        release.dispatch(force=true)
      case null =>
        // do nothing - there is no local batching stack anymore
      case _ =>
        _taskStack.remove()
    }

  private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any)
    extends Task {
    private var value: Either[Throwable, T] = null

    protected override def task() = {
      require(value ne null) // dispatch(value) must be called before dispatch()
      onComplete(value)
    }

    def dispatch(value: Either[Throwable, T]): Unit = {
      this.value = value
      dispatch()
    }
  }
}