aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala66
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala13
2 files changed, 47 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index b0665570e2..1b133fbdfa 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket
import akka.actor.ActorSystem
-import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Properties
@@ -90,39 +89,42 @@ class SparkEnv (
private var driverTmpDirToDelete: Option[String] = None
private[spark] def stop() {
- isStopped = true
- pythonWorkers.foreach { case(key, worker) => worker.stop() }
- Option(httpFileServer).foreach(_.stop())
- mapOutputTracker.stop()
- shuffleManager.stop()
- broadcastManager.stop()
- blockManager.stop()
- blockManager.master.stop()
- metricsSystem.stop()
- outputCommitCoordinator.stop()
- rpcEnv.shutdown()
-
- // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
- // down, but let's call it anyway in case it gets fixed in a later release
- // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
- // actorSystem.awaitTermination()
-
- // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
- // If we only stop sc, but the driver process still run as a services then we need to delete
- // the tmp dir, if not, it will create too many tmp dirs.
- // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
- // current working dir in executor which we do not need to delete.
- driverTmpDirToDelete match {
- case Some(path) => {
- try {
- Utils.deleteRecursively(new File(path))
- } catch {
- case e: Exception =>
- logWarning(s"Exception while deleting Spark temp dir: $path", e)
+
+ if (!isStopped) {
+ isStopped = true
+ pythonWorkers.values.foreach(_.stop())
+ Option(httpFileServer).foreach(_.stop())
+ mapOutputTracker.stop()
+ shuffleManager.stop()
+ broadcastManager.stop()
+ blockManager.stop()
+ blockManager.master.stop()
+ metricsSystem.stop()
+ outputCommitCoordinator.stop()
+ rpcEnv.shutdown()
+
+ // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+ // down, but let's call it anyway in case it gets fixed in a later release
+ // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+ // actorSystem.awaitTermination()
+
+ // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+ // If we only stop sc, but the driver process still run as a services then we need to delete
+ // the tmp dir, if not, it will create too many tmp dirs.
+ // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
+ // current working dir in executor which we do not need to delete.
+ driverTmpDirToDelete match {
+ case Some(path) => {
+ try {
+ Utils.deleteRecursively(new File(path))
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception while deleting Spark temp dir: $path", e)
+ }
}
+ case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
- case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 6838b35ab4..5c57940fa5 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
import scala.concurrent.Await
import scala.concurrent.duration.Duration
+import org.scalatest.Matchers._
class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
@@ -272,4 +273,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
}
+
+ test("calling multiple sc.stop() must not throw any exception") {
+ noException should be thrownBy {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ val cnt = sc.parallelize(1 to 4).count()
+ sc.cancelAllJobs()
+ sc.stop()
+ // call stop second time
+ sc.stop()
+ }
+ }
+
}