about summary refs log tree commit diff
diff options
context:
space:
mode:
authorlinweizhong <linweizhong@huawei.com>2015-06-08 09:34:16 +0100
committerSean Owen <sowen@cloudera.com>2015-06-08 09:34:16 +0100
commiteacd4a929bf5d697c33b1b705dcf958651cd20f4 (patch)
tree13d04f56d910150b32df30efc7999ca2f0831730
parent10fc2f6f51819f263eec941bdc1db22c554f9118 (diff)
downloadspark-eacd4a929bf5d697c33b1b705dcf958651cd20f4.tar.gz
spark-eacd4a929bf5d697c33b1b705dcf958651cd20f4.tar.bz2
spark-eacd4a929bf5d697c33b1b705dcf958651cd20f4.zip
[SPARK-7705] [YARN] Cleanup of .sparkStaging directory fails if application is killed
As I have tested, if we cancel or kill the app then the final status may be undefined, killed or succeeded, so clean up staging directory when appMaster exit at any final application status. Author: linweizhong <linweizhong@huawei.com> Closes #6409 from Sephiroth-Lin/SPARK-7705 and squashes the following commits: 3a5a0a5 [linweizhong] Update 83dc274 [linweizhong] Update 923d44d [linweizhong] Update 0dd7c2d [linweizhong] Update b76a102 [linweizhong] Update code style 7846b69 [linweizhong] Update bd6cf0d [linweizhong] Refactor aed9f18 [linweizhong] Clean up stagingDir when launch app on yarn 95595c3 [linweizhong] Cleanup of .sparkStaging directory when AppMaster exit at any final application status
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala34
1 file changed, 21 insertions(+), 13 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 234051eb7d..f4d43214b0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -121,25 +121,32 @@ private[spark] class Client(
} catch {
case e: Throwable =>
if (appId != null) {
- val appStagingDir = getAppStagingDir(appId)
- try {
- val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
- val stagingDirPath = new Path(appStagingDir)
- val fs = FileSystem.get(hadoopConf)
- if (!preserveFiles && fs.exists(stagingDirPath)) {
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
- }
- } catch {
- case ioe: IOException =>
- logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
- }
+ cleanupStagingDir(appId)
}
throw e
}
}
/**
+ * Cleanup application staging directory.
+ */
+ private def cleanupStagingDir(appId: ApplicationId): Unit = {
+ val appStagingDir = getAppStagingDir(appId)
+ try {
+ val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+ val stagingDirPath = new Path(appStagingDir)
+ val fs = FileSystem.get(hadoopConf)
+ if (!preserveFiles && fs.exists(stagingDirPath)) {
+ logInfo("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ }
+ } catch {
+ case ioe: IOException =>
+ logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
+ }
+ }
+
+ /**
* Set up the context for submitting our ApplicationMaster.
* This uses the YarnClientApplication not available in the Yarn alpha API.
*/
@@ -782,6 +789,7 @@ private[spark] class Client(
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
+ cleanupStagingDir(appId)
return (state, report.getFinalApplicationStatus)
}