diff options
author | Denny <dennybritz@gmail.com> | 2012-09-07 14:09:12 -0700 |
---|---|---|
committer | Denny <dennybritz@gmail.com> | 2012-09-10 12:48:58 -0700 |
commit | f2ac55840c8b56d5ab677b6b5d37458c7ddc83a9 (patch) | |
tree | b2d1244cb94c704a56fd790875cde6e0ce0c5463 /core | |
parent | 9ead8ab14e821046714217f24c4c476bd913f605 (diff) | |
download | spark-f2ac55840c8b56d5ab677b6b5d37458c7ddc83a9.tar.gz spark-f2ac55840c8b56d5ab677b6b5d37458c7ddc83a9.tar.bz2 spark-f2ac55840c8b56d5ab677b6b5d37458c7ddc83a9.zip |
Add shutdown hook to Executor Runner and execute code to shut down the local cluster in Scheduler Backend
Diffstat (limited to 'core')
4 files changed, 29 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index a3a1000d30..d7bd832e52 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -110,6 +110,9 @@ class SparkContext( val sparkUrl = localCluster.start() val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) scheduler.initialize(backend) + backend.shutdownHook = (backend: SparkDeploySchedulerBackend) => { + localCluster.stop() + } scheduler case _ => diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 6723523d5b..da74df4dcf 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -16,9 +16,11 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, val localIpAddress = Utils.localIpAddress var masterActor : ActorRef = _ + var masterActorSystem : ActorSystem = _ var masterPort : Int = _ var masterUrl : String = _ + val slaveActorSystems = ArrayBuffer[ActorSystem]() val slaveActors = ArrayBuffer[ActorRef]() def start() : String = { @@ -41,6 +43,7 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, (1 to numSlaves).foreach { slaveNum => val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0) + slaveActorSystems += actorSystem threadPool.execute(new Runnable { def run() { val actor = actorSystem.actorOf( @@ -52,22 +55,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, }) } - - // Shutdown hook that kills actors on shutdown. 
- Runtime.getRuntime.addShutdownHook( - new Thread() { - override def run() { - masterActorSystem.stop(masterActor) - masterActorSystem.shutdown() - // Since above is asynchronous wait for the master actor to shut down - while(!masterActor.isTerminated) { - Thread.sleep(10) - } - } - }) - return masterUrl } + + def stop() { + logInfo("Shutting down local Spark cluster.") + masterActorSystem.shutdown() + slaveActorSystems.foreach(_.shutdown()) + } }
\ No newline at end of file diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 7ad216f90e..393f4a3ee6 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -35,6 +35,19 @@ class ExecutorRunner( override def run() { fetchAndRunExecutor() } } workerThread.start() + + // Shutdown hook that kills actors on shutdown. + Runtime.getRuntime.addShutdownHook( + new Thread() { + override def run() { + if(process != null) { + logInfo("Shutdown Hook killing process.") + process.destroy() + process.waitFor() + } + } + }) + } /** Stop this executor runner, including killing the process it launched */ diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 0bd2d15479..ec3ff38d5c 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -16,6 +16,7 @@ class SparkDeploySchedulerBackend( var client: Client = null var stopping = false + var shutdownHook : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt @@ -61,6 +62,9 @@ class SparkDeploySchedulerBackend( stopping = true; super.stop() client.stop() + if (shutdownHook != null) { + shutdownHook(this) + } } def connected(jobId: String) { |