Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala |  8
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala         | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                   | 21
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala              | 23
4 files changed, 62 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 00a43673e5..71650cd773 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
- val workDir: File,
+ val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
var state: ExecutorState.Value)
@@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
*/
def fetchAndRunExecutor() {
try {
- // Create the executor's working directory
- val executorDir = new File(workDir, appId + "/" + execId)
- if (!executorDir.mkdirs()) {
- throw new IOException("Failed to create directory " + executorDir)
- }
-
// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0c454e4138..3b13f43a18 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,15 +18,18 @@
package org.apache.spark.deploy.worker
import java.io.File
+import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date
+import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.commons.io.FileUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -191,6 +194,7 @@ private[spark] class Worker(
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
if (CLEANUP_ENABLED) {
+ logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
}
@@ -201,10 +205,23 @@ private[spark] class Worker(
case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
- logInfo("Cleaning up oldest application directories in " + workDir + " ...")
- Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
- .foreach(Utils.deleteRecursively)
+ val appDirs = workDir.listFiles()
+ if (appDirs == null) {
+ throw new IOException("Failed to list files in " + workDir)
+ }
+ appDirs.filter { dir =>
+ // the directory belongs to an application; make sure that application
+ // is no longer running before cleaning it up
+ val appIdFromDir = dir.getName
+ val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+ dir.isDirectory && !isAppStillRunning &&
+ !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+ }.foreach { dir =>
+ logInfo(s"Removing directory: ${dir.getPath}")
+ Utils.deleteRecursively(dir)
+ }
}
+
cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
@@ -233,8 +250,15 @@ private[spark] class Worker(
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+
+ // Create the executor's working directory
+ val executorDir = new File(workDir, appId + "/" + execId)
+ if (!executorDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + executorDir)
+ }
+
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
+ self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -242,12 +266,13 @@ private[spark] class Worker(
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
- logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
- master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+ master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
+ Some(e.toString), None)
}
}
}
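The WorkDirCleanup handler wraps the scan in a future precisely so that disk I/O never blocks the worker actor, with failures surfacing through a callback. The same pattern in isolation, sketched with the non-deprecated Future/onComplete API rather than the Scala 2.10-era concurrent.future/onFailure used in the diff; the cleanup body is a stand-in:

import java.util.concurrent.CountDownLatch
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object CleanupFutureSketch {
  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(1)

    // The potentially slow directory scan runs on the execution context,
    // not on the calling (actor) thread.
    val cleanupFuture = Future {
      Thread.sleep(100) // stand-in for walking and deleting old app dirs
      3                 // pretend three stale directories were removed
    }

    // Failures surface through the completion callback instead of being lost.
    cleanupFuture.onComplete {
      case Success(n) => println(s"removed $n stale app directories"); done.countDown()
      case Failure(e) => println("App dir cleanup failed: " + e.getMessage); done.countDown()
    }

    done.await() // only needed to keep this toy main alive for the callback
  }
}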
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9399ddab76..a67124140f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -35,6 +35,8 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.PropertyConfigurator
@@ -705,17 +707,20 @@ private[spark] object Utils extends Logging {
}
/**
- * Finds all the files in a directory whose last modified time is older than cutoff seconds.
- * @param dir must be the path to a directory, or IllegalArgumentException is thrown
- * @param cutoff measured in seconds. Files older than this are returned.
+ * Determines if a directory contains any files newer than cutoff seconds.
+ *
+ * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+ * @param cutoff age threshold in seconds; files modified within the last cutoff seconds count as new
*/
- def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+ def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
val currentTimeMillis = System.currentTimeMillis
- if (dir.isDirectory) {
- val files = listFilesSafely(dir)
- files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+ if (!dir.isDirectory) {
+ throw new IllegalArgumentException(dir + " is not a directory!")
} else {
- throw new IllegalArgumentException(dir + " is not a directory!")
+ val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
+ val cutoffTimeInMillis = currentTimeMillis - cutoff * 1000
+ val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
+ newFiles.nonEmpty
}
}
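The rewritten helper flips the old question ("which files are old?") into an existence check ("is anything new?") and, crucially, recurses: a directory's own mtime does not change when a file deep inside a subdirectory is written, so only a full walk is safe. A dependency-free sketch of the same check against plain java.io, for readers without commons-io on the classpath; the object and method names are illustrative:

import java.io.File

object NewFileCheckSketch {
  // True if dir, or anything beneath it, was modified within the last
  // `cutoff` seconds; mirrors doesDirectoryContainAnyNewFiles without
  // the commons-io dependency.
  def containsAnyNewFiles(dir: File, cutoff: Long): Boolean = {
    require(dir.isDirectory, dir + " is not a directory!")
    val cutoffTimeInMillis = System.currentTimeMillis - cutoff * 1000
    // Depth-first walk over dir itself plus everything underneath it;
    // listFiles returns null for plain files, which Option absorbs.
    def walk(f: File): Iterator[File] =
      Iterator(f) ++ Option(f.listFiles).iterator.flatMap(_.iterator).flatMap(walk)
    walk(dir).exists(_.lastModified > cutoffTimeInMillis)
  }

  def main(args: Array[String]): Unit = {
    val tmp = new File(System.getProperty("java.io.tmpdir"))
    println(containsAnyNewFiles(tmp, 60)) // anything under tmp touched in the last minute?
  }
}

The child3 case in the test below exercises exactly this recursion: the parent and its immediate children are stale, but a grandchild is not.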
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 70d423ba8a..e63d9d085e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
assert(Utils.getIteratorSize(iterator) === 5L)
}
- test("findOldFiles") {
+ test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
- // set the last modified time of child1 to 10 secs old
- child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+ val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+ // set the last modified time of child1 to 30 secs old
+ child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
- val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
- assert(result.size.equals(1))
- assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+ // although child1 is old, child2 is still new, so this should return true
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+ // although parent and its immediate children are old, child3 is still new;
+ // we expect the full recursive search to find it
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+ assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
}
test("resolveURI") {