diff options
7 files changed, 61 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c46f84de84..243d8edb72 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -88,6 +88,8 @@ private[deploy] object DeployMessages { case class KillDriver(driverId: String) extends DeployMessage + case class ApplicationFinished(id: String) + // Worker internal case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders @@ -175,4 +177,5 @@ private[deploy] object DeployMessages { // Liveness checks in various places case object SendHeartbeat + } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 76b870a304..aeb15adb9a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -689,6 +689,11 @@ private[spark] class Master( } persistenceEngine.removeApplication(app) schedule() + + // Tell all workers that the application has finished, so they can clean up any app state. + workers.foreach { w => + w.actor ! ApplicationFinished(app.id) + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index f4fedc6327..acbdf0d8bd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -47,6 +47,7 @@ private[spark] class ExecutorRunner( val executorDir: File, val workerUrl: String, val conf: SparkConf, + val appLocalDirs: Seq[String], var state: ExecutorState.Value) extends Logging { @@ -77,7 +78,7 @@ private[spark] class ExecutorRunner( /** * Kill executor process, wait for exit and notify worker to update resource status. * - * @param message the exception message which caused the executor's death + * @param message the exception message which caused the executor's death */ private def killProcess(message: Option[String]) { var exitCode: Option[Int] = None @@ -129,6 +130,7 @@ private[spark] class ExecutorRunner( logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) builder.directory(executorDir) + builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(",")) // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 6863b62551..86a87ec222 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -23,7 +23,7 @@ import java.text.SimpleDateFormat import java.util.{UUID, Date} import scala.collection.JavaConversions._ -import scala.collection.mutable.HashMap +import scala.collection.mutable.{HashMap, HashSet} import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Random @@ -109,6 +109,8 @@ private[spark] class Worker( val finishedExecutors = new HashMap[String, ExecutorRunner] val drivers = new HashMap[String, DriverRunner] val finishedDrivers = new HashMap[String, DriverRunner] + val appDirectories = new HashMap[String, Seq[String]] + val finishedApps = new HashSet[String] // The shuffle service is not actually started unless configured. val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr) @@ -294,7 +296,7 @@ private[spark] class Worker( val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir) dir.isDirectory && !isAppStillRunning && !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS) - }.foreach { dir => + }.foreach { dir => logInfo(s"Removing directory: ${dir.getPath}") Utils.deleteRecursively(dir) } @@ -339,8 +341,19 @@ private[spark] class Worker( throw new IOException("Failed to create directory " + executorDir) } + // Create local dirs for the executor. These are passed to the executor via the + // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the + // application finishes. + val appLocalDirs = appDirectories.get(appId).getOrElse { + Utils.getOrCreateLocalRootDirs(conf).map { dir => + Utils.createDirectory(dir).getAbsolutePath() + }.toSeq + } + appDirectories(appId) = appLocalDirs + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING) + self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs, + ExecutorState.LOADING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -377,6 +390,7 @@ private[spark] class Worker( message.map(" message " + _).getOrElse("") + exitStatus.map(" exitStatus " + _).getOrElse("")) } + maybeCleanupApplication(appId) } case KillExecutor(masterUrl, appId, execId) => @@ -446,6 +460,9 @@ private[spark] class Worker( case ReregisterWithMaster => reregisterWithMaster() + case ApplicationFinished(id) => + finishedApps += id + maybeCleanupApplication(id) } private def masterDisconnected() { @@ -454,6 +471,19 @@ private[spark] class Worker( registerWithMaster() } + private def maybeCleanupApplication(id: String): Unit = { + val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id) + if (shouldCleanup) { + finishedApps -= id + appDirectories.remove(id).foreach { dirList => + logInfo(s"Cleaning up local directories for application $id") + dirList.foreach { dir => + Utils.deleteRecursively(new File(dir)) + } + } + } + } + def generateWorkerId(): String = { "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 91f5d6427a..94632844a1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -246,8 +246,11 @@ private[spark] object Utils extends Logging { retval } - /** Create a temporary directory inside the given parent directory */ - def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { + /** + * Create a directory inside the given parent directory. The directory is guaranteed to be + * newly created, and is not marked for automatic deletion. + */ + def createDirectory(root: String): File = { var attempts = 0 val maxAttempts = 10 var dir: File = null @@ -265,6 +268,15 @@ private[spark] object Utils extends Logging { } catch { case e: SecurityException => dir = null; } } + dir + } + + /** + * Create a temporary directory inside the given parent directory. The directory will be + * automatically deleted when the VM shuts down. + */ + def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { + val dir = createDirectory(root) registerShutdownDeleteDir(dir) dir } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 3f1cd0752e..aa65f7e891 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite { def createExecutorRunner(): ExecutorRunner = { new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", new File("sparkHome"), new File("workDir"), "akka://worker", - new SparkConf, ExecutorState.RUNNING) + new SparkConf, Seq("localDir"), ExecutorState.RUNNING) } def createDriverRunner(): DriverRunner = { diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 1962170629..6f233d7cf9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite { val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", - new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING) + new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), + ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables) assert(builder.command().last === appId) } |