author     Kousuke Saruta <sarutak@oss.nttdata.co.jp>   2014-10-24 08:51:08 -0500
committer  Thomas Graves <tgraves@apache.org>           2014-10-24 08:51:08 -0500
commit     d2987e8f7a2cb3bf971f381399d8efdccb51d3d2 (patch)
tree       14deba0d436990538c8f75688d711cacc7f2f36b /yarn
parent     809c785bcc33e684a68ea14240a466def864199a (diff)
[SPARK-3900][YARN] ApplicationMaster's shutdown hook fails and IllegalStateException is thrown.
ApplicationMaster registers a shutdown hook that calls ApplicationMaster#cleanupStagingDir.
cleanupStagingDir invokes FileSystem.get(yarnConf), which in turn calls FileSystem.getInternal,
and FileSystem.getInternal also registers a shutdown hook. In the FileSystem of Hadoop 0.23, the
shutdown hook registration does not check whether shutdown is already in progress (in 2.2, it does):

    // 0.23
    if (map.isEmpty() ) {
      ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
    }

    // 2.2
    if (map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
      ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
    }

Thus, on 0.23 another shutdown hook can be registered while ApplicationMaster's shutdown hook is
running. This causes an IllegalStateException as follows:

    java.lang.IllegalStateException: Shutdown in progress, cannot add a shutdownHook
        at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:152)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2306)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:162)
        at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$cleanupStagingDir(ApplicationMaster.scala:307)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:118)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2924 from sarutak/SPARK-3900-2 and squashes the following commits:

9112817 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3900-2
97018fa [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3900
2c2850e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3900
ee52db2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3900
a7d6c9b [Kousuke Saruta] Merge branch 'SPARK-3900' of github.com:sarutak/spark into SPARK-3900
1cdf03c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3900
a5f6443 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3900
57b397d [Kousuke Saruta] Fixed IllegalStateException caused by shutdown hook registration in another shutdown hook
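The pattern behind the fix is visible in the diff below: obtain the FileSystem once, while the JVM
is still running normally, and let the shutdown hook use that cached reference instead of calling
FileSystem.get() during shutdown. A minimal, self-contained sketch of the same idea (this uses
Scala's sys.addShutdownHook and an illustrative staging path, not the actual ApplicationMaster code):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object StagingCleanupSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()

        // Get the FileSystem eagerly. On Hadoop 0.23 the first FileSystem.get()
        // call registers its own shutdown hook, which is only allowed before
        // shutdown has started.
        val fs = FileSystem.get(conf)

        // Illustrative staging directory; the real path comes from the YARN app.
        val stagingDirPath = new Path("/tmp/example-staging-dir")

        // The hook only uses the already-created FileSystem, so nothing attempts
        // to register a new shutdown hook while shutdown is in progress.
        sys.addShutdownHook {
          try {
            fs.delete(stagingDirPath, true)
          } catch {
            case e: Exception =>
              System.err.println("Failed to clean up staging directory: " + e)
          }
        }
      }
    }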
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 6
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a3c43b4384..e6fe0265d8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -92,6 +92,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
logInfo("ApplicationAttemptId: " + appAttemptId)
+ val fs = FileSystem.get(yarnConf)
val cleanupHook = new Runnable {
override def run() {
// If the SparkContext is still registered, shut it down as a best case effort in case
@@ -115,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
unregister(finalStatus, finalMsg)
- cleanupStagingDir()
+ cleanupStagingDir(fs)
}
}
}
@@ -303,8 +304,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
/**
* Clean up the staging directory.
*/
- private def cleanupStagingDir() {
- val fs = FileSystem.get(yarnConf)
+ private def cleanupStagingDir(fs: FileSystem) {
var stagingDirPath: Path = null
try {
val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean