aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxiaojian.fxj <xiaojian.fxj@alibaba-inc.com>2017-03-12 10:29:00 -0700
committerShixiong Zhu <shixiong@databricks.com>2017-03-12 10:29:00 -0700
commit2f5187bde1544c452fe5116a2bd243653332a079 (patch)
tree5c3d32d0b6e49cbca18499e5444d3f1015293dad
parente29a74d5b1fa3f9356b7af5dd7e3fce49bc8eb7d (diff)
downloadspark-2f5187bde1544c452fe5116a2bd243653332a079.tar.gz
spark-2f5187bde1544c452fe5116a2bd243653332a079.tar.bz2
spark-2f5187bde1544c452fe5116a2bd243653332a079.zip
[SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the directories of finished applications to avoid the block
Cleaning the application may cost much time at worker, then it will block that the worker send heartbeats master because the worker is extend ThreadSafeRpcEndpoint. If the heartbeat from a worker is blocked by the message ApplicationFinished, master will think the worker is dead. If the worker has a driver, the driver will be scheduled by master again. It had better reuse the existing cleanupThreadExecutor to clean up the directories of finished applications to avoid the block. Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com> Closes #17189 from hustfxj/worker-hearbeat.
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala17
1 files changed, 11 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e48817ebba..00b9d1af37 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,8 +62,8 @@ private[deploy] class Worker(
private val forwordMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
- // A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
- // methods.
+ // A separated thread to clean up the workDir and the directories of finished applications.
+ // Used to provide the implicit parameter of `Future` methods.
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
@@ -578,10 +578,15 @@ private[deploy] class Worker(
if (shouldCleanup) {
finishedApps -= id
appDirectories.remove(id).foreach { dirList =>
- logInfo(s"Cleaning up local directories for application $id")
- dirList.foreach { dir =>
- Utils.deleteRecursively(new File(dir))
- }
+ concurrent.Future {
+ logInfo(s"Cleaning up local directories for application $id")
+ dirList.foreach { dir =>
+ Utils.deleteRecursively(new File(dir))
+ }
+ }(cleanupThreadExecutor).onFailure {
+ case e: Throwable =>
+ logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+ }(cleanupThreadExecutor)
}
shuffleService.applicationRemoved(id)
}