diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-12-05 21:53:40 -0800 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-12-05 23:01:18 -0800 |
commit | ee888f6b251c4f06f2edf15267d12e42e28fd22f (patch) | |
tree | 3336b599221689cd01114f6ec470a7e5f3f25eb0 | |
parent | aebb123fd3b4bf0d57d867f33ca0325340ee42e4 (diff) | |
download | spark-ee888f6b251c4f06f2edf15267d12e42e28fd22f.tar.gz spark-ee888f6b251c4f06f2edf15267d12e42e28fd22f.tar.bz2 spark-ee888f6b251c4f06f2edf15267d12e42e28fd22f.zip |
FutureAction result tests
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index da032b17d9..0d4c10db8e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.rdd import java.util.concurrent.Semaphore +import scala.concurrent.{Await, TimeoutException} +import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global import org.scalatest.{BeforeAndAfterAll, FunSuite} @@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts sem.acquire(2) } } + + /** + * Awaiting FutureAction results + */ + test("FutureAction result, infinite wait") { + val f = sc.parallelize(1 to 100, 4) + .countAsync() + assert(Await.result(f, Duration.Inf) === 100) + } + + test("FutureAction result, finite wait") { + val f = sc.parallelize(1 to 100, 4) + .countAsync() + assert(Await.result(f, Duration(30, "seconds")) === 100) + } + + test("FutureAction result, timeout") { + val f = sc.parallelize(1 to 100, 4) + .mapPartitions(itr => { Thread.sleep(20); itr }) + .countAsync() + intercept[TimeoutException] { + Await.result(f, Duration(20, "milliseconds")) + } + } } |