aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-09-14 15:09:43 -0700
committerAndrew Or <andrew@databricks.com>2015-09-14 15:09:43 -0700
commit7b6c856367b9c36348e80e83959150da9656c4dd (patch)
tree6b0171d7707e907c610f8570959cdb37ba523d92 /core
parentfd1e8cddf2635c55fec2ac6e1f1c221c9685af0f (diff)
downloadspark-7b6c856367b9c36348e80e83959150da9656c4dd.tar.gz
spark-7b6c856367b9c36348e80e83959150da9656c4dd.tar.bz2
spark-7b6c856367b9c36348e80e83959150da9656c4dd.zip
[SPARK-10564] ThreadingSuite: assertion failures in threads don't fail the test (round 2)
This is a follow-up patch to #8723. I missed one case there. Author: Andrew Or <andrew@databricks.com> Closes #8727 from andrewor14/fix-threading-suite.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/ThreadingSuite.scala23
1 files changed, 15 insertions, 8 deletions
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index cda2b24552..a96a4ce201 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -147,12 +147,12 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
}.start()
}
sem.acquire(2)
+ throwable.foreach { t => throw t }
if (ThreadingSuiteState.failed.get()) {
logError("Waited 1 second without seeing runningThreads = 4 (it was " +
ThreadingSuiteState.runningThreads.get() + "); failing test")
fail("One or more threads didn't see runningThreads = 4")
}
- throwable.foreach { t => throw t }
}
test("set local properties in different thread") {
@@ -178,8 +178,8 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())
sem.acquire(5)
- assert(sc.getLocalProperty("test") === null)
throwable.foreach { t => throw t }
+ assert(sc.getLocalProperty("test") === null)
}
test("set and get local properties in parent-children thread") {
@@ -207,15 +207,16 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())
sem.acquire(5)
+ throwable.foreach { t => throw t }
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
- throwable.foreach { t => throw t }
}
test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
val jobStarted = new Semaphore(0)
val jobEnded = new Semaphore(0)
@volatile var jobResult: JobResult = null
+ var throwable: Option[Throwable] = None
sc = new SparkContext("local", "test")
sc.setJobGroup("originalJobGroupId", "description")
@@ -232,14 +233,19 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
// Create a new thread which will inherit the current thread's properties
val thread = new Thread() {
override def run(): Unit = {
- assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
- // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
try {
- sc.parallelize(1 to 100).foreach { x =>
- Thread.sleep(100)
+ assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
+ // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
+ try {
+ sc.parallelize(1 to 100).foreach { x =>
+ Thread.sleep(100)
+ }
+ } catch {
+ case s: SparkException => // ignored so that we don't print noise in test logs
}
} catch {
- case s: SparkException => // ignored so that we don't print noise in test logs
+ case t: Throwable =>
+ throwable = Some(t)
}
}
}
@@ -252,6 +258,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
// modification of the properties object should not affect the properties of running jobs
sc.cancelJobGroup("originalJobGroupId")
jobEnded.tryAcquire(10, TimeUnit.SECONDS)
+ throwable.foreach { t => throw t }
assert(jobResult.isInstanceOf[JobFailed])
}
}