author    Imran Rashid <irashid@cloudera.com>  2016-06-22 08:35:41 -0500
committer Imran Rashid <irashid@cloudera.com>  2016-06-22 08:35:41 -0500
commit    cf1995a97645f0b44c997f4fdbba631fd6b91a16 (patch)
tree      7b5ef038baa09c960ac60a058b57f9884a01c51a /core
parent    01277d4b259dcf9cad25eece1377162b7a8c946d (diff)
[SPARK-15783][CORE] Fix Flakiness in BlacklistIntegrationSuite
## What changes were proposed in this pull request?

Five changes here -- the first two were causing failures in BlacklistIntegrationSuite:

1. The testing framework didn't include the reviveOffers thread, so the test that involved delay scheduling might never submit offers late enough for delay scheduling to kick in. Added periodic offer revival, just like the real scheduler.
2. `assertDataStructuresEmpty` would occasionally fail because there still appeared to be an active job. In DAGScheduler, the jobWaiter is notified of the job completion before the data structures are cleaned up. Most of the time the test code waiting on the jobWaiter doesn't become active until after the data structures are cleared, but occasionally the race goes the other way and the assertions fail.
3. `DAGSchedulerSuite` was not stopping all the inner parts it was setting up, so each test leaked a number of threads. Those parts are now stopped too.
4. `assertMapOutputAvailable` turns out not to be terribly useful in this framework -- most of the places it was used suffer from some race.
5. When there is an exception in the backend, the error message is improved a bit. Before, the exception was printed to the console, but the test would fail with a timeout and the logs wouldn't show anything.

## How was this patch tested?

I ran all the tests in `BlacklistIntegrationSuite` 5k times and everything in `DAGSchedulerSuite` 1k times on my laptop. I also ran a full Jenkins build with `BlacklistIntegrationSuite` 500 times and `DAGSchedulerSuite` 50 times; see https://github.com/apache/spark/pull/13548. (I tried more times, but Jenkins timed out.) To check for more leaked threads, I added some code to dump the list of all threads at the end of each test in `DAGSchedulerSuite`, which is how I discovered the mapOutputTracker and eventLoop were leaking threads. (That code was removed from the final PR; it was just part of the testing.) I'll also run Jenkins on this a couple more times as one more check.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13565 from squito/blacklist_extra_tests.
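The second change is the subtle one, so here is a minimal, self-contained sketch of the ordering problem (nothing in it is Spark code; the object, method, and field names are invented for illustration and only mirror the real scheduler's state):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.mutable

// Stripped-down model of a job-failure path, to show why cleanup must happen
// before the waiter is notified.
object JobFailureOrderingSketch {

  private val activeJobs = mutable.Set[Int]()  // state the test asserts against
  private val jobDone = new CountDownLatch(1)  // stands in for the JobWaiter

  // Old order: the waiter can wake up and assert before cleanup has run.
  def failJobOldOrder(jobId: Int): Unit = {
    jobDone.countDown()
    activeJobs -= jobId
  }

  // New order (SPARK-15783): clean up first, then notify the waiter, so
  // anything waiting on the job only ever observes empty data structures.
  def failJobNewOrder(jobId: Int): Unit = {
    activeJobs -= jobId
    jobDone.countDown()
  }

  def main(args: Array[String]): Unit = {
    activeJobs += 1
    // A "test" thread that waits for completion and then checks state, much
    // like assertDataStructuresEmpty does in the integration suite.
    val tester = new Thread(new Runnable {
      override def run(): Unit = {
        jobDone.await(10, TimeUnit.SECONDS)
        // With failJobOldOrder this assert can fire in the window between the
        // notification and the cleanup -- exactly the flakiness described above.
        assert(activeJobs.isEmpty, s"job(s) still active: $activeJobs")
      }
    })
    tester.start()
    failJobNewOrder(1)
    tester.join()
  }
}
```

Moving the cleanup ahead of the notification closes that window, which is all the two-line reordering in DAGScheduler below does. A second sketch, after the diffstat, shows the kind of thread-dump check mentioned in the testing notes.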
Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala              |  4
 core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala | 16
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala         |  9
 core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 73
 4 files changed, 76 insertions(+), 26 deletions(-)
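The testing notes mention that the leaked mapOutputTracker and eventLoop threads were found by temporarily dumping all live threads after each test, code that was removed before merging. Purely as an illustration of that technique (the trait name and the plain `println` are assumptions, not part of the patch), such a check could look roughly like this:

```scala
import scala.collection.JavaConverters._

import org.scalatest.{BeforeAndAfterEach, Suite}

// Rough sketch only: print the names of all live JVM threads after each test.
// Any thread that keeps accumulating from one test to the next (for example a
// tracker or event-loop thread that was never stopped) is a candidate leak.
trait ThreadDumpAfterEach extends BeforeAndAfterEach { self: Suite =>

  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      val names =
        Thread.getAllStackTraces.keySet.asScala.map(_.getName).toSeq.sorted
      println(s"[$suiteName] live threads: ${names.mkString(", ")}")
    }
  }
}
```

Mixed into a suite during a local run, output like this is what points at the tracker, broadcast-manager, and event-loop threads that the extra stop() calls added to DAGSchedulerSuite's afterEach (below) now shut down.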
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d4e0d6db0c..4eb7c81f9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1465,8 +1465,10 @@ class DAGScheduler(
}
if (ableToCancelStages) {
- job.listener.jobFailed(error)
+ // SPARK-15783 important to cleanup state first, just for tests where we have some asserts
+ // against the state. Otherwise we have a *little* bit of flakiness in the tests.
cleanupStateForJobAndIndependentStages(job)
+ job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index d8a4b1921a..8ba2697dd9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark._
class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{
val badHost = "host-0"
+ val duration = Duration(10, SECONDS)
/**
* This backend just always fails if the task is executed on a bad host, but otherwise succeeds
@@ -41,12 +42,11 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
// Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
// according to locality preferences, and so the job fails
- ignore("If preferred node is bad, without blacklist job will fail") {
+ testScheduler("If preferred node is bad, without blacklist job will fail") {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
- val duration = Duration(1, SECONDS)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
}
@@ -54,7 +54,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
// even with the blacklist turned on, if maxTaskFailures is not more than the number
// of executors on the bad node, then locality preferences will lead to us cycling through
// the executors on the bad node, and still failing the job
- ignoreScheduler(
+ testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
// just set this to something much longer than the test duration
@@ -64,15 +64,14 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
- val duration = Duration(3, SECONDS)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
}
// Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
// schedule on a good node and succeed the job
- ignoreScheduler(
+ testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
// just set this to something much longer than the test duration
@@ -86,8 +85,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
- val duration = Duration(1, SECONDS)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty(noFailure = true)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 63a494006c..33824749ae 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -214,7 +214,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
results.clear()
securityMgr = new SecurityManager(conf)
broadcastManager = new BroadcastManager(true, conf, securityMgr)
- mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
+ mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
+ override def sendTracker(message: Any): Unit = {
+ // no-op, just so we can stop this to avoid leaking threads
+ }
+ }
scheduler = new DAGScheduler(
sc,
taskScheduler,
@@ -228,6 +232,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def afterEach(): Unit = {
try {
scheduler.stop()
+ dagEventProcessLoopTester.stop()
+ mapOutputTracker.stop()
+ broadcastManager.stop()
} finally {
super.afterEach()
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 12dfa56626..634b94f4c3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.scheduler
import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
@@ -32,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
/**
* Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
@@ -55,6 +56,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
results.clear()
failure = null
+ backendException.set(null)
super.beforeEach()
}
@@ -90,11 +92,6 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}
- // still a few races to work out in the blacklist tests, so ignore some tests
- def ignoreScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
- ignore(name)(testBody)
- }
-
/**
* A map from partition -> results for all tasks of a job when you call this test framework's
* [[submit]] method. Two important considerations:
@@ -167,6 +164,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
assert(failure != null)
}
assert(scheduler.activeJobs.isEmpty)
+ assert(backendException.get() == null)
}
/**
@@ -204,6 +202,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
new MockRDD(sc, nParts, shuffleDeps)
}
+ val backendException = new AtomicReference[Exception](null)
+
/**
* Helper which makes it a little easier to setup a test, which starts a mock backend in another
* thread, responding to tasks with your custom function. You also supply the "body" of your
@@ -218,7 +218,17 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
override def run(): Unit = {
while (backendContinue.get()) {
if (backend.hasTasksWaitingToRun) {
- backendFunc()
+ try {
+ backendFunc()
+ } catch {
+ case ex: Exception =>
+ // Try to do a little error handling around exceptions that might occur here --
+ // otherwise it can just look like a TimeoutException in the test itself.
+ logError("Exception in mock backend:", ex)
+ backendException.set(ex)
+ backendContinue.set(false)
+ throw ex
+ }
} else {
Thread.sleep(10)
}
@@ -234,6 +244,25 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}
+ /**
+ * Helper to do a little extra error checking while waiting for the job to terminate. Primarily
+ * just does a little extra error handling if there is an exception from the backend.
+ */
+ def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
+ try {
+ Await.ready(jobFuture, duration)
+ } catch {
+ case te: TimeoutException if backendException.get() != null =>
+ val msg = raw"""
+ | ----- Begin Backend Failure Msg -----
+ | ${Utils.exceptionString(backendException.get())}
+ | ----- End Backend Failure Msg ----
+ """.
+ stripMargin
+
+ fail(s"Future timed out after ${duration}, likely because of failure in backend: $msg")
+ }
+ }
}
/**
@@ -245,6 +274,17 @@ private[spark] abstract class MockBackend(
conf: SparkConf,
val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging {
+ // Periodically revive offers to allow delay scheduling to work
+ private val reviveThread =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
+ private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")
+
+ reviveThread.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ reviveOffers()
+ }
+ }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
+
/**
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
@@ -310,7 +350,9 @@ private[spark] abstract class MockBackend(
override def start(): Unit = {}
- override def stop(): Unit = {}
+ override def stop(): Unit = {
+ reviveThread.shutdown()
+ }
val env = SparkEnv.get
@@ -334,8 +376,9 @@ private[spark] abstract class MockBackend(
}
/**
- * This is called by the scheduler whenever it has tasks it would like to schedule. It gets
- * called in the scheduling thread, not the backend thread.
+ * This is called by the scheduler whenever it has tasks it would like to schedule, when a task
+ * completes (which will be in a result-getter thread), and by the reviveOffers thread for delay
+ * scheduling.
*/
override def reviveOffers(): Unit = {
val offers: Seq[WorkerOffer] = generateOffers()
@@ -484,7 +527,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val duration = Duration(1, SECONDS)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty()
@@ -536,7 +579,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(d, (0 until 30).toArray)
val duration = Duration(1, SECONDS)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap)
assertDataStructuresEmpty()
@@ -576,7 +619,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(shuffledRdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
@@ -591,7 +634,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val duration = Duration(1, SECONDS)
- Await.ready(jobFuture, duration)
+ awaitJobTermination(jobFuture, duration)
failure.getMessage.contains("test task failure")
}
assertDataStructuresEmpty(noFailure = false)