summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Zaugg <jzaugg@gmail.com>2013-09-21 14:41:12 -0700
committerJason Zaugg <jzaugg@gmail.com>2013-09-21 14:41:12 -0700
commit892aa93cf7b74f9bf1c3194930186b33f892fc02 (patch)
treef4b3268f6cb3d8246f8520571570ee87c022ab14
parente176a1005186f5d6da8ed5a79820d12ca7280e3b (diff)
parent7f4b44b612abdc62fba9810194cee17c7d0de37e (diff)
downloadscala-892aa93cf7b74f9bf1c3194930186b33f892fc02.tar.gz
scala-892aa93cf7b74f9bf1c3194930186b33f892fc02.tar.bz2
scala-892aa93cf7b74f9bf1c3194930186b33f892fc02.zip
Merge pull request #2969 from retronym/ticket/7861
SI-7861 Don't execute internal callbacks on the user Executor
-rw-r--r--src/library/scala/concurrent/Future.scala11
-rw-r--r--test/files/run/future-flatmap-exec-count.check6
-rw-r--r--test/files/run/future-flatmap-exec-count.scala61
3 files changed, 71 insertions, 7 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index caf91fbb0f..39946e4472 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -96,11 +96,8 @@ trait Future[+T] extends Awaitable[T] {
// of the Future trait. Note that this will
// (modulo bugs) _never_ execute a callback
// other than those below in this same file.
- // As a nice side benefit, having this implicit
- // here forces an ambiguity in those methods
- // that also have an executor parameter, which
- // keeps us from accidentally forgetting to use
- // the executor parameter.
+ //
+ // See the documentation on `InternalCallbackExecutor` for more details.
private def internalExecutor = Future.InternalCallbackExecutor
/* Callbacks */
@@ -254,7 +251,7 @@ trait Future[+T] extends Awaitable[T] {
case Success(v) => try f(v) match {
// If possible, link DefaultPromises to avoid space leaks
case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
- case fut => fut onComplete p.complete
+ case fut => fut.onComplete(p.complete)(internalExecutor)
} catch { case NonFatal(t) => p failure t }
}
p.future
@@ -344,7 +341,7 @@ trait Future[+T] extends Awaitable[T] {
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete {
- case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this) onComplete p.complete catch { case NonFatal(t) => p failure t }
+ case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this).onComplete(p.complete)(internalExecutor) catch { case NonFatal(t) => p failure t }
case other => p complete other
}
p.future
diff --git a/test/files/run/future-flatmap-exec-count.check b/test/files/run/future-flatmap-exec-count.check
new file mode 100644
index 0000000000..dd9dce64ed
--- /dev/null
+++ b/test/files/run/future-flatmap-exec-count.check
@@ -0,0 +1,6 @@
+mapping
+execute()
+flatmapping
+execute()
+recovering
+execute()
diff --git a/test/files/run/future-flatmap-exec-count.scala b/test/files/run/future-flatmap-exec-count.scala
new file mode 100644
index 0000000000..86c37be938
--- /dev/null
+++ b/test/files/run/future-flatmap-exec-count.scala
@@ -0,0 +1,61 @@
+import scala.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+
+object Test {
+ def main(args: Array[String]) {
+ test()
+ }
+
+ def test() = {
+ def await(f: Future[Any]) =
+ Await.result(f, duration.Duration.Inf)
+
+ val ec = new TestExecutionContext(ExecutionContext.Implicits.global)
+
+ {
+ val p = Promise[Int]()
+ val fp = p.future
+ println("mapping")
+ val mapped = fp.map(x => x)(ec)
+ p.success(0)
+ await(mapped)
+ }
+
+ {
+ println("flatmapping")
+ val p = Promise[Int]()
+ val fp = p.future
+ val flatMapped = fp.flatMap({ (x: Int) =>
+ Future.successful(2 * x)
+ })(ec)
+ p.success(0)
+ await(flatMapped)
+ }
+
+ {
+ println("recovering")
+ val recovered = Future.failed(new Throwable()).recoverWith {
+ case _ => Future.successful(2)
+ }(ec)
+ await(recovered)
+ }
+ }
+
+ class TestExecutionContext(delegate: ExecutionContext) extends ExecutionContext {
+ def execute(runnable: Runnable): Unit = ???
+
+ def reportFailure(t: Throwable): Unit = ???
+
+ override def prepare(): ExecutionContext = {
+ val preparedDelegate = delegate.prepare()
+ return new ExecutionContext {
+ def execute(runnable: Runnable): Unit = {
+ println("execute()")
+ preparedDelegate.execute(runnable)
+ }
+
+ def reportFailure(t: Throwable): Unit = ???
+ }
+ }
+ }
+}