aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-09-07 11:39:44 -0700
committerDenny <dennybritz@gmail.com>2012-09-10 12:48:58 -0700
commit9ead8ab14e821046714217f24c4c476bd913f605 (patch)
tree50ef2ce67a2d129a28485f2bb7db937dcfc1ecd3
parent8bb3c739774e3ab79588a83aad6d2cf0f2e12857 (diff)
downloadspark-9ead8ab14e821046714217f24c4c476bd913f605.tar.gz
spark-9ead8ab14e821046714217f24c4c476bd913f605.tar.bz2
spark-9ead8ab14e821046714217f24c4c476bd913f605.zip
Set SPARK_LAUNCH_WITH_SCALA=0 in Executor Runner
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala20
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala3
2 files changed, 20 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 5c535ed502..6723523d5b 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -26,14 +26,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
/* Start the Master */
- val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+ val (masterActorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterUrl = "spark://" + localIpAddress + ":" + masterPort
threadPool.execute(new Runnable {
def run() {
- val actor = actorSystem.actorOf(
+ val actor = masterActorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
masterActor = actor
- actorSystem.awaitTermination()
+ masterActorSystem.awaitTermination()
}
})
@@ -52,6 +52,20 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
})
}
+
+ // Shutdown hook that kills actors on shutdown.
+ Runtime.getRuntime.addShutdownHook(
+ new Thread() {
+ override def run() {
+ masterActorSystem.stop(masterActor)
+ masterActorSystem.shutdown()
+ // Since above is asynchronous wait for the master actor to shut down
+ while(!masterActor.isTerminated) {
+ Thread.sleep(10)
+ }
+ }
+ })
+
return masterUrl
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 3e24380810..7ad216f90e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -131,6 +131,9 @@ class ExecutorRunner(
}
env.put("SPARK_CORES", cores.toString)
env.put("SPARK_MEMORY", memory.toString)
+ // In case we are running this from within the Spark Shell
+ // so we are not creating a parent process.
+ env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
// Redirect its stdout and stderr to files