Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/HttpFileServer.scala |  9 +++++++++
 core/src/main/scala/org/apache/spark/SparkEnv.scala       | 29 ++++++++++++++++++++++++++++-
 2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 3f33332a81..7e706bcc42 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -50,6 +50,15 @@ private[spark] class HttpFileServer(
 
   def stop() {
     httpServer.stop()
+
+    // If we only stop the SparkContext but the driver process keeps running as a service, we
+    // need to delete the tmp dir; otherwise it will create too many tmp dirs.
+    try {
+      Utils.deleteRecursively(baseDir)
+    } catch {
+      case e: Exception =>
+        logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
+    }
   }
 
   def addFile(file: File) : String = {
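
For reference, Utils.deleteRecursively removes the whole directory tree rooted at baseDir. A minimal standalone sketch of that behavior (DeleteTreeSketch and deleteTree are illustrative stand-ins, not Spark's actual implementation):

import java.io.File

object DeleteTreeSketch {
  // Illustrative stand-in for Utils.deleteRecursively: delete children first, then the dir itself.
  def deleteTree(dir: File): Unit = {
    if (dir.isDirectory) {
      // listFiles() can return null (e.g. on I/O error), so guard against it.
      Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach(deleteTree)
    }
    if (!dir.delete()) {
      throw new java.io.IOException(s"Failed to delete: ${dir.getAbsolutePath}")
    }
  }
}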
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f25db7f8de..b63bea5b10 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -76,6 +76,8 @@ class SparkEnv (
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
+  private var driverTmpDirToDelete: Option[String] = None
+
   private[spark] def stop() {
     isStopped = true
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
@@ -93,6 +95,22 @@ class SparkEnv (
     //   actorSystem.awaitTermination()
 
     // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+    // If we only stop the SparkContext but the driver process keeps running as a service, we
+    // need to delete the tmp dir; otherwise it will create too many tmp dirs.
+    // We only need to delete the tmp dir created by the driver, because on executors
+    // sparkFilesDir points to the current working directory, which we must not delete.
+    driverTmpDirToDelete match {
+      case Some(path) => {
+        try {
+          Utils.deleteRecursively(new File(path))
+        } catch {
+          case e: Exception =>
+            logWarning(s"Exception while deleting Spark temp dir: $path", e)
+        }
+      }
+      case None => // Only the driver's tmp dir needs deleting, so do nothing on executors
+    }
   }
 
   private[spark]
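
A note on the pattern above: matching on driverTmpDirToDelete keeps the executor case (None) explicit. An equivalent, slightly more compact form uses Option.foreach; this is a sketch assuming the same surrounding SparkEnv scope (Utils, logWarning), not part of the patch:

// Equivalent cleanup via Option.foreach: the body runs only when the Option is
// defined, i.e. on the driver; on executors driverTmpDirToDelete stays None.
driverTmpDirToDelete.foreach { path =>
  try {
    Utils.deleteRecursively(new File(path))
  } catch {
    case e: Exception =>
      logWarning(s"Exception while deleting Spark temp dir: $path", e)
  }
}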
@@ -350,7 +368,7 @@ object SparkEnv extends Logging {
         "levels using the RDD.persist() method instead.")
     }
 
-    new SparkEnv(
+    val envInstance = new SparkEnv(
       executorId,
       actorSystem,
       serializer,
@@ -367,6 +385,15 @@ object SparkEnv extends Logging {
       metricsSystem,
       shuffleMemoryManager,
       conf)
+
+    // Keep a reference to the tmp dir created by the driver so that we can delete it when
+    // stop() is called; only the driver needs this. The driver may run as a long-lived service,
+    // and if we don't delete this tmp dir when the SparkContext is stopped, we create too many tmp dirs.
+    if (isDriver) {
+      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
+    }
+
+    envInstance
   }
 
   /**
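
Taken together, the change matters for the "driver as a service" scenario the comments describe: a long-lived JVM that repeatedly creates and stops SparkContexts. A minimal sketch of that scenario, assuming a local master (SparkService and tmp-dir-demo are illustrative names); with this patch, each sc.stop() removes the driver's sparkFilesDir instead of leaving one spark-* tmp dir per iteration:

import org.apache.spark.{SparkConf, SparkContext}

// A long-lived driver process that creates and stops a SparkContext per request.
// Before this patch, every iteration left behind the tmp dir backing sparkFilesDir;
// with it, SparkEnv.stop() deletes the driver's tmp dir on each sc.stop().
object SparkService {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("tmp-dir-demo")
    for (i <- 1 to 3) {
      val sc = new SparkContext(conf)
      val sum = sc.parallelize(1 to 100).reduce(_ + _)
      println(s"run $i: sum = $sum")
      sc.stop() // the driver's tmp dir is now removed here
    }
  }
}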