author     Evan Chan <ev@ooyala.com>               2014-04-06 19:17:33 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-04-06 19:21:40 -0700
commit     1440154c27ca48b5a75103eccc9057286d3f6ca8 (patch)
tree       7f4b2fb31c31ba5a457c759b48a884492fe472dd
parent     4106558435889261243d186f5f0b51c5f9e98d56 (diff)
SPARK-1154: Clean up app folders in worker nodes
This is a fix for [SPARK-1154](https://issues.apache.org/jira/browse/SPARK-1154). The issue is that worker nodes fill up with a huge number of app-* folders after some time. This change adds a periodic cleanup task which asynchronously deletes app directories older than a configurable TTL.

Two new configuration parameters have been introduced:
spark.worker.cleanup_interval
spark.worker.app_data_ttl

This change does not include moving the downloads of application jars to a location outside of the work directory. We will address that if we have time, but that potentially involves caching, so it will come either as part of this PR or a separate PR.

Author: Evan Chan <ev@ooyala.com>
Author: Kelvin Chu <kelvinkwchu@yahoo.com>

Closes #288 from velvia/SPARK-1154-cleanup-app-folders and squashes the following commits:

0689995 [Evan Chan] CR from @aarondav - move config, clarify for standalone mode
9f10d96 [Evan Chan] CR from @pwendell - rename configs and add cleanup.enabled
f2f6027 [Evan Chan] CR from @andrewor14
553d8c2 [Kelvin Chu] change the variable name to currentTimeMillis since it actually tracks in seconds
8dc9cb5 [Kelvin Chu] Fixed a bug in Utils.findOldFiles() after merge.
cb52f2b [Kelvin Chu] Change the name of findOldestFiles() to findOldFiles()
72f7d2d [Kelvin Chu] Fix a bug of Utils.findOldestFiles(). file.lastModified is returned in milliseconds.
ad99955 [Kelvin Chu] Add unit test for Utils.findOldestFiles()
dc1a311 [Evan Chan] Don't recompute current time with every new file
e3c408e [Evan Chan] Document the two new settings
b92752b [Evan Chan] SPARK-1154: Add a periodic task to clean up app directories
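
For readers skimming the change, the following is a minimal, self-contained sketch (not the patch itself) of the core idea: list the entries of a worker's work directory and pick out those whose last-modified time is older than a TTL. The directory path and object/method names below are illustrative placeholders; only the 7-day TTL value mirrors the patch.

    import java.io.File

    object AppDirCleanupSketch {
      /** Entries of `dir` whose last-modified time is more than `ttlSeconds` old. */
      def expiredDirs(dir: File, ttlSeconds: Long): Seq[File] = {
        val cutoffMillis = System.currentTimeMillis - ttlSeconds * 1000
        Option(dir.listFiles).map(_.toSeq).getOrElse(Nil)
          .filter(f => f.isDirectory && f.lastModified < cutoffMillis)
      }

      def main(args: Array[String]): Unit = {
        val workDir = new File("/tmp/spark-work")   // placeholder path, not from the patch
        val ttlSeconds = 7L * 24 * 3600             // mirrors the 7-day default TTL
        expiredDirs(workDir, ttlSeconds).foreach(d => println("stale app dir: " + d.getName))
      }
    }
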
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala    4
-rwxr-xr-x   core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala   23
-rw-r--r--   core/src/main/scala/org/apache/spark/util/Utils.scala             19
-rw-r--r--   core/src/test/scala/org/apache/spark/util/UtilsSuite.scala        15
-rw-r--r--   docs/configuration.md                                             26
5 files changed, 83 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 83ce14a0a8..a7368f9f3d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {
case class KillDriver(driverId: String) extends DeployMessage
+ // Worker internal
+
+ case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
+
// AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8a71ddda4c..bf5a8d09dd 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -64,6 +64,12 @@ private[spark] class Worker(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
+ val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
+ // How often worker will clean up old app folders
+ val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+ // TTL for app folders/data; after TTL expires it will be cleaned up
+ val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+
// Index into masterUrls that we're currently trying to register with.
var masterIndex = 0
@@ -179,12 +185,28 @@ private[spark] class Worker(
registered = true
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+ if (CLEANUP_ENABLED) {
+ context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
+ CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
+ }
case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}
+ case WorkDirCleanup =>
+ // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+ val cleanupFuture = concurrent.future {
+ logInfo("Cleaning up oldest application directories in " + workDir + " ...")
+ Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
+ .foreach(Utils.deleteRecursively)
+ }
+ cleanupFuture onFailure {
+ case e: Throwable =>
+ logError("App dir cleanup failed: " + e.getMessage, e)
+ }
+
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
changeMaster(masterUrl, masterWebUiUrl)
@@ -331,7 +353,6 @@ private[spark] class Worker(
}
private[spark] object Worker {
-
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
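
The scheduling pattern in the Worker change above (a periodic message to the actor, with the actual deletion pushed into a Future so the actor is never blocked) can be imitated without Akka. Below is a hedged, framework-free sketch under assumed names; it is not the Worker code itself, and the log messages are placeholders.

    import java.util.concurrent.{Executors, TimeUnit}
    import scala.concurrent.{ExecutionContext, Future}

    object PeriodicCleanupSketch {
      def main(args: Array[String]): Unit = {
        // Thread pool for the (potentially slow) cleanup work, kept off the timer thread,
        // analogous to the patch running the deletion inside a Future.
        implicit val cleanupEc: ExecutionContext =
          ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

        val intervalSeconds = 30L * 60   // mirrors the spark.worker.cleanup.interval default

        val timer = Executors.newSingleThreadScheduledExecutor()
        val tick = new Runnable {
          def run(): Unit = {
            val cleanup = Future {
              println("Cleaning up old application directories ...")
              // deletion of expired directories would go here
            }
            cleanup.failed.foreach(e => println("App dir cleanup failed: " + e.getMessage))
          }
        }
        timer.scheduleAtFixedRate(tick, intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
      }
    }

As in the patch, the first run is delayed by one full interval rather than firing immediately after registration.
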
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d3c39dee33..4435b21a75 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -597,9 +597,24 @@ private[spark] object Utils extends Logging {
}
if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
- return false;
+ return false
} else {
- return true;
+ return true
+ }
+ }
+
+ /**
+ * Finds all the files in a directory whose last modified time is older than cutoff seconds.
+ * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+ * @param cutoff measured in seconds. Files older than this are returned.
+ */
+ def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+ val currentTimeMillis = System.currentTimeMillis
+ if (dir.isDirectory) {
+ val files = listFilesSafely(dir)
+ files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+ } else {
+ throw new IllegalArgumentException(dir + " is not a directory!")
}
}
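
One subtlety worth calling out (the subject of commit 72f7d2d in the squashed list above): File.lastModified is reported in milliseconds, while the cutoff here is expressed in seconds, so the cutoff must be scaled before comparing. A small hedged illustration, with made-up names:

    import java.io.File

    object CutoffUnitsExample {
      def main(args: Array[String]): Unit = {
        val cutoffSeconds = 10L
        val nowMillis = System.currentTimeMillis
        val file = File.createTempFile("example", ".tmp")

        // Wrong: subtracts a value in seconds from a value in milliseconds.
        val wrong = file.lastModified < (nowMillis - cutoffSeconds)
        // Right: convert the cutoff to milliseconds before comparing.
        val right = file.lastModified < (nowMillis - cutoffSeconds * 1000)

        println(s"wrong=$wrong right=$right")   // a freshly created file is old under neither test
        file.delete()
      }
    }

Note also that the current time is computed once, outside the filter, rather than once per file (commit dc1a311).
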
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 616214fb5e..eb7fb63182 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
import scala.util.Random
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.nio.{ByteBuffer, ByteOrder}
import com.google.common.base.Charsets
@@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
val iterator = Iterator.range(0, 5)
assert(Utils.getIteratorSize(iterator) === 5L)
}
+
+ test("findOldFiles") {
+ // create some temporary directories and files
+ val parent: File = Utils.createTempDir()
+ val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
+ val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+ // set the last modified time of child1 to 10 secs old
+ child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+
+ val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
+ assert(result.size.equals(1))
+ assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+ }
}
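
The test ages child1 by rewinding its modification time with setLastModified rather than sleeping. The same trick works for ad-hoc experiments; a hedged sketch under made-up names:

    import java.io.File
    import java.nio.file.Files

    object AgeDirExample {
      def main(args: Array[String]): Unit = {
        val parent = Files.createTempDirectory("parent").toFile
        val child  = Files.createTempDirectory(parent.toPath, "child").toFile

        // Pretend the child directory was last touched ten seconds ago.
        child.setLastModified(System.currentTimeMillis() - 10 * 1000)

        val ageSeconds = (System.currentTimeMillis() - child.lastModified) / 1000
        println(s"${child.getName} appears about ${ageSeconds}s old")
      }
    }
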
diff --git a/docs/configuration.md b/docs/configuration.md
index b6005acac8..57bda20edc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -349,6 +349,32 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.worker.cleanup.enabled</td>
+ <td>true</td>
+ <td>
+ Enable periodic cleanup of worker / application directories. Note that this only affects standalone
+ mode, as YARN works differently.
+ </td>
+</tr>
+<tr>
+ <td>spark.worker.cleanup.interval</td>
+ <td>1800 (30 minutes)</td>
+ <td>
+ Controls the interval, in seconds, at which the worker cleans up old application work dirs
+ on the local machine.
+ </td>
+</tr>
+<tr>
+ <td>spark.worker.cleanup.appDataTtl</td>
+ <td>7 * 24 * 3600 (7 days)</td>
+ <td>
+ The number of seconds to retain application work directories on each worker. This is a Time To Live
+ and should depend on the amount of available disk space you have. Application logs and jars are
+ downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space,
+ especially if you run jobs very frequently.
+ </td>
+</tr>
+<tr>
<td>spark.akka.frameSize</td>
<td>10</td>
<td>