aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala131
1 files changed, 131 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
new file mode 100644
index 0000000000..45dbcaffae
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -0,0 +1,131 @@
+package org.apache.spark.deploy.worker
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.scalatest.FunSuite
+
+import org.apache.spark.deploy.{Command, DriverDescription}
+import org.mockito.stubbing.Answer
+import org.mockito.invocation.InvocationOnMock
+
+class DriverRunnerTest extends FunSuite {
+ private def createDriverRunner() = {
+ val command = new Command("mainClass", Seq(), Map())
+ val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
+ new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
+ null, "akka://1.2.3.4/worker/")
+ }
+
+ private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
+ val processBuilder = mock(classOf[ProcessBuilderLike])
+ when(processBuilder.command).thenReturn(Seq("mocked", "command"))
+ val process = mock(classOf[Process])
+ when(processBuilder.start()).thenReturn(process)
+ (processBuilder, process)
+ }
+
+ test("Process succeeds instantly") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ // One failure then a successful run
+ when(process.waitFor()).thenReturn(0)
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(process, times(1)).waitFor()
+ verify(sleeper, times(0)).sleep(anyInt())
+ }
+
+ test("Process failing several times and then succeeding") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ // fail, fail, fail, success
+ when(process.waitFor()).thenReturn(-1).thenReturn(-1).thenReturn(-1).thenReturn(0)
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(process, times(4)).waitFor()
+ verify(sleeper, times(3)).sleep(anyInt())
+ verify(sleeper, times(1)).sleep(1)
+ verify(sleeper, times(1)).sleep(2)
+ verify(sleeper, times(1)).sleep(4)
+ }
+
+ test("Process doesn't restart if not supervised") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ when(process.waitFor()).thenReturn(-1)
+
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = false)
+
+ verify(process, times(1)).waitFor()
+ verify(sleeper, times(0)).sleep(anyInt())
+ }
+
+ test("Process doesn't restart if killed") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ when(process.waitFor()).thenAnswer(new Answer[Int] {
+ def answer(invocation: InvocationOnMock): Int = {
+ runner.kill()
+ -1
+ }
+ })
+
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(process, times(1)).waitFor()
+ verify(sleeper, times(0)).sleep(anyInt())
+ }
+
+ test("Reset of backoff counter") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val clock = mock(classOf[Clock])
+ runner.setClock(clock)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+
+ when(process.waitFor())
+ .thenReturn(-1) // fail 1
+ .thenReturn(-1) // fail 2
+ .thenReturn(-1) // fail 3
+ .thenReturn(-1) // fail 4
+ .thenReturn(0) // success
+ when(clock.currentTimeMillis())
+ .thenReturn(0).thenReturn(1000) // fail 1 (short)
+ .thenReturn(1000).thenReturn(2000) // fail 2 (short)
+ .thenReturn(2000).thenReturn(10000) // fail 3 (long)
+ .thenReturn(10000).thenReturn(11000) // fail 4 (short)
+ .thenReturn(11000).thenReturn(21000) // success (long)
+
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(sleeper, times(4)).sleep(anyInt())
+ // Expected sequence of sleeps is 1,2,1,2
+ verify(sleeper, times(2)).sleep(1)
+ verify(sleeper, times(2)).sleep(2)
+ }
+
+}