aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/scala/async/Async.scala
blob: 0b77c7898f07fee6a04a50b096b94b1b91f5e2fc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/**
 * Copyright (C) 2012 Typesafe Inc. <http://www.typesafe.com>
 */
package scala.async

import scala.language.experimental.macros
import scala.reflect.macros.Context

/*
 * @author Philipp Haller
 */
object Async extends AsyncBase {
  import scala.concurrent.Future

  lazy val futureSystem = ScalaConcurrentFutureSystem
  type FS = ScalaConcurrentFutureSystem.type

  def async[T](body: T) = macro asyncImpl[T]

  override def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[Future[T]] = super.asyncImpl[T](c)(body)
}

object AsyncId extends AsyncBase {
  lazy val futureSystem = IdentityFutureSystem
  type FS = IdentityFutureSystem.type

  def async[T](body: T) = macro asyncImpl[T]

  override def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[T] = super.asyncImpl[T](c)(body)
}

/**
 * A base class for the `async` macro. Subclasses must provide:
 *
 * - Concrete types for a given future system
 * - Tree manipulations to create and complete the equivalent of Future and Promise
 * in that system.
 * - The `async` macro declaration itself, and a forwarder for the macro implementation.
 * (The latter is temporarily needed to workaround bug SI-6650 in the macro system)
 *
 * The default implementation, [[scala.async.Async]], binds the macro to `scala.concurrent._`.
 */
abstract class AsyncBase {
  self =>

  type FS <: FutureSystem
  val futureSystem: FS

  /**
   * A call to `await` must be nested in an enclosing `async` block.
   *
   * A call to `await` does not block the current thread, rather it is a delimiter
   * used by the enclosing `async` macro. Code following the `await`
   * call is executed asynchronously, when the argument of `await` has been completed.
   *
   * @param awaitable the future from which a value is awaited.
   * @tparam T        the type of that value.
   * @return          the value.
   */
  // TODO Replace with `@compileTimeOnly when this is implemented SI-6539
  @deprecated("`await` must be enclosed in an `async` block", "0.1")
  def await[T](awaitable: futureSystem.Fut[T]): T = ???

  def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[futureSystem.Fut[T]] = {
    import c.universe._
    import Flag._

    val builder = new ExprBuilder[c.type, futureSystem.type](c, self.futureSystem)
    val anaylzer = new AsyncAnalysis[c.type](c)

    import builder.defn._
    import builder.name
    import builder.futureSystemOps

    anaylzer.reportUnsupportedAwaits(body.tree)

    // Transform to A-normal form:
    //  - no await calls in qualifiers or arguments,
    //  - if/match only used in statement position.
    val anfTree: Block = {
      val transform = new AnfTransform[c.type](c)
      val unique = transform.uniqueNames(body.tree)
      val stats1 :+ expr1 = transform.anf.transformToList(unique)

      val block = Block(stats1, expr1)

      c.typeCheck(block).asInstanceOf[Block]
    }

    // Analyze the block to find locals that will be accessed from multiple
    // states of our generated state machine, e.g. a value assigned before
    // an `await` and read afterwards.
    val renameMap: Map[Symbol, TermName] = {
      anaylzer.valDefsUsedInSubsequentStates(anfTree).map {
        vd =>
          (vd.symbol, builder.name.fresh(vd.name))
      }.toMap
    }

    val startState = builder.stateAssigner.nextState()
    val endState = Int.MaxValue

    val asyncBlockBuilder = new builder.AsyncBlockBuilder(anfTree.stats, anfTree.expr, startState, endState, renameMap)
    val handlerCases: List[CaseDef] = asyncBlockBuilder.mkCombinedHandlerCases[T]()

    import asyncBlockBuilder.asyncStates
    logDiagnostics(c)(anfTree, asyncStates.map(_.toString))
    val initStates = asyncStates.init
    val localVarTrees = asyncStates.flatMap(_.allVarDefs).toList

    /*
      lazy val onCompleteHandler = (tr: Try[Any]) => state match {
        case 0 => {
          x11 = tr.get.asInstanceOf[Double];
          state = 1;
          resume()
        }
        ...
     */
    val onCompleteHandler = {
      val onCompleteHandlers = initStates.flatMap(_.mkOnCompleteHandler()).toList
      Function(
        List(ValDef(Modifiers(PARAM), name.tr, TypeTree(TryAnyType), EmptyTree)),
        Match(Ident(name.state), onCompleteHandlers))
    }

    /*
      def resume(): Unit = {
        try {
          state match {
            case 0 => {
               f11 = exprReturningFuture
               f11.onComplete(onCompleteHandler)(context)
             }
            ...
           }
        } catch {
          case NonFatal(t) => result.failure(t)
        }
      }
     */
    val resumeFunTree: c.Tree = DefDef(Modifiers(), name.resume, Nil, List(Nil), Ident(definitions.UnitClass),
      Try(
        Match(Ident(name.state), handlerCases),
        List(
          CaseDef(
            Apply(Ident(NonFatalClass), List(Bind(name.tr, Ident(nme.WILDCARD)))),
            EmptyTree,
            Block(List({
              val t = c.Expr[Throwable](Ident(name.tr))
              futureSystemOps.completeProm[T](c.Expr[futureSystem.Prom[T]](Ident(name.result)), reify(scala.util.Failure(t.splice))).tree
            }), c.literalUnit.tree))), EmptyTree))


    val prom: Expr[futureSystem.Prom[T]] = reify {
      // Create the empty promise
      val result$async = futureSystemOps.createProm[T].splice
      // Initialize the state
      var state$async = 0
      // Resolve the execution context
      val execContext$async = futureSystemOps.execContext.splice
      var onCompleteHandler$async: util.Try[Any] => Unit = null

      // Spawn a future to:
      futureSystemOps.future[Unit] {
        c.Expr[Unit](Block(
          // define vars for all intermediate results that are accessed from multiple states
          localVarTrees :+
            // define the resume() method
            resumeFunTree :+
            // assign onComplete function. (The var breaks the circular dependency with resume)`
            Assign(Ident(name.onCompleteHandler), onCompleteHandler),
          // and get things started by calling resume()
          Apply(Ident(name.resume), Nil)))
      }(c.Expr[futureSystem.ExecContext](Ident(name.execContext))).splice
      // Return the promise from this reify block...
      result$async
    }
    // ... and return its Future from the macro.
    val result = futureSystemOps.promiseToFuture(prom)

    AsyncUtils.vprintln(s"async state machine transform expands to:\n ${result.tree}")

    result
  }

  def logDiagnostics(c: Context)(anfTree: c.Tree, states: Seq[String]) {
    def location = try {
      c.macroApplication.pos.source.path
    } catch {
      case _: UnsupportedOperationException =>
        c.macroApplication.pos.toString
    }

    AsyncUtils.vprintln(s"In file '$location':")
    AsyncUtils.vprintln(s"${c.macroApplication}")
    AsyncUtils.vprintln(s"ANF transform expands to:\n $anfTree")
    states foreach (s => AsyncUtils.vprintln(s))
  }
}