aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-09-11 15:02:59 -0700
committerAndrew Or <andrew@databricks.com>2015-09-11 15:02:59 -0700
commitd74c6a143cbd060c25bf14a8d306841b3ec55d03 (patch)
treeaa6d6999ca95f3d95435f5efa6f88ed4788acef1
parentc2af42b5f32287ff595ad027a8191d4b75702d8d (diff)
downloadspark-d74c6a143cbd060c25bf14a8d306841b3ec55d03.tar.gz
spark-d74c6a143cbd060c25bf14a8d306841b3ec55d03.tar.bz2
spark-d74c6a143cbd060c25bf14a8d306841b3ec55d03.zip
[SPARK-10564] ThreadingSuite: assertion failures in threads don't fail the test
This commit ensures if an assertion fails within a thread, it will ultimately fail the test. Otherwise we end up potentially masking real bugs by not propagating assertion failures properly. Author: Andrew Or <andrew@databricks.com> Closes #8723 from andrewor14/fix-threading-suite.
-rw-r--r--core/src/test/scala/org/apache/spark/ThreadingSuite.scala68
1 files changed, 45 insertions, 23 deletions
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 48509f0759..cda2b24552 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -119,23 +119,30 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
val nums = sc.parallelize(1 to 2, 2)
val sem = new Semaphore(0)
ThreadingSuiteState.clear()
+ var throwable: Option[Throwable] = None
for (i <- 0 until 2) {
new Thread {
override def run() {
- val ans = nums.map(number => {
- val running = ThreadingSuiteState.runningThreads
- running.getAndIncrement()
- val time = System.currentTimeMillis()
- while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
- Thread.sleep(100)
- }
- if (running.get() != 4) {
- ThreadingSuiteState.failed.set(true)
- }
- number
- }).collect()
- assert(ans.toList === List(1, 2))
- sem.release()
+ try {
+ val ans = nums.map(number => {
+ val running = ThreadingSuiteState.runningThreads
+ running.getAndIncrement()
+ val time = System.currentTimeMillis()
+ while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
+ Thread.sleep(100)
+ }
+ if (running.get() != 4) {
+ ThreadingSuiteState.failed.set(true)
+ }
+ number
+ }).collect()
+ assert(ans.toList === List(1, 2))
+ } catch {
+ case t: Throwable =>
+ throwable = Some(t)
+ } finally {
+ sem.release()
+ }
}
}.start()
}
@@ -145,18 +152,25 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
ThreadingSuiteState.runningThreads.get() + "); failing test")
fail("One or more threads didn't see runningThreads = 4")
}
+ throwable.foreach { t => throw t }
}
test("set local properties in different thread") {
sc = new SparkContext("local", "test")
val sem = new Semaphore(0)
-
+ var throwable: Option[Throwable] = None
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
- sc.setLocalProperty("test", i.toString)
- assert(sc.getLocalProperty("test") === i.toString)
- sem.release()
+ try {
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ } catch {
+ case t: Throwable =>
+ throwable = Some(t)
+ } finally {
+ sem.release()
+ }
}
}
}
@@ -165,20 +179,27 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
sem.acquire(5)
assert(sc.getLocalProperty("test") === null)
+ throwable.foreach { t => throw t }
}
test("set and get local properties in parent-children thread") {
sc = new SparkContext("local", "test")
sc.setLocalProperty("test", "parent")
val sem = new Semaphore(0)
-
+ var throwable: Option[Throwable] = None
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
- assert(sc.getLocalProperty("test") === "parent")
- sc.setLocalProperty("test", i.toString)
- assert(sc.getLocalProperty("test") === i.toString)
- sem.release()
+ try {
+ assert(sc.getLocalProperty("test") === "parent")
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ } catch {
+ case t: Throwable =>
+ throwable = Some(t)
+ } finally {
+ sem.release()
+ }
}
}
}
@@ -188,6 +209,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
sem.acquire(5)
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
+ throwable.foreach { t => throw t }
}
test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {