diff options
author | Jason Zaugg <jzaugg@gmail.com> | 2012-11-07 20:08:33 +0100 |
---|---|---|
committer | Jason Zaugg <jzaugg@gmail.com> | 2012-11-09 15:44:16 +0100 |
commit | d434c20cfb8623a243cd30f187907bb4b199dc99 (patch) | |
tree | 3d127564874feb2ffe1633018b9afcc9bedc8d9b /src/main/scala/scala/async/Async.scala | |
parent | 7dbf0a0da4987e8fd5b223437d8f5316ff33616e (diff) | |
download | scala-async-d434c20cfb8623a243cd30f187907bb4b199dc99.tar.gz scala-async-d434c20cfb8623a243cd30f187907bb4b199dc99.tar.bz2 scala-async-d434c20cfb8623a243cd30f187907bb4b199dc99.zip |
Abstract over the future implementation.
- Refactor the base macro implementation to be
parameterized by a FutureSystem, which is defines
the triple of types (Future, Promise, ExecutionContext)
and the operations on those types (at the AST level)
- Cleanup generation of ASTs, in particular, use
reify more widely.
Diffstat (limited to 'src/main/scala/scala/async/Async.scala')
-rw-r--r-- | src/main/scala/scala/async/Async.scala | 213 |
1 files changed, 111 insertions, 102 deletions
diff --git a/src/main/scala/scala/async/Async.scala b/src/main/scala/scala/async/Async.scala index d4b950e..d64e04a 100644 --- a/src/main/scala/scala/async/Async.scala +++ b/src/main/scala/scala/async/Async.scala @@ -7,131 +7,140 @@ import language.experimental.macros import scala.reflect.macros.Context import scala.collection.mutable.ListBuffer -import scala.concurrent.{ Future, Promise, ExecutionContext, future } +import scala.concurrent.{Future, Promise, ExecutionContext, future} import ExecutionContext.Implicits.global import scala.util.control.NonFatal -import scala.util.continuations.{ shift, reset, cpsParam } +import scala.util.continuations.{shift, reset, cpsParam} -/* Extending `ControlThrowable`, by default, also avoids filling in the stack trace. */ -class FallbackToCpsException extends scala.util.control.ControlThrowable /* * @author Philipp Haller */ -object Async extends AsyncUtils { +object Async extends AsyncBase { + lazy val futureSystem = ScalaConcurrentFutureSystem + type FS = ScalaConcurrentFutureSystem.type - def async[T](body: T): Future[T] = macro asyncImpl[T] + def async[T](body: T) = macro asyncImpl[T] + override def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[Future[T]] = super.asyncImpl[T](c)(body) +} + +object AsyncId extends AsyncBase { + lazy val futureSystem = IdentityFutureSystem + type FS = IdentityFutureSystem.type + + def async[T](body: T) = macro asyncImpl[T] + + override def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[T] = super.asyncImpl[T](c)(body) +} + +/** + * A base class for the `async` macro. Subclasses must provide: + * + * - Concrete types for a given future system + * - Tree manipulations to create and complete the equivalent of Future and Promise + * in that system. + * - The `async` macro declaration itself, and a forwarder for the macro implementation. + * (The latter is temporarily needed to workaround a bug in the macro system) + * + * The default implementation, [[scala.async.Async]], binds the macro to `scala.concurrent._`. + */ +abstract class AsyncBase extends AsyncUtils { + self => + + type FS <: FutureSystem + val futureSystem: FS + + /** + * A call to `await` must be nested in an enclosing `async` block. + * + * A call to await does not block the thread, rather it is a delimiter + * used by the enclosing `async` macro. Code following the `await` + * call + @ @param awaitable The future from which a value is awaited + * @tparam T The type of that value + * @return The value + */ // TODO Replace with `@compileTimeOnly when this is implemented SI-6539 @deprecated("`await` must be enclosed in an `async` block", "0.1") - def await[T](awaitable: Future[T]): T = ??? - - /* Fall back for `await` when it is called at an unsupported position. - */ - def awaitCps[T, U](awaitable: Future[T], p: Promise[U]): T @cpsParam[U, Unit] = - shift { - (k: (T => U)) => - awaitable onComplete { - case tr => p.success(k(tr.get)) - } - } - - def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[Future[T]] = { + def await[T](awaitable: futureSystem.Fut[T]): T = ??? + + def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]) = { import c.universe._ import Flag._ - - val builder = new ExprBuilder[c.type](c) - val awaitMethod = awaitSym(c) - - try { - body.tree match { - case Block(stats, expr) => - val asyncBlockBuilder = new builder.AsyncBlockBuilder(stats, expr, 0, 1000, 1000, Map()) - vprintln(s"states of current method:") - asyncBlockBuilder.asyncStates foreach vprintln + val builder = new ExprBuilder[c.type, self.FS](c, self.futureSystem) - val handlerExpr = asyncBlockBuilder.mkCombinedHandlerExpr() + import builder.defn._ + import builder.futureSystemOps - vprintln(s"GENERATED handler expr:") - vprintln(handlerExpr) + val awaitMethod = awaitSym(c) - val handlerForLastState: c.Expr[PartialFunction[Int, Unit]] = { - val tree = Apply(Select(Ident("result"), newTermName("success")), - List(asyncBlockBuilder.asyncStates.last.body)) - builder.mkHandler(asyncBlockBuilder.asyncStates.last.state, c.Expr[Unit](tree)) - } + body.tree match { + case Block(stats, expr) => + val asyncBlockBuilder = new builder.AsyncBlockBuilder(stats, expr, 0, 1000, 1000, Map()) - vprintln("GENERATED handler for last state:") - vprintln(handlerForLastState) - - val localVarTrees = asyncBlockBuilder.asyncStates.init.flatMap(_.allVarDefs).toList - - /* - def resume(): Unit = { - try { - (handlerExpr.splice orElse handlerForLastState.splice)(state) - } catch { - case NonFatal(t) => result.failure(t) - } - } - */ - val nonFatalModule = c.mirror.staticModule("scala.util.control.NonFatal") - val resumeFunTree: c.Tree = DefDef(Modifiers(), newTermName("resume"), List(), List(List()), Ident(definitions.UnitClass), - Try(Apply(Select( - Apply(Select(handlerExpr.tree, newTermName("orElse")), List(handlerForLastState.tree)), - newTermName("apply")), List(Ident(newTermName("state")))), - List( - CaseDef( - Apply(Ident(nonFatalModule), List(Bind(newTermName("t"), Ident(nme.WILDCARD)))), - EmptyTree, - Block(List( - Apply(Select(Ident(newTermName("result")), newTermName("failure")), List(Ident(newTermName("t"))))), - Literal(Constant(()))))), EmptyTree)) - - reify { - val result = Promise[T]() - var state = 0 - future { - c.Expr(Block( - localVarTrees :+ resumeFunTree, - Apply(Ident(newTermName("resume")), List()))).splice - } - result.future - } + vprintln(s"states of current method:") + asyncBlockBuilder.asyncStates foreach vprintln - case _ => - // issue error message - reify { - sys.error("expression not supported by async") - } - } - } catch { - case _: FallbackToCpsException => - // replace `await` invocations with `awaitCps` invocations - val awaitReplacer = new Transformer { - val awaitCpsMethod = awaitCpsSym(c) - override def transform(tree: Tree): Tree = tree match { - case Apply(fun @ TypeApply(_, List(futArgTpt)), args) if fun.symbol == awaitMethod => - val typeApp = treeCopy.TypeApply(fun, Ident(awaitCpsMethod), List(TypeTree(futArgTpt.tpe), TypeTree(body.tree.tpe))) - treeCopy.Apply(tree, typeApp, args.map(arg => c.resetAllAttrs(arg.duplicate)) :+ Ident(newTermName("p"))) - - case _ => - super.transform(tree) - } + val handlerExpr = asyncBlockBuilder.mkCombinedHandlerExpr() + + vprintln(s"GENERATED handler expr:") + vprintln(handlerExpr) + + val handlerForLastState: c.Expr[PartialFunction[Int, Unit]] = { + val lastState = asyncBlockBuilder.asyncStates.last + val lastStateBody = c.Expr[T](lastState.body) + builder.mkHandler(lastState.state, futureSystemOps.completeProm(c.Expr[futureSystem.Prom[T]](Ident("result")), reify(scala.util.Success(lastStateBody.splice)))) } - - val newBody = awaitReplacer.transform(body.tree) - - reify { - val p = Promise[T]() - future { - reset { - c.Expr(c.resetAllAttrs(newBody.duplicate)).asInstanceOf[c.Expr[T]].splice + + vprintln("GENERATED handler for last state:") + vprintln(handlerForLastState) + + val localVarTrees = asyncBlockBuilder.asyncStates.init.flatMap(_.allVarDefs).toList + + /* + def resume(): Unit = { + try { + (handlerExpr.splice orElse handlerForLastState.splice)(state) + } catch { + case NonFatal(t) => result.failure(t) } } - p.future + */ + val nonFatalModule = c.mirror.staticModule("scala.util.control.NonFatal") + val resumeFunTree: c.Tree = DefDef(Modifiers(), newTermName("resume"), List(), List(List()), Ident(definitions.UnitClass), + Try( + reify { + val combinedHandler = mkPartialFunction_orElse(handlerExpr)(handlerForLastState).splice + combinedHandler.apply(c.Expr[Int](Ident(newTermName("state"))).splice) + }.tree + , + List( + CaseDef( + Apply(Ident(nonFatalModule), List(Bind(newTermName("t"), Ident(nme.WILDCARD)))), + EmptyTree, + Block(List({ + val t = c.Expr[Throwable](Ident(newTermName("t"))) + futureSystemOps.completeProm[T](c.Expr[futureSystem.Prom[T]](Ident(newTermName("result"))), reify(scala.util.Failure(t.splice))).tree + }), c.literalUnit.tree))), EmptyTree)) + + val prom: Expr[futureSystem.Prom[T]] = reify { + val result = futureSystemOps.createProm[T].splice + var state = 0 + futureSystemOps.future[Unit] { + c.Expr[Unit](Block( + localVarTrees :+ resumeFunTree, + Apply(Ident(newTermName("resume")), List()))) + }(futureSystemOps.execContext).splice + result } + val result = futureSystemOps.promiseToFuture(prom) + // println(s"${c.macroApplication} \nexpands to:\n ${result.tree}") + result + + case tree => + c.abort(c.macroApplication.pos, s"expression not supported by async: ${tree}") } } } |