aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJoshi <rekhajoshm@gmail.com>2015-06-30 14:00:35 -0700
committerAndrew Or <andrew@databricks.com>2015-06-30 14:00:35 -0700
commit7dda0844e1eb6df7455af68592751806b3b92251 (patch)
treec7505cc4aa0b7582bbfd6903b7f03f40a7acb71b /core
parent79f0b371a36560a009c1b0943c928adc5a1bdd8f (diff)
downloadspark-7dda0844e1eb6df7455af68592751806b3b92251.tar.gz
spark-7dda0844e1eb6df7455af68592751806b3b92251.tar.bz2
spark-7dda0844e1eb6df7455af68592751806b3b92251.zip
[SPARK-2645] [CORE] Allow SparkEnv.stop() to be called multiple times without side effects.
Fix for SparkContext stop behavior - Allow sc.stop() to be called multiple times without side effects. Author: Joshi <rekhajoshm@gmail.com> Author: Rekha Joshi <rekhajoshm@gmail.com> Closes #6973 from rekhajoshm/SPARK-2645 and squashes the following commits: 277043e [Joshi] Fix for SparkContext stop behavior 446b0a4 [Joshi] Fix for SparkContext stop behavior 2ce5760 [Joshi] Fix for SparkContext stop behavior c97839a [Joshi] Fix for SparkContext stop behavior 1aff39c [Joshi] Fix for SparkContext stop behavior 12f66b5 [Joshi] Fix for SparkContext stop behavior 72bb484 [Joshi] Fix for SparkContext stop behavior a5a7d7f [Joshi] Fix for SparkContext stop behavior 9193a0c [Joshi] Fix for SparkContext stop behavior 58dba70 [Joshi] SPARK-2645: Fix for SparkContext stop behavior 380c5b0 [Joshi] SPARK-2645: Fix for SparkContext stop behavior b566b66 [Joshi] SPARK-2645: Fix for SparkContext stop behavior 0be142d [Rekha Joshi] Merge pull request #3 from apache/master 106fd8e [Rekha Joshi] Merge pull request #2 from apache/master e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala66
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala13
2 files changed, 47 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index b0665570e2..1b133fbdfa 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket
import akka.actor.ActorSystem
-import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Properties
@@ -90,39 +89,42 @@ class SparkEnv (
private var driverTmpDirToDelete: Option[String] = None
private[spark] def stop() {
- isStopped = true
- pythonWorkers.foreach { case(key, worker) => worker.stop() }
- Option(httpFileServer).foreach(_.stop())
- mapOutputTracker.stop()
- shuffleManager.stop()
- broadcastManager.stop()
- blockManager.stop()
- blockManager.master.stop()
- metricsSystem.stop()
- outputCommitCoordinator.stop()
- rpcEnv.shutdown()
-
- // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
- // down, but let's call it anyway in case it gets fixed in a later release
- // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
- // actorSystem.awaitTermination()
-
- // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
- // If we only stop sc, but the driver process still run as a services then we need to delete
- // the tmp dir, if not, it will create too many tmp dirs.
- // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
- // current working dir in executor which we do not need to delete.
- driverTmpDirToDelete match {
- case Some(path) => {
- try {
- Utils.deleteRecursively(new File(path))
- } catch {
- case e: Exception =>
- logWarning(s"Exception while deleting Spark temp dir: $path", e)
+
+ if (!isStopped) {
+ isStopped = true
+ pythonWorkers.values.foreach(_.stop())
+ Option(httpFileServer).foreach(_.stop())
+ mapOutputTracker.stop()
+ shuffleManager.stop()
+ broadcastManager.stop()
+ blockManager.stop()
+ blockManager.master.stop()
+ metricsSystem.stop()
+ outputCommitCoordinator.stop()
+ rpcEnv.shutdown()
+
+ // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+ // down, but let's call it anyway in case it gets fixed in a later release
+ // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+ // actorSystem.awaitTermination()
+
+ // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+ // If we only stop sc, but the driver process still run as a services then we need to delete
+ // the tmp dir, if not, it will create too many tmp dirs.
+ // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
+ // current working dir in executor which we do not need to delete.
+ driverTmpDirToDelete match {
+ case Some(path) => {
+ try {
+ Utils.deleteRecursively(new File(path))
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception while deleting Spark temp dir: $path", e)
+ }
}
+ case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
- case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 6838b35ab4..5c57940fa5 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
import scala.concurrent.Await
import scala.concurrent.duration.Duration
+import org.scalatest.Matchers._
class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
@@ -272,4 +273,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
}
+
+ test("calling multiple sc.stop() must not throw any exception") {
+ noException should be thrownBy {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ val cnt = sc.parallelize(1 to 4).count()
+ sc.cancelAllJobs()
+ sc.stop()
+ // call stop second time
+ sc.stop()
+ }
+ }
+
}