diff options
author | Bryan Cutler <cutlerb@gmail.com> | 2016-08-11 14:49:11 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-08-11 14:49:11 -0700 |
commit | 1c9a386c6b6812a3931f3fb0004249894a01f657 (patch) | |
tree | 5548fcaa17b7e6a72846c25600d6df21fb95b040 /core/src/test/scala | |
parent | cf9367826c38e5f34ae69b409f5d09c55ed1d319 (diff) | |
download | spark-1c9a386c6b6812a3931f3fb0004249894a01f657.tar.gz spark-1c9a386c6b6812a3931f3fb0004249894a01f657.tar.bz2 spark-1c9a386c6b6812a3931f3fb0004249894a01f657.zip |
[SPARK-13602][CORE] Add shutdown hook to DriverRunner to prevent driver process leak
## What changes were proposed in this pull request?
Added shutdown hook to DriverRunner to kill the driver process in case the Worker JVM exits suddenly and the `WorkerWatcher` was unable to properly catch this. Did some cleanup to consolidate driver state management and setting of finalized vars within the running thread.
## How was this patch tested?
Added unit tests to verify that the final state and exception variables are set accordingly for successful runs, failures, and errors in the driver process. Retrofitted an existing test to verify that killing the mocked process ends with the correct state and stops properly.
Manually tested (with deploy-mode=cluster) that the shutdown hook is called by forcibly exiting the `Worker` at various points in the code, with the `WorkerWatcher` both disabled and enabled. Also manually killed the driver through the UI and verified that the `DriverRunner` interrupted, killed the process, and exited properly.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes #11746 from BryanCutler/DriverRunner-shutdown-hook-SPARK-13602.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala | 73 |
1 file changed, 71 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index 2a1696be36..52956045d5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -19,13 +19,18 @@ package org.apache.spark.deploy.worker import java.io.File +import scala.concurrent.duration._ + import org.mockito.Matchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.{Command, DriverDescription} +import org.apache.spark.deploy.master.DriverState +import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Clock class DriverRunnerTest extends SparkFunSuite { @@ -33,8 +38,10 @@ class DriverRunnerTest extends SparkFunSuite { val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq()) val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command) val conf = new SparkConf() - new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"), - driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf)) + val worker = mock(classOf[RpcEndpointRef]) + doNothing().when(worker).send(any()) + spy(new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"), + driverDescription, worker, "spark://1.2.3.4/worker/", new SecurityManager(conf))) } private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = { @@ -45,6 +52,19 @@ class DriverRunnerTest extends SparkFunSuite { (processBuilder, process) } + private def createTestableDriverRunner( + processBuilder: ProcessBuilderLike, + superviseRetry: Boolean) = { + val runner = createDriverRunner() + 
runner.setSleeper(mock(classOf[Sleeper])) + doAnswer(new Answer[Int] { + def answer(invocation: InvocationOnMock): Int = { + runner.runCommandWithRetry(processBuilder, p => (), supervise = superviseRetry) + } + }).when(runner).prepareAndRunDriver() + runner + } + test("Process succeeds instantly") { val runner = createDriverRunner() @@ -145,4 +165,53 @@ class DriverRunnerTest extends SparkFunSuite { verify(sleeper, times(2)).sleep(2) } + test("Kill process finalized with state KILLED") { + val (processBuilder, process) = createProcessBuilderAndProcess() + val runner = createTestableDriverRunner(processBuilder, superviseRetry = true) + + when(process.waitFor()).thenAnswer(new Answer[Int] { + def answer(invocation: InvocationOnMock): Int = { + runner.kill() + -1 + } + }) + + runner.start() + + eventually(timeout(10.seconds), interval(100.millis)) { + assert(runner.finalState.get === DriverState.KILLED) + } + verify(process, times(1)).waitFor() + } + + test("Finalized with state FINISHED") { + val (processBuilder, process) = createProcessBuilderAndProcess() + val runner = createTestableDriverRunner(processBuilder, superviseRetry = true) + when(process.waitFor()).thenReturn(0) + runner.start() + eventually(timeout(10.seconds), interval(100.millis)) { + assert(runner.finalState.get === DriverState.FINISHED) + } + } + + test("Finalized with state FAILED") { + val (processBuilder, process) = createProcessBuilderAndProcess() + val runner = createTestableDriverRunner(processBuilder, superviseRetry = false) + when(process.waitFor()).thenReturn(-1) + runner.start() + eventually(timeout(10.seconds), interval(100.millis)) { + assert(runner.finalState.get === DriverState.FAILED) + } + } + + test("Handle exception starting process") { + val (processBuilder, process) = createProcessBuilderAndProcess() + val runner = createTestableDriverRunner(processBuilder, superviseRetry = false) + when(processBuilder.start()).thenThrow(new NullPointerException("bad command list")) + 
runner.start() + eventually(timeout(10.seconds), interval(100.millis)) { + assert(runner.finalState.get === DriverState.ERROR) + assert(runner.finalException.get.isInstanceOf[RuntimeException]) + } + } } |