 core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala             |  3
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala             |  5
 core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala     |  4
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala             | 36
 core/src/main/scala/org/apache/spark/util/Utils.scala                       | 16
 core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala         |  2
 core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala |  3
 7 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c46f84de84..243d8edb72 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -88,6 +88,8 @@ private[deploy] object DeployMessages {
case class KillDriver(driverId: String) extends DeployMessage
+ case class ApplicationFinished(id: String)
+
// Worker internal
case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
@@ -175,4 +177,5 @@ private[deploy] object DeployMessages {
// Liveness checks in various places
case object SendHeartbeat
+
}
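
The new ApplicationFinished message is a plain case class carrying only the application id. As a rough, self-contained sketch of how a master-to-worker notification of this shape is consumed (not Spark's actual actor plumbing; the sealed trait and MessageDemo object exist only to make the example compile on its own):

    // Hypothetical stand-alone sketch; Spark delivers these messages between Akka actors.
    sealed trait Msg
    case class KillDriver(driverId: String) extends Msg
    case class ApplicationFinished(id: String) extends Msg

    object MessageDemo {
      def handle(msg: Msg): String = msg match {
        case KillDriver(driverId)    => s"asked to kill driver $driverId"
        case ApplicationFinished(id) => s"application $id finished; clean up its state"
      }

      def main(args: Array[String]): Unit = {
        // "app-123" is a made-up application id.
        println(handle(ApplicationFinished("app-123")))
      }
    }
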
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 76b870a304..aeb15adb9a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -689,6 +689,11 @@ private[spark] class Master(
}
persistenceEngine.removeApplication(app)
schedule()
+
+ // Tell all workers that the application has finished, so they can clean up any app state.
+ workers.foreach { w =>
+ w.actor ! ApplicationFinished(app.id)
+ }
}
}
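
After the application is removed from the persistence engine and scheduling has run, the master now notifies every registered worker. A minimal sketch of that broadcast pattern, assuming a hypothetical WorkerHandle in place of Spark's WorkerInfo/actor reference:

    // Illustrative only: WorkerHandle and send are assumed stand-ins, not Spark APIs.
    case class ApplicationFinished(id: String)

    trait WorkerHandle {
      def send(msg: Any): Unit
    }

    class MasterSketch(workers: Seq[WorkerHandle]) {
      def onApplicationRemoved(appId: String): Unit = {
        // Tell all workers the application has finished so they can clean up its state.
        workers.foreach(_.send(ApplicationFinished(appId)))
      }
    }
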
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f4fedc6327..acbdf0d8bd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -47,6 +47,7 @@ private[spark] class ExecutorRunner(
val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
+ val appLocalDirs: Seq[String],
var state: ExecutorState.Value)
extends Logging {
@@ -77,7 +78,7 @@ private[spark] class ExecutorRunner(
/**
* Kill executor process, wait for exit and notify worker to update resource status.
*
- * @param message the exception message which caused the executor's death
+ * @param message the exception message which caused the executor's death
*/
private def killProcess(message: Option[String]) {
var exitCode: Option[Int] = None
@@ -129,6 +130,7 @@ private[spark] class ExecutorRunner(
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
builder.directory(executorDir)
+ builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 6863b62551..86a87ec222 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
import java.util.{UUID, Date}
import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
@@ -109,6 +109,8 @@ private[spark] class Worker(
val finishedExecutors = new HashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val finishedDrivers = new HashMap[String, DriverRunner]
+ val appDirectories = new HashMap[String, Seq[String]]
+ val finishedApps = new HashSet[String]
// The shuffle service is not actually started unless configured.
val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
@@ -294,7 +296,7 @@ private[spark] class Worker(
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
- }.foreach { dir =>
+ }.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
}
@@ -339,8 +341,19 @@ private[spark] class Worker(
throw new IOException("Failed to create directory " + executorDir)
}
+ // Create local dirs for the executor. These are passed to the executor via the
+ // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+ // application finishes.
+ val appLocalDirs = appDirectories.get(appId).getOrElse {
+ Utils.getOrCreateLocalRootDirs(conf).map { dir =>
+ Utils.createDirectory(dir).getAbsolutePath()
+ }.toSeq
+ }
+ appDirectories(appId) = appLocalDirs
+
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
+ self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
+ ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -377,6 +390,7 @@ private[spark] class Worker(
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
+ maybeCleanupApplication(appId)
}
case KillExecutor(masterUrl, appId, execId) =>
@@ -446,6 +460,9 @@ private[spark] class Worker(
case ReregisterWithMaster =>
reregisterWithMaster()
+ case ApplicationFinished(id) =>
+ finishedApps += id
+ maybeCleanupApplication(id)
}
private def masterDisconnected() {
@@ -454,6 +471,19 @@ private[spark] class Worker(
registerWithMaster()
}
+ private def maybeCleanupApplication(id: String): Unit = {
+ val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
+ if (shouldCleanup) {
+ finishedApps -= id
+ appDirectories.remove(id).foreach { dirList =>
+ logInfo(s"Cleaning up local directories for application $id")
+ dirList.foreach { dir =>
+ Utils.deleteRecursively(new File(dir))
+ }
+ }
+ }
+ }
+
def generateWorkerId(): String = {
"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
}
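
Taken together, the Worker changes keep two pieces of bookkeeping: appDirectories remembers the local directories created for each application, and finishedApps records applications the master has declared finished. Cleanup runs only once an application is finished and none of its executors are still running. A self-contained sketch of that logic, simplified to one executor per application and with directory deletion stubbed out so it has no side effects:

    import scala.collection.mutable.{HashMap, HashSet}

    class WorkerCleanupSketch {
      private val appDirectories = new HashMap[String, Seq[String]]
      private val finishedApps = new HashSet[String]
      // Simplification: one flag per app, where the Worker checks all of its executors.
      private val runningAppIds = new HashSet[String]

      def registerExecutor(appId: String, dirs: Seq[String]): Unit = {
        appDirectories(appId) = dirs
        runningAppIds += appId
      }

      def executorExited(appId: String): Unit = {
        runningAppIds -= appId
        maybeCleanupApplication(appId)
      }

      def applicationFinished(appId: String): Unit = {
        finishedApps += appId
        maybeCleanupApplication(appId)
      }

      private def maybeCleanupApplication(id: String): Unit = {
        if (finishedApps.contains(id) && !runningAppIds.contains(id)) {
          finishedApps -= id
          appDirectories.remove(id).foreach { dirs =>
            dirs.foreach(dir => println(s"would delete local directory $dir"))
          }
        }
      }
    }
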
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 91f5d6427a..94632844a1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -246,8 +246,11 @@ private[spark] object Utils extends Logging {
retval
}
- /** Create a temporary directory inside the given parent directory */
- def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+ /**
+ * Create a directory inside the given parent directory. The directory is guaranteed to be
+ * newly created, and is not marked for automatic deletion.
+ */
+ def createDirectory(root: String): File = {
var attempts = 0
val maxAttempts = 10
var dir: File = null
@@ -265,6 +268,15 @@ private[spark] object Utils extends Logging {
} catch { case e: SecurityException => dir = null; }
}
+ dir
+ }
+
+ /**
+ * Create a temporary directory inside the given parent directory. The directory will be
+ * automatically deleted when the VM shuts down.
+ */
+ def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+ val dir = createDirectory(root)
registerShutdownDeleteDir(dir)
dir
}
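
The Utils change splits the old createTempDir into a general createDirectory (retrying, never auto-deleted, suitable for the worker's per-app directories) and a thin createTempDir wrapper that additionally registers the directory for deletion at shutdown. A rough re-implementation of the same split, using a plain JVM shutdown hook in place of Spark's registerShutdownDeleteDir registry:

    // Illustration of the split above, not Spark's Utils implementation.
    import java.io.{File, IOException}
    import java.util.UUID

    object DirUtilsSketch {
      def createDirectory(root: String): File = {
        var attempts = 0
        val maxAttempts = 10
        var dir: File = null
        while (dir == null) {
          attempts += 1
          if (attempts > maxAttempts) {
            throw new IOException(s"Failed to create a directory under $root after $maxAttempts attempts")
          }
          try {
            dir = new File(root, "dir-" + UUID.randomUUID().toString)
            if (dir.exists() || !dir.mkdirs()) {
              dir = null
            }
          } catch { case _: SecurityException => dir = null }
        }
        dir
      }

      def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
        val dir = createDirectory(root)
        // Delete the directory when the JVM exits; Spark uses registerShutdownDeleteDir instead.
        sys.addShutdownHook {
          def delete(f: File): Unit = {
            if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(delete)
            f.delete()
          }
          delete(dir)
        }
        dir
      }
    }
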
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 3f1cd0752e..aa65f7e891 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new File("sparkHome"), new File("workDir"), "akka://worker",
- new SparkConf, ExecutorState.RUNNING)
+ new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 1962170629..6f233d7cf9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite {
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
- new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
+ new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+ ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
assert(builder.command().last === appId)
}