aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-09-07 14:09:12 -0700
committerDenny <dennybritz@gmail.com>2012-09-10 12:48:58 -0700
commitf2ac55840c8b56d5ab677b6b5d37458c7ddc83a9 (patch)
treeb2d1244cb94c704a56fd790875cde6e0ce0c5463 /core
parent9ead8ab14e821046714217f24c4c476bd913f605 (diff)
downloadspark-f2ac55840c8b56d5ab677b6b5d37458c7ddc83a9.tar.gz
spark-f2ac55840c8b56d5ab677b6b5d37458c7ddc83a9.tar.bz2
spark-f2ac55840c8b56d5ab677b6b5d37458c7ddc83a9.zip
Add shutdown hook to Executor Runner and execute code to shutdown local cluster in Scheduler Backend
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/SparkContext.scala3
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala23
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala13
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala4
4 files changed, 29 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index a3a1000d30..d7bd832e52 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -110,6 +110,9 @@ class SparkContext(
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
+ backend.shutdownHook = (backend: SparkDeploySchedulerBackend) => {
+ localCluster.stop()
+ }
scheduler
case _ =>
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 6723523d5b..da74df4dcf 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -16,9 +16,11 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
val localIpAddress = Utils.localIpAddress
var masterActor : ActorRef = _
+ var masterActorSystem : ActorSystem = _
var masterPort : Int = _
var masterUrl : String = _
+ val slaveActorSystems = ArrayBuffer[ActorSystem]()
val slaveActors = ArrayBuffer[ActorRef]()
def start() : String = {
@@ -41,6 +43,7 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
(1 to numSlaves).foreach { slaveNum =>
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+ slaveActorSystems += actorSystem
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
@@ -52,22 +55,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
})
}
-
- // Shutdown hook that kills actors on shutdown.
- Runtime.getRuntime.addShutdownHook(
- new Thread() {
- override def run() {
- masterActorSystem.stop(masterActor)
- masterActorSystem.shutdown()
- // Since above is asynchronous wait for the master actor to shut down
- while(!masterActor.isTerminated) {
- Thread.sleep(10)
- }
- }
- })
-
return masterUrl
}
+
+ def stop() {
+ logInfo("Shutting down local Spark cluster.")
+ masterActorSystem.shutdown()
+ slaveActorSystems.foreach(_.shutdown())
+ }
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 7ad216f90e..393f4a3ee6 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -35,6 +35,19 @@ class ExecutorRunner(
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
+
+ // Shutdown hook that kills actors on shutdown.
+ Runtime.getRuntime.addShutdownHook(
+ new Thread() {
+ override def run() {
+ if(process != null) {
+ logInfo("Shutdown Hook killing process.")
+ process.destroy()
+ process.waitFor()
+ }
+ }
+ })
+
}
/** Stop this executor runner, including killing the process it launched */
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 0bd2d15479..ec3ff38d5c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -16,6 +16,7 @@ class SparkDeploySchedulerBackend(
var client: Client = null
var stopping = false
+ var shutdownHook : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
@@ -61,6 +62,9 @@ class SparkDeploySchedulerBackend(
stopping = true;
super.stop()
client.stop()
+ if (shutdownHook != null) {
+ shutdownHook(this)
+ }
}
def connected(jobId: String) {