From 295904a7aa53a5cfce96c2003d3f15eb36ba40c9 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 25 Jun 2013 02:54:40 +0200 Subject: Add BlockingAsync and BlockingFutureSystem --- src/main/scala/scala/async/Async.scala | 2 +- src/main/scala/scala/async/BlockingAsync.scala | 21 ++++++++ src/main/scala/scala/async/ExprBuilder.scala | 6 +-- src/main/scala/scala/async/FutureSystem.scala | 61 ++++++++++++++++++++-- .../async/run/sequential/SequentialSpec.scala | 48 +++++++++++++++++ 5 files changed, 131 insertions(+), 7 deletions(-) create mode 100644 src/main/scala/scala/async/BlockingAsync.scala create mode 100644 src/test/scala/scala/async/run/sequential/SequentialSpec.scala diff --git a/src/main/scala/scala/async/Async.scala b/src/main/scala/scala/async/Async.scala index 2efebaf..c162edc 100644 --- a/src/main/scala/scala/async/Async.scala +++ b/src/main/scala/scala/async/Async.scala @@ -92,7 +92,7 @@ abstract class AsyncBase { }.toMap } - val builder = ExprBuilder[c.type, futureSystem.type](c, self.futureSystem, anfTree) + val builder = ExprBuilder[c.type, futureSystem.type](c, self.futureSystem, anfTree, utils) import builder.futureSystemOps val asyncBlock: builder.AsyncBlock = builder.build(anfTree, renameMap) import asyncBlock.asyncStates diff --git a/src/main/scala/scala/async/BlockingAsync.scala b/src/main/scala/scala/async/BlockingAsync.scala new file mode 100644 index 0000000..3deeb37 --- /dev/null +++ b/src/main/scala/scala/async/BlockingAsync.scala @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2012 Typesafe Inc. + */ + +package scala.async + +import scala.language.experimental.macros +import scala.reflect.macros.Context +import scala.reflect.internal.annotations.compileTimeOnly + +object BlockingAsync extends AsyncBase { + + import scala.concurrent.Future + + lazy val futureSystem = BlockingFutureSystem + type FS = BlockingFutureSystem.type + + def async[T](body: T) = macro asyncImpl[T] + + override def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[Future[T]] = super.asyncImpl[T](c)(body) +} diff --git a/src/main/scala/scala/async/ExprBuilder.scala b/src/main/scala/scala/async/ExprBuilder.scala index ca46a83..d10b15d 100644 --- a/src/main/scala/scala/async/ExprBuilder.scala +++ b/src/main/scala/scala/async/ExprBuilder.scala @@ -8,10 +8,10 @@ import scala.collection.mutable.ListBuffer import collection.mutable import language.existentials -private[async] final case class ExprBuilder[C <: Context, FS <: FutureSystem](c: C, futureSystem: FS, origTree: C#Tree) { +private[async] final case class ExprBuilder[C <: Context, FS <: FutureSystem](c: C, futureSystem: FS, origTree: C#Tree, preUtils: TransformUtils[C]) { builder => - val utils = TransformUtils[c.type](c) + val utils = preUtils.asInstanceOf[TransformUtils[c.type]] import c.universe._ import utils._ @@ -68,7 +68,7 @@ private[async] final case class ExprBuilder[C <: Context, FS <: FutureSystem](c: override def mkHandlerCaseForState: CaseDef = { val callOnComplete = futureSystemOps.onComplete(c.Expr(awaitable.expr), - c.Expr(This(tpnme.EMPTY)), c.Expr(Ident(name.execContext))).tree + c.Expr(This(tpnme.EMPTY)), c.Expr(Ident(name.execContext)), c.Expr(This(name.stateMachineT))).tree mkHandlerCase(state, stats :+ callOnComplete) } diff --git a/src/main/scala/scala/async/FutureSystem.scala b/src/main/scala/scala/async/FutureSystem.scala index a050bec..250d6ed 100644 --- a/src/main/scala/scala/async/FutureSystem.scala +++ b/src/main/scala/scala/async/FutureSystem.scala @@ -47,7 +47,7 @@ trait FutureSystem { /** Register an call back to run on completion of the given future */ def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U], - execContext: Expr[ExecContext]): Expr[Unit] + execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit] /** Complete a promise with a value */ def completeProm[A](prom: Expr[Prom[A]], value: Expr[scala.util.Try[A]]): Expr[Unit] @@ -95,7 +95,7 @@ object ScalaConcurrentFutureSystem extends FutureSystem { } def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U], - execContext: Expr[ExecContext]): Expr[Unit] = reify { + execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit] = reify { future.splice.onComplete(fun.splice)(execContext.splice) } @@ -110,6 +110,61 @@ object ScalaConcurrentFutureSystem extends FutureSystem { } } +/** This future system implements `await` using blocking. Note that this + * future system should only be used for the purpose of debugging. + */ +object BlockingFutureSystem extends FutureSystem { + + import scala.concurrent._ + import scala.concurrent.duration._ + + type Prom[A] = Promise[A] + type Fut[A] = Future[A] + type ExecContext = ExecutionContext + + def mkOps(c: Context): Ops {val context: c.type} = new Ops { + val context: c.type = c + + import context.universe._ + + def execContext: Expr[ExecContext] = c.Expr(c.inferImplicitValue(c.weakTypeOf[ExecutionContext]) match { + case EmptyTree => c.abort(c.macroApplication.pos, "Unable to resolve implicit ExecutionContext") + case context => context + }) + + def promType[A: WeakTypeTag]: Type = c.weakTypeOf[Promise[A]] + def execContextType: Type = c.weakTypeOf[ExecutionContext] + + def createProm[A: WeakTypeTag]: Expr[Prom[A]] = reify { + Promise[A]() + } + + def promiseToFuture[A: WeakTypeTag](prom: Expr[Prom[A]]) = reify { + prom.splice.future + } + + def future[A: WeakTypeTag](a: Expr[A])(execContext: Expr[ExecContext]) = reify { + Future(a.splice)(execContext.splice) + } + + def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U], + execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit] = reify { + Await.ready(future.splice, Duration.Inf) + val tr = future.splice.value.get + stateMachine.splice.task$async = Some(() => fun.splice(tr)) + } + + def completeProm[A](prom: Expr[Prom[A]], value: Expr[scala.util.Try[A]]): Expr[Unit] = reify { + prom.splice.complete(value.splice) + context.literalUnit.splice + } + + def castTo[A: WeakTypeTag](future: Expr[Fut[Any]]): Expr[Fut[A]] = reify { + future.splice.asInstanceOf[Fut[A]] + } + } +} + /** * A trivial implementation of [[scala.async.FutureSystem]] that performs computations * on the current thread. Useful for testing. @@ -142,7 +197,7 @@ object IdentityFutureSystem extends FutureSystem { def future[A: WeakTypeTag](t: Expr[A])(execContext: Expr[ExecContext]) = t def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U], - execContext: Expr[ExecContext]): Expr[Unit] = reify { + execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit] = reify { fun.splice.apply(util.Success(future.splice)) context.literalUnit.splice } diff --git a/src/test/scala/scala/async/run/sequential/SequentialSpec.scala b/src/test/scala/scala/async/run/sequential/SequentialSpec.scala new file mode 100644 index 0000000..eec4d1e --- /dev/null +++ b/src/test/scala/scala/async/run/sequential/SequentialSpec.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2012 Typesafe Inc. + */ + +package scala.async +package run +package sequential + +import scala.concurrent.{Future, Promise, ExecutionContext, future, Await} +import scala.concurrent.duration._ +import scala.async.BlockingAsync._ + +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.junit.Test + +@RunWith(classOf[JUnit4]) +class SequentialSpec { + + import ExecutionContext.Implicits.global + + def m1(y: Int): Future[Int] = async { + val f = future { y + 2 } + val f2 = future { y + 3 } + val x1 = await(f) + val x2 = await(f2) + x1 + x2 + } + + def m2(y: Int): Future[Int] = async { + val f = future { y + 2 } + val res = await(f) + if (y > 0) res + 2 + else res - 2 + } + + @Test + def testSequentialExecution() { + val fut1 = m1(10) + val res1 = Await.result(fut1, 2.seconds) + assert(res1 == 25, s"expected 25, got $res1") + + val fut2 = m2(10) + val res2 = Await.result(fut2, 2.seconds) + assert(res2 == 14, s"expected 14, got $res2") + } + +} -- cgit v1.2.3