aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md327
-rw-r--r--src/main/scala/scala/async/Async.scala3
-rw-r--r--src/main/scala/scala/async/AsyncAnalysis.scala32
-rw-r--r--src/main/scala/scala/async/TransformUtils.scala56
-rw-r--r--src/test/scala/scala/async/TreeInterrogation.scala20
-rw-r--r--src/test/scala/scala/async/neg/NakedAwait.scala22
-rw-r--r--src/test/scala/scala/async/run/toughtype/ToughType.scala31
7 files changed, 452 insertions, 39 deletions
diff --git a/README.md b/README.md
index d2f6b5d..2a18af5 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,156 @@
-Scala Async Project
-===================
+# Scala Async Project
-Building
---------
+## Quickstart
+
+ - Add `scala-async.jar` to your classpath
+ - Use Scala 2.10.0
+
+```scala
+import ExecutionContext.Implicits.global
+import scala.async.Async.{async, await}
+
+val future = async {
+ val f1 = async { ...; true }
+ val f2 = async { ...; 42 }
+ if (await(f1)) await(f2) else 0
+}
+```
+
+## What is `async`?
+
+`async` marks a block of asynchronous code. Such a block usually contains
+one or more `await` calls, which marks a point at which the computation
+will be suspended until the awaited `Future` is complete.
+
+By default, `async` blocks operate on `scala.concurrent.{Future, Promise}`.
+The system can be adapted to alternative implementations of the
+`Future` pattern.
+
+Consider the following example:
+
+```scala
+def slowCalcFuture: Future[Int] = ... // 01
+def combined: Future[Int] = async { // 02
+ await(slowCalcFuture) + await(slowCalcFuture) // 03
+}
+val x: Int = Await.result(combined, 10.seconds) // 05
+```
+
+Lines 1 defines an asynchronous method: it returns a `Future`.
+
+Line 3 begins an `async` block. During compilation,
+the contents of this block will be analyzed to identify
+the `await` calls, and transformed into non-blocking
+code.
+
+Control flow will immediately pass to line 5, as the
+computation in the `async` block is not executed
+on the caller's thread.
+
+Line 4 begins by triggering `slowCalcFuture`, and then
+suspending until it has been calculating. Only after it
+has finished, we trigger it again, and suspend again.
+Finally, we add the results and complete `combined`, which
+in turn will release line 5 (unless it had already timed out).
+
+It is important to note that while we this code is non-blocking,
+it is not parallel. If we wanted to parallelize the two computations,
+we could rearrange the code as follows.
+
+```scala
+def combined: Future[Int] = async {
+ val future1 = slowCalcFuture
+ val future2 = slowCalcFuture
+ await(future1) + await(future2)
+}
+```
+
+## Comparison with direct use of `Future` API
+
+This computation could also be expressed by directly using the
+higher-order functions of Futures:
+
+```scala
+def slowCalcFuture: Future[Int] = ...
+val future1 = slowCalcFuture
+val future2 = slowCalcFuture
+def combined: Future[Int] = for {
+ r1 <- future1
+ r2 <- future2
+} yield r1 + r2
+```
+
+The `async` approach has two advantages over the use of
+`map` and `flatMap`.
+ 1. The code more directly reflects the programmers intent,
+ and does not require us to name the results `r1` and `r2`.
+ This advantage is even more pronounces when we mix control
+ structures in `async` blocks.
+ 2. `async` blocks are compiled to a single anonymous class,
+ as opposed to a seperate anonymous class for each closure
+ required at each generator (`<-`) in the for-comprehension.
+ This reduces the size of generated code, and can avoid boxing
+ of intermediate results.
+
+## Comparison with CPS plugin
+
+The existing continations (CPS) plugin for Scala can also be used
+to provide syntactic layer like `async`. This approach has been
+used in Akka's [Dataflow Concurrency](http://doc.akka.io/docs/akka/snapshot/scala/dataflow.html)
+
+CPS based rewriting of asynchrous code also produces an closure
+for each suspension. It can also lead to type errors that are
+difficult to understand.
+
+## How it works
+
+ - The async macro analyses the block of code, looking for control
+ structures and locations of await calls. It then breaks the code
+ into 'chunks'. Each chunk contains a linear sequence of statements
+ that concludes with branching decision, or with the registration
+ of subsequent state handler as a continuation.
+ - Before this analysis and transformation, the program is normalized
+ into a form amenable to this manipulation. This is called the
+ "A Normal Form" (ANF), and roughly means that:
+ - `if` and `match` constructs are only used as statements;
+ they cannot be used as an expression.
+ - calls to await are not allowed in compound expressions.
+ - Identify vals, vars and defs that are accessed from multiple
+ states. These will be lifted out to fields in the state machine
+ object.
+ - Synthesize a class that holds:
+ - an integer representing the current state ID
+ - the lifted definitions
+ - an `apply(value: Try[Any]): Unit` method that will be
+ called on completion of each future. This behaviour of
+ this method is determined by the current state. It records
+ the downcast result of the future in a field, and calls the
+ `resume()` method.
+ - the `resume(): Unit` method that switches on the current state
+ and runs the users code for one 'chunk', and either:
+ a) registers the state machine as the handler for the next future
+ b) completes the result Promise of the async block, if at the terminal state.
+ - an `apply(): Unit` method that starts the computation.
+
+## Troubleshooting
+ - Logging of the transform can be enabled with `scalac -Dscala.async.debug=true`.
+ - Tracing of the ANF transform: `scalac -Dscala.async.trace=true`
+ - Debug the macro expansion by checking out the project and executing the application
+ [`TreeInterrogation`](https://github.com/phaller/scala-async/blob/master/src/test/scala/scala/async/TreeInterrogation.scala#L59)
+
+## Limitations
+ - See the [neg](https://github.com/phaller/scala-async/tree/master/src/test/scala/scala/async/neg) test cases for
+ for constructs that are not allowed in a async block
+ - See the [issue list](https://github.com/phaller/scala-async/issues?state=open) for which of these restrictions are planned
+ to be dropped in the next milestone.
+ - See [#13](https://github.com/phaller/scala-async/issues/13) for why `await` is not possible in closures, and for suggestions on
+ ways to structure the code to work around this limitation.
+
+## Building
The async macro and its test suite can be built and run with SBT.
-Contributing
-------------
+## Contributing
If you are interested in contributing code, we ask you to complete and submit
to us the Scala Contributor License Agreement, which allows us to ensure that
@@ -17,3 +160,175 @@ http://www.scala-lang.org/sites/default/files/contributor_agreement.pdf
Before submitting a pull-request, please make sure you have followed the guidelines
outlined in our [Pull Request Policy](https://github.com/scala/scala/wiki/Pull-Request-Policy).
+
+### Generated Code examples
+
+```scala
+val future = async {
+ val f1 = async { true }
+ val x = 1
+ def inc(t: Int) = t + x
+ val t = 0
+ val f2 = async { 42 }
+ if (await(f1)) await(f2) else { val z = 1; inc(t + z) }
+}
+```
+
+After ANF transform.
+
+ - await calls are moved to only appear on the LHS of an value definition.
+ - `if` is not used as an expression, instead each branch writes its result
+ to a synthetic `var`.
+
+```scala
+ {
+ ();
+ val f1: scala.concurrent.Future[Boolean] = {
+ scala.concurrent.Future.apply[Boolean](true)(scala.concurrent.ExecutionContext.Implicits.global)
+ };
+ val x: Int = 1;
+ def inc(t: Int): Int = t.+(x);
+ val t: Int = 0;
+ val f2: scala.concurrent.Future[Int] = {
+ scala.concurrent.Future.apply[Int](42)(scala.concurrent.ExecutionContext.Implicits.global)
+ };
+ val await$1: Boolean = scala.async.Async.await[Boolean](f1);
+ var ifres$1: Int = 0;
+ if (await$1)
+ {
+ val await$2: Int = scala.async.Async.await[Int](f2);
+ ifres$1 = await$2
+ }
+ else
+ {
+ ifres$1 = {
+ val z: Int = 1;
+ inc(t.+(z))
+ }
+ };
+ ifres$1
+}
+```
+
+After async transform:
+
+ - one class synthesized to act as the state machine. It's `apply()` method will
+ be used to start the computation (even the code before the first await call
+ is executed asynchronously), and the the `apply(tr: scala.util.Try[Any])` method
+ will continue after each completed background task.
+ - each chunk of code moved into the a branch of the pattern match in `resume$async`.
+ - value and method definitions accessed from multiple states are lifted to be
+ members of `class stateMachine`. Others remain local, e.g. `val z`.
+
+```scala
+ {
+ class stateMachine$7 extends StateMachine[scala.concurrent.Promise[Int], scala.concurrent.ExecutionContext] {
+ def <init>() = {
+ super.<init>();
+ ()
+ };
+ var state$async: Int = 0;
+ val result$async: scala.concurrent.Promise[Int] = scala.concurrent.Promise.apply[Int]();
+ val execContext$async = scala.concurrent.ExecutionContext.Implicits.global;
+ var x$1: Int = 0;
+ def inc$1(t: Int): Int = t.$plus(x$1);
+ var t$1: Int = 0;
+ var f2$1: scala.concurrent.Future[Int] = null;
+ var await$1: Boolean = false;
+ var ifres$1: Int = 0;
+ var await$2: Int = 0;
+ def resume$async(): Unit = try {
+ state$async match {
+ case 0 => {
+ ();
+ val f1 = {
+ scala.concurrent.Future.apply[Boolean](true)(scala.concurrent.ExecutionContext.Implicits.global)
+ };
+ x$1 = 1;
+ t$1 = 0;
+ f2$1 = {
+ scala.concurrent.Future.apply[Int](42)(scala.concurrent.ExecutionContext.Implicits.global)
+ };
+ f1.onComplete(this)(execContext$async)
+ }
+ case 1 => {
+ ifres$1 = 0;
+ if (await$1)
+ {
+ state$async = 2;
+ resume$async()
+ }
+ else
+ {
+ state$async = 3;
+ resume$async()
+ }
+ }
+ case 2 => {
+ f2$1.onComplete(this)(execContext$async);
+ ()
+ }
+ case 5 => {
+ ifres$1 = await$2;
+ state$async = 4;
+ resume$async()
+ }
+ case 3 => {
+ ifres$1 = {
+ val z = 1;
+ inc$1(t$1.$plus(z))
+ };
+ state$async = 4;
+ resume$async()
+ }
+ case 4 => {
+ result$async.complete(scala.util.Success.apply(ifres$1));
+ ()
+ }
+ }
+ } catch {
+ case NonFatal((tr @ _)) => {
+ {
+ result$async.complete(scala.util.Failure.apply(tr));
+ ()
+ };
+ ()
+ }
+ };
+ def apply(tr: scala.util.Try[Any]): Unit = state$async match {
+ case 0 => {
+ if (tr.isFailure)
+ {
+ result$async.complete(tr.asInstanceOf[scala.util.Try[Int]]);
+ ()
+ }
+ else
+ {
+ await$1 = tr.get.asInstanceOf[Boolean];
+ state$async = 1;
+ resume$async()
+ };
+ ()
+ }
+ case 2 => {
+ if (tr.isFailure)
+ {
+ result$async.complete(tr.asInstanceOf[scala.util.Try[Int]]);
+ ()
+ }
+ else
+ {
+ await$2 = tr.get.asInstanceOf[Int];
+ state$async = 5;
+ resume$async()
+ };
+ ()
+ }
+ };
+ def apply: Unit = resume$async()
+ };
+ val stateMachine$7: StateMachine[scala.concurrent.Promise[Int], scala.concurrent.ExecutionContext] = new stateMachine$7();
+ scala.concurrent.Future.apply(stateMachine$7.apply())(scala.concurrent.ExecutionContext.Implicits.global);
+ stateMachine$7.result$async.future
+}
+```
diff --git a/src/main/scala/scala/async/Async.scala b/src/main/scala/scala/async/Async.scala
index c2969a9..12fe428 100644
--- a/src/main/scala/scala/async/Async.scala
+++ b/src/main/scala/scala/async/Async.scala
@@ -78,7 +78,8 @@ abstract class AsyncBase {
// - if/match only used in statement position.
val anfTree: Block = {
val anf = AnfTransform[c.type](c)
- val stats1 :+ expr1 = anf(body.tree)
+ val restored = utils.restorePatternMatchingFunctions(body.tree)
+ val stats1 :+ expr1 = anf(restored)
val block = Block(stats1, expr1)
c.typeCheck(block).asInstanceOf[Block]
}
diff --git a/src/main/scala/scala/async/AsyncAnalysis.scala b/src/main/scala/scala/async/AsyncAnalysis.scala
index bda4d5c..9184960 100644
--- a/src/main/scala/scala/async/AsyncAnalysis.scala
+++ b/src/main/scala/scala/async/AsyncAnalysis.scala
@@ -71,15 +71,21 @@ private[async] final case class AsyncAnalysis[C <: Context](c: C, asyncBase: Asy
reportUnsupportedAwait(function, "nested function")
}
+ override def patMatFunction(tree: Match) {
+ reportUnsupportedAwait(tree, "nested function")
+ }
+
override def traverse(tree: Tree) {
def containsAwait = tree exists isAwait
tree match {
- case Try(_, _, _) if containsAwait =>
+ case Try(_, _, _) if containsAwait =>
reportUnsupportedAwait(tree, "try/catch")
super.traverse(tree)
- case Return(_) =>
+ case Return(_) =>
c.abort(tree.pos, "return is illegal within a async block")
- case _ =>
+ case ValDef(mods, _, _, _) if mods.hasFlag(Flag.LAZY) =>
+ c.abort(tree.pos, "lazy vals are illegal within an async block")
+ case _ =>
super.traverse(tree)
}
}
@@ -117,19 +123,19 @@ private[async] final case class AsyncAnalysis[C <: Context](c: C, asyncBase: Asy
override def nestedMethod(defDef: DefDef) {
nestedMethodsToLift += defDef
- defDef.rhs foreach {
- case rt: RefTree =>
- valDefChunkId.get(rt.symbol) match {
- case Some((vd, defChunkId)) =>
- valDefsToLift += vd // lift all vals referred to by nested methods.
- case _ =>
- }
- case _ =>
- }
+ markReferencedVals(defDef)
}
override def function(function: Function) {
- function foreach {
+ markReferencedVals(function)
+ }
+
+ override def patMatFunction(tree: Match) {
+ markReferencedVals(tree)
+ }
+
+ private def markReferencedVals(tree: Tree) {
+ tree foreach {
case rt: RefTree =>
valDefChunkId.get(rt.symbol) match {
case Some((vd, defChunkId)) =>
diff --git a/src/main/scala/scala/async/TransformUtils.scala b/src/main/scala/scala/async/TransformUtils.scala
index faebcc3..db82ed6 100644
--- a/src/main/scala/scala/async/TransformUtils.scala
+++ b/src/main/scala/scala/async/TransformUtils.scala
@@ -25,12 +25,14 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
val stateMachine = newTermName(fresh("stateMachine"))
val stateMachineT = stateMachine.toTypeName
val apply = newTermName("apply")
+ val applyOrElse = newTermName("applyOrElse")
val tr = newTermName("tr")
val matchRes = "matchres"
val ifRes = "ifres"
val await = "await"
val bindSuffix = "$bind"
- def arg(i: Int) = "arg" + i
+
+ def arg(i: Int) = "arg" + i
def fresh(name: TermName): TermName = newTermName(fresh(name.toString))
@@ -64,7 +66,7 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
/** Descends into the regions of the tree that are subject to the
* translation to a state machine by `async`. When a nested template,
- * function, or by-name argument is encountered, the descend stops,
+ * function, or by-name argument is encountered, the descent stops,
* and `nestedClass` etc are invoked.
*/
trait AsyncTraverser extends Traverser {
@@ -83,20 +85,24 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
def function(function: Function) {
}
+ def patMatFunction(tree: Match) {
+ }
+
override def traverse(tree: Tree) {
tree match {
- case cd: ClassDef => nestedClass(cd)
- case md: ModuleDef => nestedModule(md)
- case dd: DefDef => nestedMethod(dd)
- case fun: Function => function(fun)
- case Apply(fun, args) =>
+ case cd: ClassDef => nestedClass(cd)
+ case md: ModuleDef => nestedModule(md)
+ case dd: DefDef => nestedMethod(dd)
+ case fun: Function => function(fun)
+ case m@Match(EmptyTree, _) => patMatFunction(m) // Pattern matching anonymous function under -Xoldpatmat of after `restorePatternMatchingFunctions`
+ case Apply(fun, args) =>
val isInByName = isByName(fun)
for ((arg, index) <- args.zipWithIndex) {
if (!isInByName(index)) traverse(arg)
else byNameArgument(arg)
}
traverse(fun)
- case _ => super.traverse(tree)
+ case _ => super.traverse(tree)
}
}
}
@@ -248,6 +254,40 @@ private[async] final case class TransformUtils[C <: Context](c: C) {
}
}
+ /**
+ * Replaces expressions of the form `{ new $anon extends PartialFunction[A, B] { ... ; def applyOrElse[..](...) = ... match <cases> }`
+ * with `Match(EmptyTree, cases`.
+ *
+ * This reverses the transformation performed in `Typers`, and works around non-idempotency of typechecking such trees.
+ */
+ // TODO Reference JIRA issue.
+ final def restorePatternMatchingFunctions(tree: Tree) =
+ RestorePatternMatchingFunctions transform tree
+
+ private object RestorePatternMatchingFunctions extends Transformer {
+
+ import language.existentials
+
+ override def transform(tree: Tree): Tree = {
+ val SYNTHETIC = (1 << 21).toLong.asInstanceOf[FlagSet]
+ def isSynthetic(cd: ClassDef) = cd.mods hasFlag SYNTHETIC
+
+ tree match {
+ case Block(
+ (cd@ClassDef(_, _, _, Template(_, _, body))) :: Nil,
+ Apply(Select(New(a), nme.CONSTRUCTOR), Nil)) if isSynthetic(cd) =>
+ val restored = (body collectFirst {
+ case DefDef(_, /*name.apply | */ name.applyOrElse, _, _, _, Match(_, cases)) =>
+ val transformedCases = super.transformStats(cases, currentOwner).asInstanceOf[List[CaseDef]]
+ Match(EmptyTree, transformedCases)
+ }).getOrElse(c.abort(tree.pos, s"Internal Error: Unable to find original pattern matching cases in: $body"))
+ restored
+ case t => super.transform(t)
+ }
+ }
+ }
+
+
def isSafeToInline(tree: Tree) = {
val symtab = c.universe.asInstanceOf[scala.reflect.internal.SymbolTable]
object treeInfo extends {
diff --git a/src/test/scala/scala/async/TreeInterrogation.scala b/src/test/scala/scala/async/TreeInterrogation.scala
index b22faa9..93cfdf5 100644
--- a/src/test/scala/scala/async/TreeInterrogation.scala
+++ b/src/test/scala/scala/async/TreeInterrogation.scala
@@ -62,23 +62,21 @@ object TreeInterrogation extends App {
val levels = Seq("trace", "debug")
def setAll(value: Boolean) = levels.foreach(set(_, value))
- setAll(true)
- try t finally setAll(false)
+ setAll(value = true)
+ try t finally setAll(value = false)
}
withDebug {
val cm = reflect.runtime.currentMirror
- val tb = mkToolbox("-cp target/scala-2.10/classes -Xprint:all")
+ val tb = mkToolbox("-cp target/scala-2.10/classes -Xprint:flatten")
val tree = tb.parse(
""" import scala.async.AsyncId.{async, await}
- | def foo(a: Int, b: Int) = (a, b)
- | val result = async {
- | var i = 0
- | def next() = {
- | i += 1;
- | i
- | }
- | foo(next(), await(next()))
+ | async {
+ | await(1)
+ | val neg1 = -1
+ | val a = await(1)
+ | val f = { case x => ({case x => neg1 * x}: PartialFunction[Int, Int])(x + a) }: PartialFunction[Int, Int]
+ | await(f(2))
| }
| ()
| """.stripMargin)
diff --git a/src/test/scala/scala/async/neg/NakedAwait.scala b/src/test/scala/scala/async/neg/NakedAwait.scala
index ecc84f9..c3537ec 100644
--- a/src/test/scala/scala/async/neg/NakedAwait.scala
+++ b/src/test/scala/scala/async/neg/NakedAwait.scala
@@ -93,6 +93,16 @@ class NakedAwait {
}
@Test
+ def nestedPatMatFunction() {
+ expectError("await must not be used under a nested class.") { // TODO more specific error message
+ """
+ | import _root_.scala.async.AsyncId._
+ | async { { case x => { await(false) } } : PartialFunction[Any, Any] }
+ """.stripMargin
+ }
+ }
+
+ @Test
def tryBody() {
expectError("await must not be used under a try/catch.") {
"""
@@ -143,4 +153,16 @@ class NakedAwait {
|""".stripMargin
}
}
+
+ @Test
+ def lazyValIllegal() {
+ expectError("lazy vals are illegal") {
+ """
+ | import _root_.scala.async.AsyncId._
+ | def foo(): Any = async { val x = { lazy val y = 0; y } }
+ | ()
+ |
+ |""".stripMargin
+ }
+ }
}
diff --git a/src/test/scala/scala/async/run/toughtype/ToughType.scala b/src/test/scala/scala/async/run/toughtype/ToughType.scala
index 9cfc1ca..83f5a2d 100644
--- a/src/test/scala/scala/async/run/toughtype/ToughType.scala
+++ b/src/test/scala/scala/async/run/toughtype/ToughType.scala
@@ -36,4 +36,35 @@ class ToughTypeSpec {
val res: (List[_], scala.async.run.toughtype.ToughTypeObject.Inner) = Await.result(fut, 2 seconds)
res._1 mustBe (Nil)
}
+
+ @Test def patternMatchingPartialFunction() {
+ import AsyncId.{await, async}
+ async {
+ await(1)
+ val a = await(1)
+ val f = { case x => x + a }: PartialFunction[Int, Int]
+ await(f(2))
+ } mustBe 3
+ }
+
+ @Test def patternMatchingPartialFunctionNested() {
+ import AsyncId.{await, async}
+ async {
+ await(1)
+ val neg1 = -1
+ val a = await(1)
+ val f = { case x => ({case x => neg1 * x}: PartialFunction[Int, Int])(x + a) }: PartialFunction[Int, Int]
+ await(f(2))
+ } mustBe -3
+ }
+
+ @Test def patternMatchingFunction() {
+ import AsyncId.{await, async}
+ async {
+ await(1)
+ val a = await(1)
+ val f = { case x => x + a }: Function[Int, Int]
+ await(f(2))
+ } mustBe 3
+ }
}