about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author    Denny <dennybritz@gmail.com>  2012-09-07 11:39:44 -0700
committer Denny <dennybritz@gmail.com>  2012-09-07 11:39:44 -0700
commit 4e7b264cf72b3e7ea9f69c46d040df3de073caed (patch)
tree   f0461dcc937e49e606f807f39f17710897cb40af
parent 886183e5910cfc6df5f88317fafb5f68dabd5bd6 (diff)
downloadspark-4e7b264cf72b3e7ea9f69c46d040df3de073caed.tar.gz
spark-4e7b264cf72b3e7ea9f69c46d040df3de073caed.tar.bz2
spark-4e7b264cf72b3e7ea9f69c46d040df3de073caed.zip
Set SPARK_LAUNCH_WITH_SCALA=0 in Executor Runner
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala    | 20
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala |  3
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 5c535ed502..6723523d5b 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -26,14 +26,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
/* Start the Master */
- val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+ val (masterActorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterUrl = "spark://" + localIpAddress + ":" + masterPort
threadPool.execute(new Runnable {
def run() {
- val actor = actorSystem.actorOf(
+ val actor = masterActorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
masterActor = actor
- actorSystem.awaitTermination()
+ masterActorSystem.awaitTermination()
}
})
@@ -52,6 +52,20 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
})
}
+
+ // Shutdown hook that kills actors on shutdown.
+ Runtime.getRuntime.addShutdownHook(
+ new Thread() {
+ override def run() {
+ masterActorSystem.stop(masterActor)
+ masterActorSystem.shutdown()
+ // Since above is asynchronous wait for the master actor to shut down
+ while(!masterActor.isTerminated) {
+ Thread.sleep(10)
+ }
+ }
+ })
+
return masterUrl
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 3e24380810..7ad216f90e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -131,6 +131,9 @@ class ExecutorRunner(
}
env.put("SPARK_CORES", cores.toString)
env.put("SPARK_MEMORY", memory.toString)
+ // In case we are running this from within the Spark Shell
+ // so we are not creating a parent process.
+ env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
// Redirect its stdout and stderr to files