aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-09 18:37:52 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-09 18:37:52 -0800
commitd86a85e9cac09bd909a356de9181bd282c905e72 (patch)
treea815ed779e70a0efc63c56ccc38e37e2254a530f /core/src/test/scala
parent26cdb5f68a83e904e3e9a114790c729ca2eb3040 (diff)
parent67b9a33628b9934804c36620d8cbc73ef70106ce (diff)
downloadspark-d86a85e9cac09bd909a356de9181bd282c905e72.tar.gz
spark-d86a85e9cac09bd909a356de9181bd282c905e72.tar.bz2
spark-d86a85e9cac09bd909a356de9181bd282c905e72.zip
Merge pull request #293 from pwendell/standalone-driver
SPARK-998: Support Launching Driver Inside of Standalone Mode [NOTE: I need to bring the tests up to date with new changes, so for now they will fail] This patch provides support for launching driver programs inside of a standalone cluster manager. It also supports monitoring and re-launching of driver programs which is useful for long running, recoverable applications such as Spark Streaming jobs. For those jobs, this patch allows a deployment mode which is resilient to the failure of any worker node, failure of a master node (provided a multi-master setup), and even failures of the applicaiton itself, provided they are recoverable on a restart. Driver information, such as the status and logs from a driver, is displayed in the UI There are a few small TODO's here, but the code is generally feature-complete. They are: - Bring tests up to date and add test coverage - Restarting on failure should be optional and maybe off by default. - See if we can re-use akka connections to facilitate clients behind a firewall A sensible place to start for review would be to look at the `DriverClient` class which presents users the ability to launch their driver program. I've also added an example program (`DriverSubmissionTest`) that allows you to test this locally and play around with killing workers, etc. Most of the code is devoted to persisting driver state in the cluster manger, exposing it in the UI, and dealing correctly with various types of failures. Instructions to test locally: - `sbt/sbt assembly/assembly examples/assembly` - start a local version of the standalone cluster manager ``` ./spark-class org.apache.spark.deploy.client.DriverClient \ -j -Dspark.test.property=something \ -e SPARK_TEST_KEY=SOMEVALUE \ launch spark://10.99.1.14:7077 \ ../path-to-examples-assembly-jar \ org.apache.spark.examples.DriverSubmissionTest 1000 some extra options --some-option-here -X 13 ``` - Go in the UI and make sure it started correctly, look at the output etc - Kill workers, the driver program, masters, etc.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala40
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala131
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala32
4 files changed, 195 insertions, 12 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 331fa3a642..d05bbd6ff7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,8 +25,8 @@ import net.liftweb.json.JsonAST.JValue
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo}
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
+import org.apache.spark.deploy.worker.{ExecutorRunner, DriverRunner}
class JsonProtocolSuite extends FunSuite {
test("writeApplicationInfo") {
@@ -50,11 +50,13 @@ class JsonProtocolSuite extends FunSuite {
}
test("writeMasterState") {
- val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
- val activeApps = Array[ApplicationInfo](createAppInfo())
+ val workers = Array(createWorkerInfo(), createWorkerInfo())
+ val activeApps = Array(createAppInfo())
val completedApps = Array[ApplicationInfo]()
+ val activeDrivers = Array(createDriverInfo())
+ val completedDrivers = Array(createDriverInfo())
val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
- RecoveryState.ALIVE)
+ activeDrivers, completedDrivers, RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
}
@@ -62,26 +64,44 @@ class JsonProtocolSuite extends FunSuite {
test("writeWorkerState") {
val executors = List[ExecutorRunner]()
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
+ val drivers = List(createDriverRunner())
+ val finishedDrivers = List(createDriverRunner(), createDriverRunner())
val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors,
- finishedExecutors, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
+ finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
val output = JsonProtocol.writeWorkerState(stateResponse)
assertValidJson(output)
}
- def createAppDesc() : ApplicationDescription = {
+ def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
}
+
def createAppInfo() : ApplicationInfo = {
new ApplicationInfo(
3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue)
}
- def createWorkerInfo() : WorkerInfo = {
+
+ def createDriverCommand() = new Command(
+ "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
+ Map(("K1", "V1"), ("K2", "V2"))
+ )
+
+ def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
+ false, createDriverCommand())
+
+ def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", createDriverDesc(), new Date())
+
+ def createWorkerInfo(): WorkerInfo = {
new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
}
- def createExecutorRunner() : ExecutorRunner = {
+ def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
- new File("sparkHome"), new File("workDir"), ExecutorState.RUNNING)
+ new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING)
+ }
+ def createDriverRunner(): DriverRunner = {
+ new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
+ null, "akka://worker")
}
def assertValidJson(json: JValue) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
new file mode 100644
index 0000000000..45dbcaffae
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -0,0 +1,131 @@
+package org.apache.spark.deploy.worker
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.scalatest.FunSuite
+
+import org.apache.spark.deploy.{Command, DriverDescription}
+import org.mockito.stubbing.Answer
+import org.mockito.invocation.InvocationOnMock
+
+class DriverRunnerTest extends FunSuite {
+ private def createDriverRunner() = {
+ val command = new Command("mainClass", Seq(), Map())
+ val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
+ new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
+ null, "akka://1.2.3.4/worker/")
+ }
+
+ private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
+ val processBuilder = mock(classOf[ProcessBuilderLike])
+ when(processBuilder.command).thenReturn(Seq("mocked", "command"))
+ val process = mock(classOf[Process])
+ when(processBuilder.start()).thenReturn(process)
+ (processBuilder, process)
+ }
+
+ test("Process succeeds instantly") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ // One failure then a successful run
+ when(process.waitFor()).thenReturn(0)
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(process, times(1)).waitFor()
+ verify(sleeper, times(0)).sleep(anyInt())
+ }
+
+ test("Process failing several times and then succeeding") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ // fail, fail, fail, success
+ when(process.waitFor()).thenReturn(-1).thenReturn(-1).thenReturn(-1).thenReturn(0)
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(process, times(4)).waitFor()
+ verify(sleeper, times(3)).sleep(anyInt())
+ verify(sleeper, times(1)).sleep(1)
+ verify(sleeper, times(1)).sleep(2)
+ verify(sleeper, times(1)).sleep(4)
+ }
+
+ test("Process doesn't restart if not supervised") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ when(process.waitFor()).thenReturn(-1)
+
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = false)
+
+ verify(process, times(1)).waitFor()
+ verify(sleeper, times(0)).sleep(anyInt())
+ }
+
+ test("Process doesn't restart if killed") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+ when(process.waitFor()).thenAnswer(new Answer[Int] {
+ def answer(invocation: InvocationOnMock): Int = {
+ runner.kill()
+ -1
+ }
+ })
+
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(process, times(1)).waitFor()
+ verify(sleeper, times(0)).sleep(anyInt())
+ }
+
+ test("Reset of backoff counter") {
+ val runner = createDriverRunner()
+
+ val sleeper = mock(classOf[Sleeper])
+ runner.setSleeper(sleeper)
+
+ val clock = mock(classOf[Clock])
+ runner.setClock(clock)
+
+ val (processBuilder, process) = createProcessBuilderAndProcess()
+
+ when(process.waitFor())
+ .thenReturn(-1) // fail 1
+ .thenReturn(-1) // fail 2
+ .thenReturn(-1) // fail 3
+ .thenReturn(-1) // fail 4
+ .thenReturn(0) // success
+ when(clock.currentTimeMillis())
+ .thenReturn(0).thenReturn(1000) // fail 1 (short)
+ .thenReturn(1000).thenReturn(2000) // fail 2 (short)
+ .thenReturn(2000).thenReturn(10000) // fail 3 (long)
+ .thenReturn(10000).thenReturn(11000) // fail 4 (short)
+ .thenReturn(11000).thenReturn(21000) // success (long)
+
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
+
+ verify(sleeper, times(4)).sleep(anyInt())
+ // Expected sequence of sleeps is 1,2,1,2
+ verify(sleeper, times(2)).sleep(1)
+ verify(sleeper, times(2)).sleep(2)
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index be93074b7b..a79ee690d3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -31,8 +31,8 @@ class ExecutorRunnerTest extends FunSuite {
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
- f("ooga"), ExecutorState.RUNNING)
+ f("ooga"), "blah", ExecutorState.RUNNING)
- assert(er.buildCommandSeq().last === appId)
+ assert(er.getCommandSeq.last === appId)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
new file mode 100644
index 0000000000..94d88d307a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -0,0 +1,32 @@
+package org.apache.spark.deploy.worker
+
+
+import akka.testkit.TestActorRef
+import org.scalatest.FunSuite
+import akka.remote.DisassociatedEvent
+import akka.actor.{ActorSystem, AddressFromURIString, Props}
+
+class WorkerWatcherSuite extends FunSuite {
+ test("WorkerWatcher shuts down on valid disassociation") {
+ val actorSystem = ActorSystem("test")
+ val targetWorkerUrl = "akka://1.2.3.4/user/Worker"
+ val targetWorkerAddress = AddressFromURIString(targetWorkerUrl)
+ val actorRef = TestActorRef[WorkerWatcher](Props(classOf[WorkerWatcher], targetWorkerUrl))(actorSystem)
+ val workerWatcher = actorRef.underlyingActor
+ workerWatcher.setTesting(testing = true)
+ actorRef.underlyingActor.receive(new DisassociatedEvent(null, targetWorkerAddress, false))
+ assert(actorRef.underlyingActor.isShutDown)
+ }
+
+ test("WorkerWatcher stays alive on invalid disassociation") {
+ val actorSystem = ActorSystem("test")
+ val targetWorkerUrl = "akka://1.2.3.4/user/Worker"
+ val otherAkkaURL = "akka://4.3.2.1/user/OtherActor"
+ val otherAkkaAddress = AddressFromURIString(otherAkkaURL)
+ val actorRef = TestActorRef[WorkerWatcher](Props(classOf[WorkerWatcher], targetWorkerUrl))(actorSystem)
+ val workerWatcher = actorRef.underlyingActor
+ workerWatcher.setTesting(testing = true)
+ actorRef.underlyingActor.receive(new DisassociatedEvent(null, otherAkkaAddress, false))
+ assert(!actorRef.underlyingActor.isShutDown)
+ }
+} \ No newline at end of file