aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSephiroth-Lin <linwzhong@gmail.com>2015-02-10 23:23:35 +0000
committerSean Owen <sowen@cloudera.com>2015-02-10 23:23:35 +0000
commit52983d7f4f1a155433b6df3687cf5dc71804cfd5 (patch)
tree1c662d3a2647d0a52eae77264bb2525f64ffc030
parent5820961289eb98e45eb467efa316c7592b8d619c (diff)
downloadspark-52983d7f4f1a155433b6df3687cf5dc71804cfd5.tar.gz
spark-52983d7f4f1a155433b6df3687cf5dc71804cfd5.tar.bz2
spark-52983d7f4f1a155433b6df3687cf5dc71804cfd5.zip
[SPARK-5644] [Core] Delete tmp dir when sc is stopped
When we run the driver as a service and call only sc.stop for each job, the tmp dirs created by HttpFileServer and SparkEnv are not deleted; they are only removed when the service process exits. We therefore need to delete these tmp dirs directly when sc is stopped. Author: Sephiroth-Lin <linwzhong@gmail.com> Closes #4412 from Sephiroth-Lin/bug-fix-master-01 and squashes the following commits: fbbc785 [Sephiroth-Lin] using an interpolated string b968e14 [Sephiroth-Lin] using an interpolated string 4edf394 [Sephiroth-Lin] rename the variable and update comment 1339c96 [Sephiroth-Lin] add a member to store the reference of tmp dir b2018a5 [Sephiroth-Lin] check sparkFilesDir before delete f48a3c6 [Sephiroth-Lin] don't check sparkFilesDir, check executorId dd9686e [Sephiroth-Lin] format code b38e0f0 [Sephiroth-Lin] add dir check before delete d7ccc64 [Sephiroth-Lin] Change log level 1d70926 [Sephiroth-Lin] update comment e2a2b1b [Sephiroth-Lin] update comment aeac518 [Sephiroth-Lin] Delete tmp dir when sc is stop c0d5b28 [Sephiroth-Lin] Delete tmp dir when sc is stop
-rw-r--r--core/src/main/scala/org/apache/spark/HttpFileServer.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala29
2 files changed, 37 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 3f33332a81..7e706bcc42 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -50,6 +50,15 @@ private[spark] class HttpFileServer(
def stop() {
httpServer.stop()
+
+ // If we only stop sc, but the driver process still run as a services then we need to delete
+ // the tmp dir, if not, it will create too many tmp dirs
+ try {
+ Utils.deleteRecursively(baseDir)
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
+ }
}
def addFile(file: File) : String = {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f25db7f8de..b63bea5b10 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -76,6 +76,8 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
+ private var driverTmpDirToDelete: Option[String] = None
+
private[spark] def stop() {
isStopped = true
pythonWorkers.foreach { case(key, worker) => worker.stop() }
@@ -93,6 +95,22 @@ class SparkEnv (
// actorSystem.awaitTermination()
// Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+ // If we only stop sc, but the driver process still run as a services then we need to delete
+ // the tmp dir, if not, it will create too many tmp dirs.
+ // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
+ // current working dir in executor which we do not need to delete.
+ driverTmpDirToDelete match {
+ case Some(path) => {
+ try {
+ Utils.deleteRecursively(new File(path))
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception while deleting Spark temp dir: $path", e)
+ }
+ }
+ case None => // We just need to delete tmp dir created by driver, so do nothing on executor
+ }
}
private[spark]
@@ -350,7 +368,7 @@ object SparkEnv extends Logging {
"levels using the RDD.persist() method instead.")
}
- new SparkEnv(
+ val envInstance = new SparkEnv(
executorId,
actorSystem,
serializer,
@@ -367,6 +385,15 @@ object SparkEnv extends Logging {
metricsSystem,
shuffleMemoryManager,
conf)
+
+ // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
+ // called, and we only need to do it for driver. Because driver may run as a service, and if we
+ // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
+ if (isDriver) {
+ envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
+ }
+
+ envInstance
}
/**