aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorBryan Cutler <cutlerb@gmail.com>2016-08-11 14:49:11 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-11 14:49:11 -0700
commit1c9a386c6b6812a3931f3fb0004249894a01f657 (patch)
tree5548fcaa17b7e6a72846c25600d6df21fb95b040 /core/src/test/scala
parentcf9367826c38e5f34ae69b409f5d09c55ed1d319 (diff)
downloadspark-1c9a386c6b6812a3931f3fb0004249894a01f657.tar.gz
spark-1c9a386c6b6812a3931f3fb0004249894a01f657.tar.bz2
spark-1c9a386c6b6812a3931f3fb0004249894a01f657.zip
[SPARK-13602][CORE] Add shutdown hook to DriverRunner to prevent driver process leak
## What changes were proposed in this pull request? Added shutdown hook to DriverRunner to kill the driver process in case the Worker JVM exits suddenly and the `WorkerWatcher` was unable to properly catch this. Did some cleanup to consolidate driver state management and setting of finalized vars within the running thread. ## How was this patch tested? Added unit tests to verify that final state and exception variables are set accordingly for successful runs, failures, and errors in the driver process. Retrofitted existing test to verify killing of mocked process ends with the correct state and stops properly. Manually tested (with deploy-mode=cluster) that the shutdown hook is called by forcibly exiting the `Worker` at various points in the code with the `WorkerWatcher` both disabled and enabled. Also, manually killed the driver through the UI and verified that the `DriverRunner` interrupted, killed the process and exited properly. Author: Bryan Cutler <cutlerb@gmail.com> Closes #11746 from BryanCutler/DriverRunner-shutdown-hook-SPARK-13602.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala73
1 files changed, 71 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index 2a1696be36..52956045d5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -19,13 +19,18 @@ package org.apache.spark.deploy.worker
import java.io.File
+import scala.concurrent.duration._
+
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Clock
class DriverRunnerTest extends SparkFunSuite {
@@ -33,8 +38,10 @@ class DriverRunnerTest extends SparkFunSuite {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
val conf = new SparkConf()
- new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
- driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf))
+ val worker = mock(classOf[RpcEndpointRef])
+ doNothing().when(worker).send(any())
+ spy(new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+ driverDescription, worker, "spark://1.2.3.4/worker/", new SecurityManager(conf)))
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
@@ -45,6 +52,19 @@ class DriverRunnerTest extends SparkFunSuite {
(processBuilder, process)
}
+ private def createTestableDriverRunner(
+ processBuilder: ProcessBuilderLike,
+ superviseRetry: Boolean) = {
+ val runner = createDriverRunner()
+ runner.setSleeper(mock(classOf[Sleeper]))
+ doAnswer(new Answer[Int] {
+ def answer(invocation: InvocationOnMock): Int = {
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = superviseRetry)
+ }
+ }).when(runner).prepareAndRunDriver()
+ runner
+ }
+
test("Process succeeds instantly") {
val runner = createDriverRunner()
@@ -145,4 +165,53 @@ class DriverRunnerTest extends SparkFunSuite {
verify(sleeper, times(2)).sleep(2)
}
+ test("Kill process finalized with state KILLED") {
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ val runner = createTestableDriverRunner(processBuilder, superviseRetry = true)
+
+ when(process.waitFor()).thenAnswer(new Answer[Int] {
+ def answer(invocation: InvocationOnMock): Int = {
+ runner.kill()
+ -1
+ }
+ })
+
+ runner.start()
+
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(runner.finalState.get === DriverState.KILLED)
+ }
+ verify(process, times(1)).waitFor()
+ }
+
+ test("Finalized with state FINISHED") {
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ val runner = createTestableDriverRunner(processBuilder, superviseRetry = true)
+ when(process.waitFor()).thenReturn(0)
+ runner.start()
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(runner.finalState.get === DriverState.FINISHED)
+ }
+ }
+
+ test("Finalized with state FAILED") {
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ val runner = createTestableDriverRunner(processBuilder, superviseRetry = false)
+ when(process.waitFor()).thenReturn(-1)
+ runner.start()
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(runner.finalState.get === DriverState.FAILED)
+ }
+ }
+
+ test("Handle exception starting process") {
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ val runner = createTestableDriverRunner(processBuilder, superviseRetry = false)
+ when(processBuilder.start()).thenThrow(new NullPointerException("bad command list"))
+ runner.start()
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(runner.finalState.get === DriverState.ERROR)
+ assert(runner.finalException.get.isInstanceOf[RuntimeException])
+ }
+ }
}