aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authormcheah <mcheah@palantir.com>2014-10-03 14:22:11 -0700
committerAaron Davidson <aaron@databricks.com>2014-10-03 14:22:52 -0700
commitcf1d32e3e1071829b152d4b597bf0a0d7a5629a2 (patch)
tree08b6945cf099f9cdcafdfe3a0614aa5f64e8f960 /core
parent79e45c9323455a51f25ed9acd0edd8682b4bbb88 (diff)
downloadspark-cf1d32e3e1071829b152d4b597bf0a0d7a5629a2.tar.gz
spark-cf1d32e3e1071829b152d4b597bf0a0d7a5629a2.tar.bz2
spark-cf1d32e3e1071829b152d4b597bf0a0d7a5629a2.zip
[SPARK-1860] More conservative app directory cleanup.
First contribution to the project, so apologize for any significant errors. This PR addresses [SPARK-1860]. The application directories are now cleaned up in a more conservative manner. Previously, app-* directories were cleaned up if the directory's timestamp was older than a given time. However, the timestamp on a directory does not reflect the modification times of the files in that directory. Therefore, app-* directories were wiped out even if the files inside them were created recently and possibly being used by Executor tasks. The solution is to change the cleanup logic to inspect all files within the app-* directory and only eliminate the app-* directory if all files in the directory are stale. Author: mcheah <mcheah@palantir.com> Closes #2609 from mccheah/worker-better-app-dir-cleanup and squashes the following commits: 87b5d03 [mcheah] [SPARK-1860] Using more string interpolation. Better error logging. 802473e [mcheah] [SPARK-1860] Cleaning up the logs generated when cleaning directories. e0a1f2e [mcheah] [SPARK-1860] Fixing broken unit test. 77a9de0 [mcheah] [SPARK-1860] More conservative app directory cleanup.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala8
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala37
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala21
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala23
4 files changed, 62 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 00a43673e5..71650cd773 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
- val workDir: File,
+ val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
var state: ExecutorState.Value)
@@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
*/
def fetchAndRunExecutor() {
try {
- // Create the executor's working directory
- val executorDir = new File(workDir, appId + "/" + execId)
- if (!executorDir.mkdirs()) {
- throw new IOException("Failed to create directory " + executorDir)
- }
-
// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0c454e4138..3b13f43a18 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,15 +18,18 @@
package org.apache.spark.deploy.worker
import java.io.File
+import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date
+import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.commons.io.FileUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -191,6 +194,7 @@ private[spark] class Worker(
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
if (CLEANUP_ENABLED) {
+ logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
}
@@ -201,10 +205,23 @@ private[spark] class Worker(
case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
- logInfo("Cleaning up oldest application directories in " + workDir + " ...")
- Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
- .foreach(Utils.deleteRecursively)
+ val appDirs = workDir.listFiles()
+ if (appDirs == null) {
+ throw new IOException("ERROR: Failed to list files in " + appDirs)
+ }
+ appDirs.filter { dir =>
+ // the directory is used by an application - check that the application is not running
+ // when cleaning up
+ val appIdFromDir = dir.getName
+ val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+ dir.isDirectory && !isAppStillRunning &&
+ !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+ }.foreach { dir =>
+ logInfo(s"Removing directory: ${dir.getPath}")
+ Utils.deleteRecursively(dir)
+ }
}
+
cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
@@ -233,8 +250,15 @@ private[spark] class Worker(
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+
+ // Create the executor's working directory
+ val executorDir = new File(workDir, appId + "/" + execId)
+ if (!executorDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + executorDir)
+ }
+
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
+ self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -242,12 +266,13 @@ private[spark] class Worker(
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
- logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
- master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+ master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
+ Some(e.toString), None)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9399ddab76..a67124140f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -35,6 +35,8 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.PropertyConfigurator
@@ -705,17 +707,20 @@ private[spark] object Utils extends Logging {
}
/**
- * Finds all the files in a directory whose last modified time is older than cutoff seconds.
- * @param dir must be the path to a directory, or IllegalArgumentException is thrown
- * @param cutoff measured in seconds. Files older than this are returned.
+ * Determines if a directory contains any files newer than cutoff seconds.
+ *
+ * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+ * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
*/
- def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+ def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
val currentTimeMillis = System.currentTimeMillis
- if (dir.isDirectory) {
- val files = listFilesSafely(dir)
- files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+ if (!dir.isDirectory) {
+ throw new IllegalArgumentException (dir + " is not a directory!")
} else {
- throw new IllegalArgumentException(dir + " is not a directory!")
+ val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
+ val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
+ val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
+ newFiles.nonEmpty
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 70d423ba8a..e63d9d085e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
assert(Utils.getIteratorSize(iterator) === 5L)
}
- test("findOldFiles") {
+ test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
- // set the last modified time of child1 to 10 secs old
- child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+ val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+ // set the last modified time of child1 to 30 secs old
+ child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
- val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
- assert(result.size.equals(1))
- assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+ // although child1 is old, child2 is still new so return true
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+ // although parent and its immediate children are new, child3 is still old
+ // we expect a full recursive search for new files.
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+ assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
}
test("resolveURI") {