aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorJey Kottalam <jey@cs.berkeley.edu>2013-05-10 15:48:48 -0700
committerJey Kottalam <jey@cs.berkeley.edu>2013-06-21 12:14:16 -0400
commit62c4781400dd908c2fccdcebf0dc816ff0cb8ed4 (patch)
treeb3632497d0b6532258324de4e12ce6d208dfd31d /core/src
parentc79a6078c34c207ad9f9910252f5849424828bf1 (diff)
downloadspark-62c4781400dd908c2fccdcebf0dc816ff0cb8ed4.tar.gz
spark-62c4781400dd908c2fccdcebf0dc816ff0cb8ed4.tar.bz2
spark-62c4781400dd908c2fccdcebf0dc816ff0cb8ed4.zip
Add tests and fixes for Python daemon shutdown
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala1
-rw-r--r--core/src/main/scala/spark/api/python/PythonWorker.scala4
2 files changed, 5 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 5691e24c32..5b55d45212 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -44,6 +44,7 @@ class SparkEnv (
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorker]()
def stop() {
+ pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
mapOutputTracker.stop()
shuffleFetcher.stop()
diff --git a/core/src/main/scala/spark/api/python/PythonWorker.scala b/core/src/main/scala/spark/api/python/PythonWorker.scala
index 8ee3c6884f..74c8c6d37a 100644
--- a/core/src/main/scala/spark/api/python/PythonWorker.scala
+++ b/core/src/main/scala/spark/api/python/PythonWorker.scala
@@ -33,6 +33,10 @@ private[spark] class PythonWorker(pythonExec: String, envVars: Map[String, Strin
}
}
+ def stop() {
+ stopDaemon
+ }
+
private def startDaemon() {
synchronized {
// Is it already running?