aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-12-19 18:43:59 -0800
committerYin Huai <yhuai@databricks.com>2016-12-19 18:43:59 -0800
commitfa829ce21fb84028d90b739a49c4ece70a17ccfd (patch)
tree1ac5a7e18d76a3dfb94209c361fcb48f1b13eba0 /core/src/test/scala/org
parent5857b9ac2d9808d9b89a5b29620b5052e2beebf5 (diff)
downloadspark-fa829ce21fb84028d90b739a49c4ece70a17ccfd.tar.gz
spark-fa829ce21fb84028d90b739a49c4ece70a17ccfd.tar.bz2
spark-fa829ce21fb84028d90b739a49c4ece70a17ccfd.zip
[SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors
## What changes were proposed in this pull request? Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks. This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks. This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details. ## How was this patch tested? Tested via a new test case in `JobCancellationSuite`, plus manual testing. Author: Josh Rosen <joshrosen@databricks.com> Closes #16189 from JoshRosen/cancellation.
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--core/src/test/scala/org/apache/spark/JobCancellationSuite.scala77
1 files changed, 77 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index a3490fc79e..99150a1430 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -209,6 +209,83 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(jobB.get() === 100)
}
+ test("task reaper kills JVM if killed tasks keep running for too long") {
+ val conf = new SparkConf()
+ .set("spark.task.reaper.enabled", "true")
+ .set("spark.task.reaper.killTimeout", "5s")
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+
+ // Add a listener to release the semaphore once any tasks are launched.
+ val sem = new Semaphore(0)
+ sc.addSparkListener(new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ sem.release()
+ }
+ })
+
+ // jobA is the one to be cancelled.
+ val jobA = Future {
+ sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+ sc.parallelize(1 to 10000, 2).map { i =>
+ while (true) { }
+ }.count()
+ }
+
+ // Block until both tasks of job A have started and cancel job A.
+ sem.acquire(2)
+ // Small delay to ensure tasks actually start executing the task body
+ Thread.sleep(1000)
+
+ sc.clearJobGroup()
+ val jobB = sc.parallelize(1 to 100, 2).countAsync()
+ sc.cancelJobGroup("jobA")
+ val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 15.seconds) }.getCause
+ assert(e.getMessage contains "cancel")
+
+ // Once A is cancelled, job B should finish fairly quickly.
+ assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+ }
+
+ test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
+ val conf = new SparkConf()
+ .set("spark.task.reaper.enabled", "true")
+ .set("spark.task.reaper.killTimeout", "-1")
+ .set("spark.task.reaper.PollingInterval", "1s")
+ .set("spark.deploy.maxExecutorRetries", "1")
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+
+ // Add a listener to release the semaphore once any tasks are launched.
+ val sem = new Semaphore(0)
+ sc.addSparkListener(new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ sem.release()
+ }
+ })
+
+ // jobA is the one to be cancelled.
+ val jobA = Future {
+ sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+ sc.parallelize(1 to 2, 2).map { i =>
+ val startTime = System.currentTimeMillis()
+ while (System.currentTimeMillis() < startTime + 10000) { }
+ }.count()
+ }
+
+ // Block until both tasks of job A have started and cancel job A.
+ sem.acquire(2)
+ // Small delay to ensure tasks actually start executing the task body
+ Thread.sleep(1000)
+
+ sc.clearJobGroup()
+ val jobB = sc.parallelize(1 to 100, 2).countAsync()
+ sc.cancelJobGroup("jobA")
+ val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 15.seconds) }.getCause
+ assert(e.getMessage contains "cancel")
+
+ // Once A is cancelled, job B should finish fairly quickly.
+ assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+ }
+
test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
// twoJobsSharingStageSemaphore: