aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala4
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala19
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala15
-rw-r--r--docs/configuration.md26
5 files changed, 83 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 83ce14a0a8..a7368f9f3d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {
case class KillDriver(driverId: String) extends DeployMessage
+ // Worker internal
+
+ case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
+
// AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8a71ddda4c..bf5a8d09dd 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -64,6 +64,12 @@ private[spark] class Worker(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
+ val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
+ // How often worker will clean up old app folders
+ val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+ // TTL for app folders/data; after TTL expires it will be cleaned up
+ val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+
// Index into masterUrls that we're currently trying to register with.
var masterIndex = 0
@@ -179,12 +185,28 @@ private[spark] class Worker(
registered = true
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+ if (CLEANUP_ENABLED) {
+ context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
+ CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
+ }
case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}
+ case WorkDirCleanup =>
+ // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+ val cleanupFuture = concurrent.future {
+ logInfo("Cleaning up oldest application directories in " + workDir + " ...")
+ Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
+ .foreach(Utils.deleteRecursively)
+ }
+ cleanupFuture onFailure {
+ case e: Throwable =>
+ logError("App dir cleanup failed: " + e.getMessage, e)
+ }
+
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
changeMaster(masterUrl, masterWebUiUrl)
@@ -331,7 +353,6 @@ private[spark] class Worker(
}
private[spark] object Worker {
-
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d3c39dee33..4435b21a75 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -597,9 +597,24 @@ private[spark] object Utils extends Logging {
}
if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
- return false;
+ return false
} else {
- return true;
+ return true
+ }
+ }
+
+ /**
+ * Finds all the files in a directory whose last modified time is older than cutoff seconds.
+ * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+ * @param cutoff measured in seconds. Files older than this are returned.
+ */
+ def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+ val currentTimeMillis = System.currentTimeMillis
+ if (dir.isDirectory) {
+ val files = listFilesSafely(dir)
+ files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+ } else {
+ throw new IllegalArgumentException(dir + " is not a directory!")
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 616214fb5e..eb7fb63182 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
import scala.util.Random
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.nio.{ByteBuffer, ByteOrder}
import com.google.common.base.Charsets
@@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
val iterator = Iterator.range(0, 5)
assert(Utils.getIteratorSize(iterator) === 5L)
}
+
+ test("findOldFiles") {
+ // create some temporary directories and files
+ val parent: File = Utils.createTempDir()
+ val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
+ val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+ // set the last modified time of child1 to 10 secs old
+ child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+
+ val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
+ assert(result.size.equals(1))
+ assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+ }
}
diff --git a/docs/configuration.md b/docs/configuration.md
index b6005acac8..57bda20edc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -349,6 +349,32 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.worker.cleanup.enabled</td>
+ <td>true</td>
+ <td>
+    Enable periodic cleanup of worker and application directories. Note that this only affects
+    standalone mode; cleanup under YARN is handled differently.
+ </td>
+</tr>
+<tr>
+ <td>spark.worker.cleanup.interval</td>
+ <td>1800 (30 minutes)</td>
+ <td>
+ Controls the interval, in seconds, at which the worker cleans up old application work dirs
+ on the local machine.
+ </td>
+</tr>
+<tr>
+ <td>spark.worker.cleanup.appDataTtl</td>
+ <td>7 * 24 * 3600 (7 days)</td>
+ <td>
+    The number of seconds to retain application work directories on each worker. This is a time-to-live
+    and should depend on the amount of available disk space you have. Application logs and jars are
+    downloaded to each application work dir. Over time, the work dirs can fill up disk space quickly,
+    especially if you run jobs very frequently.
+ </td>
+</tr>
+<tr>
<td>spark.akka.frameSize</td>
<td>10</td>
<td>