-rw-r--r-- | README.md | 327
-rw-r--r-- | src/main/scala/scala/async/Async.scala | 3
-rw-r--r-- | src/main/scala/scala/async/AsyncAnalysis.scala | 32
-rw-r--r-- | src/main/scala/scala/async/TransformUtils.scala | 56
-rw-r--r-- | src/test/scala/scala/async/TreeInterrogation.scala | 20
-rw-r--r-- | src/test/scala/scala/async/neg/NakedAwait.scala | 22
-rw-r--r-- | src/test/scala/scala/async/run/toughtype/ToughType.scala | 31
7 files changed, 452 insertions, 39 deletions
@@ -1,13 +1,156 @@
-Scala Async Project
-===================
+# Scala Async Project
 
-Building
---------
+## Quickstart
+
+ - Add `scala-async.jar` to your classpath
+ - Use Scala 2.10.0
+
+```scala
+import ExecutionContext.Implicits.global
+import scala.async.Async.{async, await}
+
+val future = async {
+  val f1 = async { ...; true }
+  val f2 = async { ...; 42 }
+  if (await(f1)) await(f2) else 0
+}
+```
+
+## What is `async`?
+
+`async` marks a block of asynchronous code. Such a block usually contains
+one or more `await` calls, which marks a point at which the computation
+will be suspended until the awaited `Future` is complete.
+
+By default, `async` blocks operate on `scala.concurrent.{Future, Promise}`.
+The system can be adapted to alternative implementations of the
+`Future` pattern.
+
+Consider the following example:
+
+```scala
+def slowCalcFuture: Future[Int] = ...             // 01
+def combined: Future[Int] = async {               // 02
+  await(slowCalcFuture) + await(slowCalcFuture)   // 03
+}
+val x: Int = Await.result(combined, 10.seconds)   // 05
+```
+
+Lines 1 defines an asynchronous method: it returns a `Future`.
+
+Line 3 begins an `async` block. During compilation,
+the contents of this block will be analyzed to identify
+the `await` calls, and transformed into non-blocking
+code.
+
+Control flow will immediately pass to line 5, as the
+computation in the `async` block is not executed
+on the caller's thread.
+
+Line 4 begins by triggering `slowCalcFuture`, and then
+suspending until it has been calculating. Only after it
+has finished, we trigger it again, and suspend again.
+Finally, we add the results and complete `combined`, which
+in turn will release line 5 (unless it had already timed out).
+
+It is important to note that while we this code is non-blocking,
+it is not parallel. If we wanted to parallelize the two computations,
+we could rearrange the code as follows.
+
+```scala
+def combined: Future[Int] = async {
+  val future1 = slowCalcFuture
+  val future2 = slowCalcFuture
+  await(future1) + await(future2)
+}
+```
+
+## Comparison with direct use of `Future` API
+
+This computation could also be expressed by directly using the
+higher-order functions of Futures:
+
+```scala
+def slowCalcFuture: Future[Int] = ...
+val future1 = slowCalcFuture
+val future2 = slowCalcFuture
+def combined: Future[Int] = for {
+  r1 <- future1
+  r2 <- future2
+} yield r1 + r2
+```
+
+The `async` approach has two advantages over the use of
+`map` and `flatMap`.
+ 1. The code more directly reflects the programmers intent,
+    and does not require us to name the results `r1` and `r2`.
+    This advantage is even more pronounces when we mix control
+    structures in `async` blocks.
+ 2. `async` blocks are compiled to a single anonymous class,
+    as opposed to a seperate anonymous class for each closure
+    required at each generator (`<-`) in the for-comprehension.
+    This reduces the size of generated code, and can avoid boxing
+    of intermediate results.
+
+## Comparison with CPS plugin
+
+The existing continations (CPS) plugin for Scala can also be used
+to provide syntactic layer like `async`. This approach has been
+used in Akka's [Dataflow Concurrency](http://doc.akka.io/docs/akka/snapshot/scala/dataflow.html)
+
+CPS based rewriting of asynchrous code also produces an closure
+for each suspension. It can also lead to type errors that are
+difficult to understand.
+
+## How it works
+
+ - The async macro analyses the block of code, looking for control
+   structures and locations of await calls. It then breaks the code
+   into 'chunks'. Each chunk contains a linear sequence of statements
+   that concludes with branching decision, or with the registration
+   of subsequent state handler as a continuation.
+ - Before this analysis and transformation, the program is normalized
+   into a form amenable to this manipulation. This is called the
+   "A Normal Form" (ANF), and roughly means that:
+   - `if` and `match` constructs are only used as statements;
+     they cannot be used as an expression.
+   - calls to await are not allowed in compound expressions.
+ - Identify vals, vars and defs that are accessed from multiple
+   states. These will be lifted out to fields in the state machine
+   object.
+ - Synthesize a class that holds:
+   - an integer representing the current state ID
+   - the lifted definitions
+   - an `apply(value: Try[Any]): Unit` method that will be
+     called on completion of each future. This behaviour of
+     this method is determined by the current state. It records
+     the downcast result of the future in a field, and calls the
+     `resume()` method.
+   - the `resume(): Unit` method that switches on the current state
+     and runs the users code for one 'chunk', and either:
+       a) registers the state machine as the handler for the next future
+       b) completes the result Promise of the async block, if at the terminal state.
+   - an `apply(): Unit` method that starts the computation.
+
+## Troubleshooting
+ - Logging of the transform can be enabled with `scalac -Dscala.async.debug=true`.
+ - Tracing of the ANF transform: `scalac -Dscala.async.trace=true`
+ - Debug the macro expansion by checking out the project and executing the application
+   [`TreeInterrogation`](https://github.com/phaller/scala-async/blob/master/src/test/scala/scala/async/TreeInterrogation.scala#L59)
+
+## Limitations
+ - See the [neg](https://github.com/phaller/scala-async/tree/master/src/test/scala/scala/async/neg) test cases for
+   for constructs that are not allowed in a async block
+ - See the [issue list](https://github.com/phaller/scala-async/issues?state=open) for which of these restrictions are planned
+   to be dropped in the next milestone.
+ - See [#13](https://github.com/phaller/scala-async/issues/13) for why `await` is not possible in closures, and for suggestions on
+   ways to structure the code to work around this limitation.
+
+## Building
 
 The async macro and its test suite can be built and run with SBT.
 
-Contributing
-------------
+## Contributing
 
 If you are interested in contributing code, we ask you to complete and submit
 to us the Scala Contributor License Agreement, which allows us to ensure that
@@ -17,3 +160,175 @@ http://www.scala-lang.org/sites/default/files/contributor_agreement.pdf
 
 Before submitting a pull-request, please make sure you have followed the guidelines
 outlined in our [Pull Request Policy](https://github.com/scala/scala/wiki/Pull-Request-Policy).
+
+### Generated Code examples
+
+```scala
+val future = async {
+  val f1 = async { true }
+  val x = 1
+  def inc(t: Int) = t + x
+  val t = 0
+  val f2 = async { 42 }
+  if (await(f1)) await(f2) else { val z = 1; inc(t + z) }
+}
+```
+
+After ANF transform.
+
+ - await calls are moved to only appear on the LHS of an value definition.
+ - `if` is not used as an expression, instead each branch writes its result
+   to a synthetic `var`.
+
+```scala
+ {
+  ();
+  val f1: scala.concurrent.Future[Boolean] = {
+    scala.concurrent.Future.apply[Boolean](true)(scala.concurrent.ExecutionContext.Implicits.global)
+  };
+  val x: Int = 1;
+  def inc(t: Int): Int = t.+(x);
+  val t: Int = 0;
+  val f2: scala.concurrent.Future[Int] = {
+    scala.concurrent.Future.apply[Int](42)(scala.concurrent.ExecutionContext.Implicits.global)
+  };
+  val await$1: Boolean = scala.async.Async.await[Boolean](f1);
+  var ifres$1: Int = 0;
+  if (await$1)
+    {
+      val await$2: Int = scala.async.Async.await[Int](f2);
+      ifres$1 = await$2
+    }
+  else
+    {
+      ifres$1 = {
+        val z: Int = 1;
+        inc(t.+(z))
+      }
+    };
+  ifres$1
+}
+```
+
+After async transform:
+
+ - one class synthesized to act as the state machine. It's `apply()` method will
+   be used to start the computation (even the code before the first await call
+   is executed asynchronously), and the the `apply(tr: scala.util.Try[Any])` method
+   will continue after each completed background task.
+ - each chunk of code moved into the a branch of the pattern match in `resume$async`.
+ - value and method definitions accessed from multiple states are lifted to be
+   members of `class stateMachine`. Others remain local, e.g. `val z`.
+
+```scala
+ {
+  class stateMachine$7 extends StateMachine[scala.concurrent.Promise[Int], scala.concurrent.ExecutionContext] {
+    def <init>() = {
+      super.<init>();
+      ()
+    };
+    var state$async: Int = 0;
+    val result$async: scala.concurrent.Promise[Int] = scala.concurrent.Promise.apply[Int]();
+    val execContext$async = scala.concurrent.ExecutionContext.Implicits.global;
+    var x$1: Int = 0;
+    def inc$1(t: Int): Int = t.$plus(x$1);
+    var t$1: Int = 0;
+    var f2$1: scala.concurrent.Future[Int] = null;
+    var await$1: Boolean = false;
+    var ifres$1: Int = 0;
+    var await$2: Int = 0;
+    def resume$async(): Unit = try {
+      state$async match {
+        case 0 => {
+          ();
+          val f1 = {
+            scala.concurrent.Future.apply[Boolean](true)(scala.concurrent.ExecutionContext.Implicits.global)
+          };
+          x$1 = 1;
+          t$1 = 0;
+          f2$1 = {
+            scala.concurrent.Future.apply[Int](42)(scala.concurrent.ExecutionContext.Implicits.global)
+          };
+          f1.onComplete(this)(execContext$async)
+        }
+        case 1 => {
+          ifres$1 = 0;
+          if (await$1)
+            {
+              state$async = 2;
+              resume$async()
+            }
+          else
+            {
+              state$async = 3;
+              resume$async()
+            }
+        }
+        case 2 => {
+          f2$1.onComplete(this)(execContext$async);
+          ()
+        }
+        case 5 => {
+          ifres$1 = await$2;
+          state$async = 4;
+          resume$async()
+        }
+        case 3 => {
+          ifres$1 = {
+            val z = 1;
+            inc$1(t$1.$plus(z))
+          };
+          state$async = 4;
+          resume$async()
+        }
+        case 4 => {
+          result$async.complete(scala.util.Success.apply(ifres$1));
+          ()
+        }
+      }
+    } catch {
+      case NonFatal((tr @ _)) => {
+        {
+          result$async.complete(scala.util.Failure.apply(tr));
+          ()
+        };
+        ()
+      }
+    };
+    def apply(tr: scala.util.Try[Any]): Unit = state$async match {
+      case 0 => {
+        if (tr.isFailure)
+          {
+            result$async.complete(tr.asInstanceOf[scala.util.Try[Int]]);
+            ()
+          }
+        else
+          {
+            await$1 = tr.get.asInstanceOf[Boolean];
+            state$async = 1;
+            resume$async()
+          };
+        ()
+      }
+      case 2 => {
+        if (tr.isFailure)
+          {
+            result$async.complete(tr.asInstanceOf[scala.util.Try[Int]]);
+            ()
+          }
+        else
+          {
+            await$2 = tr.get.asInstanceOf[Int];
+            state$async = 5;
+            resume$async()
+          };
+        ()
+      }
+    };
+    def apply: Unit = resume$async()
+  };
+  val stateMachine$7: StateMachine[scala.concurrent.Promise[Int], scala.concurrent.ExecutionContext] = new stateMachine$7();
+  scala.concurrent.Future.apply(stateMachine$7.apply())(scala.concurrent.ExecutionContext.Implicits.global);
+  stateMachine$7.result$async.future
+}
+```
diff --git a/src/main/scala/scala/async/Async.scala b/src/main/scala/scala/async/Async.scala
index c2969a9..12fe428 100644
--- a/src/main/scala/scala/async/Async.scala
+++ b/src/main/scala/scala/async/Async.scala
@@ -78,7 +78,8 @@ abstract class AsyncBase {
       //  - if/match only used in statement position.
       val anfTree: Block = {
         val anf = AnfTransform[c.type](c)
-        val stats1 :+ expr1 = anf(body.tree)
+        val restored = utils.restorePatternMatchingFunctions(body.tree)
+        val stats1 :+ expr1 = anf(restored)
         val block = Block(stats1, expr1)
         c.typeCheck(block).asInstanceOf[Block]
       }
diff --git a/src/main/scala/scala/async/AsyncAnalysis.scala b/src/main/scala/scala/async/AsyncAnalysis.scala
index bda4d5c..9184960 100644
--- a/src/main/scala/scala/async/AsyncAnalysis.scala
+++ b/src/main/scala/scala/async/AsyncAnalysis.scala
@@ -71,15 +71,21 @@ private[async] final case class AsyncAnalysis[C <: Context](c: C, asyncBase: Asy
       reportUnsupportedAwait(function, "nested function")
     }
 
+    override def patMatFunction(tree: Match) {
+      reportUnsupportedAwait(tree, "nested function")
+    }
+
     override def traverse(tree: Tree) {
       def containsAwait = tree exists isAwait
       tree match {
-        case Try(_, _, _) if containsAwait =>
+        case Try(_, _, _) if containsAwait                    =>
           reportUnsupportedAwait(tree, "try/catch")
           super.traverse(tree)
-        case Return(_)                     =>
+        case Return(_)                                        =>
           c.abort(tree.pos, "return is illegal within a async block")
-        case _                             =>
+        case ValDef(mods, _, _, _) if mods.hasFlag(Flag.LAZY) =>
+          c.abort(tree.pos, "lazy vals are illegal within an async block")
+        case _                                                =>
           super.traverse(tree)
       }
     }
@@ -117,19 +123,19 @@ private[async] final case class AsyncAnalysis[C <: Context](c: C, asyncBase: Asy
 
     override def nestedMethod(defDef: DefDef) {
       nestedMethodsToLift += defDef
-      defDef.rhs foreach {
-        case rt: RefTree =>
-          valDefChunkId.get(rt.symbol) match {
-            case Some((vd, defChunkId)) =>
-              valDefsToLift += vd // lift all vals referred to by nested methods.
-            case _ =>
-          }
-        case _ =>
-      }
+      markReferencedVals(defDef)
     }
 
     override def function(function: Function) {
-      function foreach {
+      markReferencedVals(function)
+    }
+
+    override def patMatFunction(tree: Match) {
+      markReferencedVals(tree)
+    }
+
+    private def markReferencedVals(tree: Tree) {
+      tree foreach {
         case rt: RefTree =>
           valDefChunkId.get(rt.symbol) match {
             case Some((vd, defChunkId)) =>
diff --git a/src/main/scala/scala/async/TransformUtils.scala b/src/main/scala/scala/async/TransformUtils.scala
index faebcc3..db82ed6 100644
--- a/src/main/scala/scala/async/TransformUtils.scala
+++ b/src/main/scala/scala/async/TransformUtils.scala
@@ -25,12 +25,14 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
     val stateMachine = newTermName(fresh("stateMachine"))
     val stateMachineT = stateMachine.toTypeName
     val apply = newTermName("apply")
+    val applyOrElse = newTermName("applyOrElse")
     val tr = newTermName("tr")
     val matchRes = "matchres"
     val ifRes = "ifres"
     val await = "await"
     val bindSuffix = "$bind"
-    def arg(i: Int) = "arg" + i
+
+    def arg(i: Int) = "arg" + i
 
     def fresh(name: TermName): TermName = newTermName(fresh(name.toString))
 
@@ -64,7 +66,7 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
 
   /** Descends into the regions of the tree that are subject to the
     * translation to a state machine by `async`. When a nested template,
-    * function, or by-name argument is encountered, the descend stops,
+    * function, or by-name argument is encountered, the descent stops,
     * and `nestedClass` etc are invoked.
     */
   trait AsyncTraverser extends Traverser {
@@ -83,20 +85,24 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
     def function(function: Function) {
     }
 
+    def patMatFunction(tree: Match) {
+    }
+
     override def traverse(tree: Tree) {
       tree match {
-        case cd: ClassDef     => nestedClass(cd)
-        case md: ModuleDef    => nestedModule(md)
-        case dd: DefDef       => nestedMethod(dd)
-        case fun: Function    => function(fun)
-        case Apply(fun, args) =>
+        case cd: ClassDef          => nestedClass(cd)
+        case md: ModuleDef         => nestedModule(md)
+        case dd: DefDef            => nestedMethod(dd)
+        case fun: Function         => function(fun)
+        case m@Match(EmptyTree, _) => patMatFunction(m) // Pattern matching anonymous function under -Xoldpatmat of after `restorePatternMatchingFunctions`
+        case Apply(fun, args)      =>
           val isInByName = isByName(fun)
           for ((arg, index) <- args.zipWithIndex) {
             if (!isInByName(index)) traverse(arg)
             else byNameArgument(arg)
           }
           traverse(fun)
-        case _                => super.traverse(tree)
+        case _                     => super.traverse(tree)
       }
     }
   }
@@ -248,6 +254,40 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
     }
   }
 
+  /**
+   * Replaces expressions of the form `{ new $anon extends PartialFunction[A, B] { ... ; def applyOrElse[..](...) = ... match <cases> }`
+   * with `Match(EmptyTree, cases`.
+   *
+   * This reverses the transformation performed in `Typers`, and works around non-idempotency of typechecking such trees.
+   */
+  // TODO Reference JIRA issue.
+  final def restorePatternMatchingFunctions(tree: Tree) =
+    RestorePatternMatchingFunctions transform tree
+
+  private object RestorePatternMatchingFunctions extends Transformer {
+
+    import language.existentials
+
+    override def transform(tree: Tree): Tree = {
+      val SYNTHETIC = (1 << 21).toLong.asInstanceOf[FlagSet]
+      def isSynthetic(cd: ClassDef) = cd.mods hasFlag SYNTHETIC
+
+      tree match {
+        case Block(
+          (cd@ClassDef(_, _, _, Template(_, _, body))) :: Nil,
+          Apply(Select(New(a), nme.CONSTRUCTOR), Nil)) if isSynthetic(cd) =>
+          val restored = (body collectFirst {
+            case DefDef(_, /*name.apply | */ name.applyOrElse, _, _, _, Match(_, cases)) =>
+              val transformedCases = super.transformStats(cases, currentOwner).asInstanceOf[List[CaseDef]]
+              Match(EmptyTree, transformedCases)
+          }).getOrElse(c.abort(tree.pos, s"Internal Error: Unable to find original pattern matching cases in: $body"))
+          restored
+        case t => super.transform(t)
+      }
+    }
+  }
+
+
   def isSafeToInline(tree: Tree) = {
     val symtab = c.universe.asInstanceOf[scala.reflect.internal.SymbolTable]
     object treeInfo extends {
diff --git a/src/test/scala/scala/async/TreeInterrogation.scala b/src/test/scala/scala/async/TreeInterrogation.scala
index b22faa9..93cfdf5 100644
--- a/src/test/scala/scala/async/TreeInterrogation.scala
+++ b/src/test/scala/scala/async/TreeInterrogation.scala
@@ -62,23 +62,21 @@ object TreeInterrogation extends App {
     val levels = Seq("trace", "debug")
     def setAll(value: Boolean) = levels.foreach(set(_, value))
 
-    setAll(true)
-    try t finally setAll(false)
+    setAll(value = true)
+    try t finally setAll(value = false)
   }
 
   withDebug {
     val cm = reflect.runtime.currentMirror
-    val tb = mkToolbox("-cp target/scala-2.10/classes -Xprint:all")
+    val tb = mkToolbox("-cp target/scala-2.10/classes -Xprint:flatten")
     val tree = tb.parse(
       """ import scala.async.AsyncId.{async, await}
-        | def foo(a: Int, b: Int) = (a, b)
-        | val result = async {
-        |   var i = 0
-        |   def next() = {
-        |     i += 1;
-        |     i
-        |   }
-        |   foo(next(), await(next()))
+        | async {
+        |   await(1)
+        |   val neg1 = -1
+        |   val a = await(1)
+        |   val f = { case x => ({case x => neg1 * x}: PartialFunction[Int, Int])(x + a) }: PartialFunction[Int, Int]
+        |   await(f(2))
         | }
         | ()
         | """.stripMargin)
diff --git a/src/test/scala/scala/async/neg/NakedAwait.scala b/src/test/scala/scala/async/neg/NakedAwait.scala
index ecc84f9..c3537ec 100644
--- a/src/test/scala/scala/async/neg/NakedAwait.scala
+++ b/src/test/scala/scala/async/neg/NakedAwait.scala
@@ -93,6 +93,16 @@ class NakedAwait {
   }
 
   @Test
+  def nestedPatMatFunction() {
+    expectError("await must not be used under a nested class.") { // TODO more specific error message
+      """
+        | import _root_.scala.async.AsyncId._
+        | async { { case x => { await(false) } } : PartialFunction[Any, Any] }
+      """.stripMargin
+    }
+  }
+
+  @Test
   def tryBody() {
     expectError("await must not be used under a try/catch.") {
       """
@@ -143,4 +153,16 @@ class NakedAwait {
         |""".stripMargin
     }
   }
+
+  @Test
+  def lazyValIllegal() {
+    expectError("lazy vals are illegal") {
+      """
+        | import _root_.scala.async.AsyncId._
+        | def foo(): Any = async { val x = { lazy val y = 0; y } }
+        | ()
+        |
+        |""".stripMargin
+    }
+  }
 }
diff --git a/src/test/scala/scala/async/run/toughtype/ToughType.scala b/src/test/scala/scala/async/run/toughtype/ToughType.scala
index 9cfc1ca..83f5a2d 100644
--- a/src/test/scala/scala/async/run/toughtype/ToughType.scala
+++ b/src/test/scala/scala/async/run/toughtype/ToughType.scala
@@ -36,4 +36,35 @@ class ToughTypeSpec {
     val res: (List[_], scala.async.run.toughtype.ToughTypeObject.Inner) = Await.result(fut, 2 seconds)
     res._1 mustBe (Nil)
   }
+
+  @Test def patternMatchingPartialFunction() {
+    import AsyncId.{await, async}
+    async {
+      await(1)
+      val a = await(1)
+      val f = { case x => x + a }: PartialFunction[Int, Int]
+      await(f(2))
+    } mustBe 3
+  }
+
+  @Test def patternMatchingPartialFunctionNested() {
+    import AsyncId.{await, async}
+    async {
+      await(1)
+      val neg1 = -1
+      val a = await(1)
+      val f = { case x => ({case x => neg1 * x}: PartialFunction[Int, Int])(x + a) }: PartialFunction[Int, Int]
+      await(f(2))
+    } mustBe -3
+  }
+
+  @Test def patternMatchingFunction() {
+    import AsyncId.{await, async}
+    async {
+      await(1)
+      val a = await(1)
+      val f = { case x => x + a }: Function[Int, Int]
+      await(f(2))
+    } mustBe 3
+  }
 }
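
Note on the expected values in the new `ToughType` tests: the three pattern-matching functions can be evaluated without the macro to see where `3` and `-3` come from. The following is a minimal sketch, assuming `AsyncId.await` simply returns its argument (an assumption; `AsyncId` is not part of this diff). The object `PatMatCheck` and its member names are hypothetical and exist only for this illustration.

```scala
object PatMatCheck {
  // Stand-ins for `await(1)` and `-1`, under the identity assumption above.
  val a: Int = 1
  val neg1: Int = -1

  // Same shape as patternMatchingPartialFunction: a pattern-matching
  // anonymous function ascribed to PartialFunction.
  val f: PartialFunction[Int, Int] = { case x => x + a }

  // Same shape as patternMatchingPartialFunctionNested: the inner partial
  // function is applied to `x + a` inside the outer one.
  val nested: PartialFunction[Int, Int] = {
    case x => ({ case y => neg1 * y }: PartialFunction[Int, Int])(x + a)
  }

  // Same shape as patternMatchingFunction: ascribed to a plain Function.
  val g: Function[Int, Int] = { case x => x + a }

  def main(args: Array[String]): Unit = {
    assert(f(2) == 3)        // 2 + 1
    assert(nested(2) == -3)  // -1 * (2 + 1)
    assert(g(2) == 3)        // 2 + 1
    println(s"${f(2)} ${nested(2)} ${g(2)}")
  }
}
```

The point of the tests in the diff is that the same values must survive the `async` transform, which is what `restorePatternMatchingFunctions` is meant to make possible for these function literals.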