aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2013-06-25 02:54:40 +0200
committerPhilipp Haller <hallerp@gmail.com>2013-06-25 02:55:26 +0200
commit295904a7aa53a5cfce96c2003d3f15eb36ba40c9 (patch)
tree444fcde60a1c388b2b41bb5a1dff8212602d3d46
parent6f6851c68659eae0d5d04ac9713413a3e592bd90 (diff)
downloadscala-async-topic/seq-debugging.tar.gz
scala-async-topic/seq-debugging.tar.bz2
scala-async-topic/seq-debugging.zip
Add BlockingAsync and BlockingFutureSystemtopic/seq-debugging
-rw-r--r--src/main/scala/scala/async/Async.scala2
-rw-r--r--src/main/scala/scala/async/BlockingAsync.scala21
-rw-r--r--src/main/scala/scala/async/ExprBuilder.scala6
-rw-r--r--src/main/scala/scala/async/FutureSystem.scala61
-rw-r--r--src/test/scala/scala/async/run/sequential/SequentialSpec.scala48
5 files changed, 131 insertions, 7 deletions
diff --git a/src/main/scala/scala/async/Async.scala b/src/main/scala/scala/async/Async.scala
index 2efebaf..c162edc 100644
--- a/src/main/scala/scala/async/Async.scala
+++ b/src/main/scala/scala/async/Async.scala
@@ -92,7 +92,7 @@ abstract class AsyncBase {
}.toMap
}
- val builder = ExprBuilder[c.type, futureSystem.type](c, self.futureSystem, anfTree)
+ val builder = ExprBuilder[c.type, futureSystem.type](c, self.futureSystem, anfTree, utils)
import builder.futureSystemOps
val asyncBlock: builder.AsyncBlock = builder.build(anfTree, renameMap)
import asyncBlock.asyncStates
diff --git a/src/main/scala/scala/async/BlockingAsync.scala b/src/main/scala/scala/async/BlockingAsync.scala
new file mode 100644
index 0000000..3deeb37
--- /dev/null
+++ b/src/main/scala/scala/async/BlockingAsync.scala
@@ -0,0 +1,21 @@
+/*
+ * Copyright (C) 2012 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.async
+
+import scala.language.experimental.macros
+import scala.reflect.macros.Context
+import scala.reflect.internal.annotations.compileTimeOnly
+
+object BlockingAsync extends AsyncBase {
+
+ import scala.concurrent.Future
+
+ lazy val futureSystem = BlockingFutureSystem
+ type FS = BlockingFutureSystem.type
+
+ def async[T](body: T) = macro asyncImpl[T]
+
+ override def asyncImpl[T: c.WeakTypeTag](c: Context)(body: c.Expr[T]): c.Expr[Future[T]] = super.asyncImpl[T](c)(body)
+}
diff --git a/src/main/scala/scala/async/ExprBuilder.scala b/src/main/scala/scala/async/ExprBuilder.scala
index ca46a83..d10b15d 100644
--- a/src/main/scala/scala/async/ExprBuilder.scala
+++ b/src/main/scala/scala/async/ExprBuilder.scala
@@ -8,10 +8,10 @@ import scala.collection.mutable.ListBuffer
import collection.mutable
import language.existentials
-private[async] final case class ExprBuilder[C <: Context, FS <: FutureSystem](c: C, futureSystem: FS, origTree: C#Tree) {
+private[async] final case class ExprBuilder[C <: Context, FS <: FutureSystem](c: C, futureSystem: FS, origTree: C#Tree, preUtils: TransformUtils[C]) {
builder =>
- val utils = TransformUtils[c.type](c)
+ val utils = preUtils.asInstanceOf[TransformUtils[c.type]]
import c.universe._
import utils._
@@ -68,7 +68,7 @@ private[async] final case class ExprBuilder[C <: Context, FS <: FutureSystem](c:
override def mkHandlerCaseForState: CaseDef = {
val callOnComplete = futureSystemOps.onComplete(c.Expr(awaitable.expr),
- c.Expr(This(tpnme.EMPTY)), c.Expr(Ident(name.execContext))).tree
+ c.Expr(This(tpnme.EMPTY)), c.Expr(Ident(name.execContext)), c.Expr(This(name.stateMachineT))).tree
mkHandlerCase(state, stats :+ callOnComplete)
}
diff --git a/src/main/scala/scala/async/FutureSystem.scala b/src/main/scala/scala/async/FutureSystem.scala
index a050bec..250d6ed 100644
--- a/src/main/scala/scala/async/FutureSystem.scala
+++ b/src/main/scala/scala/async/FutureSystem.scala
@@ -47,7 +47,7 @@ trait FutureSystem {
/** Register an call back to run on completion of the given future */
def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U],
- execContext: Expr[ExecContext]): Expr[Unit]
+ execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit]
/** Complete a promise with a value */
def completeProm[A](prom: Expr[Prom[A]], value: Expr[scala.util.Try[A]]): Expr[Unit]
@@ -95,7 +95,7 @@ object ScalaConcurrentFutureSystem extends FutureSystem {
}
def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U],
- execContext: Expr[ExecContext]): Expr[Unit] = reify {
+ execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit] = reify {
future.splice.onComplete(fun.splice)(execContext.splice)
}
@@ -110,6 +110,61 @@ object ScalaConcurrentFutureSystem extends FutureSystem {
}
}
+/** This future system implements `await` using blocking. Note that this
+ * future system should only be used for the purpose of debugging.
+ */
+object BlockingFutureSystem extends FutureSystem {
+
+ import scala.concurrent._
+ import scala.concurrent.duration._
+
+ type Prom[A] = Promise[A]
+ type Fut[A] = Future[A]
+ type ExecContext = ExecutionContext
+
+ def mkOps(c: Context): Ops {val context: c.type} = new Ops {
+ val context: c.type = c
+
+ import context.universe._
+
+ def execContext: Expr[ExecContext] = c.Expr(c.inferImplicitValue(c.weakTypeOf[ExecutionContext]) match {
+ case EmptyTree => c.abort(c.macroApplication.pos, "Unable to resolve implicit ExecutionContext")
+ case context => context
+ })
+
+ def promType[A: WeakTypeTag]: Type = c.weakTypeOf[Promise[A]]
+ def execContextType: Type = c.weakTypeOf[ExecutionContext]
+
+ def createProm[A: WeakTypeTag]: Expr[Prom[A]] = reify {
+ Promise[A]()
+ }
+
+ def promiseToFuture[A: WeakTypeTag](prom: Expr[Prom[A]]) = reify {
+ prom.splice.future
+ }
+
+ def future[A: WeakTypeTag](a: Expr[A])(execContext: Expr[ExecContext]) = reify {
+ Future(a.splice)(execContext.splice)
+ }
+
+ def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U],
+ execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit] = reify {
+ Await.ready(future.splice, Duration.Inf)
+ val tr = future.splice.value.get
+ stateMachine.splice.task$async = Some(() => fun.splice(tr))
+ }
+
+ def completeProm[A](prom: Expr[Prom[A]], value: Expr[scala.util.Try[A]]): Expr[Unit] = reify {
+ prom.splice.complete(value.splice)
+ context.literalUnit.splice
+ }
+
+ def castTo[A: WeakTypeTag](future: Expr[Fut[Any]]): Expr[Fut[A]] = reify {
+ future.splice.asInstanceOf[Fut[A]]
+ }
+ }
+}
+
/**
* A trivial implementation of [[scala.async.FutureSystem]] that performs computations
* on the current thread. Useful for testing.
@@ -142,7 +197,7 @@ object IdentityFutureSystem extends FutureSystem {
def future[A: WeakTypeTag](t: Expr[A])(execContext: Expr[ExecContext]) = t
def onComplete[A, U](future: Expr[Fut[A]], fun: Expr[scala.util.Try[A] => U],
- execContext: Expr[ExecContext]): Expr[Unit] = reify {
+ execContext: Expr[ExecContext], stateMachine: Expr[StateMachine[Prom[A], ExecContext]]): Expr[Unit] = reify {
fun.splice.apply(util.Success(future.splice))
context.literalUnit.splice
}
diff --git a/src/test/scala/scala/async/run/sequential/SequentialSpec.scala b/src/test/scala/scala/async/run/sequential/SequentialSpec.scala
new file mode 100644
index 0000000..eec4d1e
--- /dev/null
+++ b/src/test/scala/scala/async/run/sequential/SequentialSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2012 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.async
+package run
+package sequential
+
+import scala.concurrent.{Future, Promise, ExecutionContext, future, Await}
+import scala.concurrent.duration._
+import scala.async.BlockingAsync._
+
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.junit.Test
+
+@RunWith(classOf[JUnit4])
+class SequentialSpec {
+
+ import ExecutionContext.Implicits.global
+
+ def m1(y: Int): Future[Int] = async {
+ val f = future { y + 2 }
+ val f2 = future { y + 3 }
+ val x1 = await(f)
+ val x2 = await(f2)
+ x1 + x2
+ }
+
+ def m2(y: Int): Future[Int] = async {
+ val f = future { y + 2 }
+ val res = await(f)
+ if (y > 0) res + 2
+ else res - 2
+ }
+
+ @Test
+ def testSequentialExecution() {
+ val fut1 = m1(10)
+ val res1 = Await.result(fut1, 2.seconds)
+ assert(res1 == 25, s"expected 25, got $res1")
+
+ val fut2 = m2(10)
+ val res2 = Await.result(fut2, 2.seconds)
+ assert(res2 == 14, s"expected 14, got $res2")
+ }
+
+}