author     Marcelo Vanzin <vanzin@cloudera.com>   2014-12-23 12:02:08 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2014-12-23 12:02:25 -0800
commit     ec11ffddbb4074f42745ebd76cd8f32abeb4b3ce (patch)
tree       7bc3983dc0761275bb3a80c5533cd5efaaf33be1 /core
parent     9fb86b80a20807a0c797050aebb098f94a12e5ea (diff)
[SPARK-4834] [standalone] Clean up application files after app finishes.
Commit 7aacb7bfa added support for sharing downloaded files among multiple
executors of the same app. That works great in Yarn, since the app's directory
is cleaned up after the app is done.

But Spark standalone mode didn't do that, so the lock/cache files created by
that change were left around and could eventually fill up the disk hosting
/tmp.

To solve that, create app-specific directories under the local dirs when
launching executors. Multiple executors launched by the same Worker will use
the same app directories, so they should be able to share the downloaded
files. When the application finishes, a new message is sent to all workers
telling them the application has finished; once that message has been
received, and all executors registered for the application shut down, then
those directories will be cleaned up by the Worker.

Note: Unit testing this is hard (if even possible), since local-cluster mode
doesn't seem to leave the Master/Worker daemons running long enough after
`sc.stop()` is called for the clean up protocol to take effect.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #3705 from vanzin/SPARK-4834 and squashes the following commits:

b430534 [Marcelo Vanzin] Remove seemingly unnecessary synchronization.
50eb4b9 [Marcelo Vanzin] Review feedback.
c0e5ea5 [Marcelo Vanzin] [SPARK-4834] [standalone] Clean up application files after app finishes.

(cherry picked from commit dd155369a04d7dfbf6a5745cbb243e22218367dc)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
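In outline, the cleanup protocol this patch adds is small; the sketch below condenses
the Master and Worker sides into a few lines (names match the diff that follows; actor
plumbing and error handling are elided):

    // Master, after removing a finished application: notify every worker.
    workers.foreach { w =>
      w.actor ! ApplicationFinished(app.id)
    }

    // Worker: record that the app finished, and delete its local dirs once its
    // last executor has also exited (in whichever order those two events land).
    case ApplicationFinished(id) =>
      finishedApps += id
      maybeCleanupApplication(id)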
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala              3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala              5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala      4
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala             36
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                       16
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala          2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala  3
7 files changed, 61 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c46f84de84..243d8edb72 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -88,6 +88,8 @@ private[deploy] object DeployMessages {
case class KillDriver(driverId: String) extends DeployMessage
+ case class ApplicationFinished(id: String)
+
// Worker internal
case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
@@ -175,4 +177,5 @@ private[deploy] object DeployMessages {
// Liveness checks in various places
case object SendHeartbeat
+
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 76b870a304..aeb15adb9a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -689,6 +689,11 @@ private[spark] class Master(
}
persistenceEngine.removeApplication(app)
schedule()
+
+    // Tell all workers that the application has finished, so they can clean up any app state.
+    workers.foreach { w =>
+      w.actor ! ApplicationFinished(app.id)
+    }
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f4fedc6327..acbdf0d8bd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -47,6 +47,7 @@ private[spark] class ExecutorRunner(
val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
+ val appLocalDirs: Seq[String],
var state: ExecutorState.Value)
extends Logging {
@@ -77,7 +78,7 @@ private[spark] class ExecutorRunner(
/**
* Kill executor process, wait for exit and notify worker to update resource status.
*
- * @param message the exception message which caused the executor's death
+ * @param message the exception message which caused the executor's death
*/
private def killProcess(message: Option[String]) {
var exitCode: Option[Int] = None
@@ -129,6 +130,7 @@ private[spark] class ExecutorRunner(
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
builder.directory(executorDir)
+ builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
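ExecutorRunner publishes the per-app directories to the child process through the
SPARK_LOCAL_DIRS environment variable as a comma-separated list
(appLocalDirs.mkString(",")). A hypothetical sketch of a consumer of that contract
(illustrative only; resolveLocalDirs is made up and is not Spark's executor-side code):

    // Read the comma-separated dirs set by the Worker, falling back to the
    // JVM temp dir when the variable is absent.
    def resolveLocalDirs(): Seq[String] =
      sys.env.get("SPARK_LOCAL_DIRS")
        .map(_.split(",").toSeq)
        .getOrElse(Seq(System.getProperty("java.io.tmpdir")))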
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 6863b62551..86a87ec222 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
import java.util.{UUID, Date}
import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
@@ -109,6 +109,8 @@ private[spark] class Worker(
val finishedExecutors = new HashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val finishedDrivers = new HashMap[String, DriverRunner]
+ val appDirectories = new HashMap[String, Seq[String]]
+ val finishedApps = new HashSet[String]
// The shuffle service is not actually started unless configured.
val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
@@ -294,7 +296,7 @@ private[spark] class Worker(
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
- }.foreach { dir =>
+ }.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
}
@@ -339,8 +341,19 @@ private[spark] class Worker(
throw new IOException("Failed to create directory " + executorDir)
}
+      // Create local dirs for the executor. These are passed to the executor via the
+      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+      // application finishes.
+      val appLocalDirs = appDirectories.get(appId).getOrElse {
+        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
+          Utils.createDirectory(dir).getAbsolutePath()
+        }.toSeq
+      }
+      appDirectories(appId) = appLocalDirs
+
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
+ self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
+ ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -377,6 +390,7 @@ private[spark] class Worker(
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
+ maybeCleanupApplication(appId)
}
case KillExecutor(masterUrl, appId, execId) =>
@@ -446,6 +460,9 @@ private[spark] class Worker(
case ReregisterWithMaster =>
reregisterWithMaster()
+    case ApplicationFinished(id) =>
+      finishedApps += id
+      maybeCleanupApplication(id)
}
private def masterDisconnected() {
@@ -454,6 +471,19 @@ private[spark] class Worker(
registerWithMaster()
}
+  private def maybeCleanupApplication(id: String): Unit = {
+    val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
+    if (shouldCleanup) {
+      finishedApps -= id
+      appDirectories.remove(id).foreach { dirList =>
+        logInfo(s"Cleaning up local directories for application $id")
+        dirList.foreach { dir =>
+          Utils.deleteRecursively(new File(dir))
+        }
+      }
+    }
+  }
+
def generateWorkerId(): String = {
"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
}
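Note that maybeCleanupApplication is invoked both when an executor's exit is processed
and when ApplicationFinished arrives, so the directories are removed exactly once,
whichever of "last executor gone" and "application finished" happens second. A minimal,
hypothetical model of that order-independence (CleanupModel and its members are
stand-ins, not Spark code):

    import scala.collection.mutable

    class CleanupModel {
      private val finishedApps = mutable.HashSet[String]()
      private val runningExecutors = mutable.HashMap[String, Int]().withDefaultValue(0)
      var cleanedUp = List.empty[String]  // stand-in for deleting the app's dirs

      def executorStarted(appId: String): Unit = runningExecutors(appId) += 1

      def executorExited(appId: String): Unit = {
        runningExecutors(appId) -= 1
        maybeCleanup(appId)
      }

      def applicationFinished(appId: String): Unit = {
        finishedApps += appId
        maybeCleanup(appId)
      }

      // Cleanup requires both conditions, so the trigger order is irrelevant.
      private def maybeCleanup(appId: String): Unit = {
        if (finishedApps.contains(appId) && runningExecutors(appId) <= 0) {
          finishedApps -= appId
          cleanedUp = appId :: cleanedUp
        }
      }
    }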
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 91f5d6427a..94632844a1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -246,8 +246,11 @@ private[spark] object Utils extends Logging {
retval
}
-  /** Create a temporary directory inside the given parent directory */
-  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+  /**
+   * Create a directory inside the given parent directory. The directory is guaranteed to be
+   * newly created, and is not marked for automatic deletion.
+   */
+  def createDirectory(root: String): File = {
var attempts = 0
val maxAttempts = 10
var dir: File = null
@@ -265,6 +268,15 @@ private[spark] object Utils extends Logging {
} catch { case e: SecurityException => dir = null; }
}
+    dir
+  }
+
+  /**
+   * Create a temporary directory inside the given parent directory. The directory will be
+   * automatically deleted when the VM shuts down.
+   */
+  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+    val dir = createDirectory(root)
registerShutdownDeleteDir(dir)
dir
}
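The refactoring separates creation from lifecycle: createDirectory only creates a fresh
directory, while createTempDir additionally registers the result for deletion when the
JVM shuts down. A hypothetical caller's view (the paths are made up for illustration):

    // Deleted explicitly by the Worker once the application finishes:
    val appDir = Utils.createDirectory("/data/spark-local")

    // Registered with registerShutdownDeleteDir and removed at JVM exit:
    val tempDir = Utils.createTempDir()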
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 3f1cd0752e..aa65f7e891 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new File("sparkHome"), new File("workDir"), "akka://worker",
- new SparkConf, ExecutorState.RUNNING)
+ new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 1962170629..6f233d7cf9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite {
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
- new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
+ new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+ ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
assert(builder.command().last === appId)
}