diff options
author | huangzhaowei <carlmartinmax@gmail.com> | 2015-07-01 23:01:44 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-07-01 23:01:44 -0700 |
commit | 646366b5d2f12e42f8e7287672ba29a8c918a17d (patch) | |
tree | dd7246f9802407aa165b72a672a08f7219d16ae4 | |
parent | 792fcd802c99a0aef2b67d54f0e6e58710e65956 (diff) | |
download | spark-646366b5d2f12e42f8e7287672ba29a8c918a17d.tar.gz spark-646366b5d2f12e42f8e7287672ba29a8c918a17d.tar.bz2 spark-646366b5d2f12e42f8e7287672ba29a8c918a17d.zip |
[SPARK-8688] [YARN] Bug fix: disable the cache fs to gain the HDFS connection.
If `fs.hdfs.impl.disable.cache` was `false`(default), `FileSystem` will use the cached `DFSClient` which use old token.
[AMDelegationTokenRenewer](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala#L196)
```scala
val credentials = UserGroupInformation.getCurrentUser.getCredentials
credentials.writeTokenStorageFile(tempTokenPath, discachedConfiguration)
```
Although the `credentials` had the new Token, but it still use the cached client and old token.
So It's better to set the `fs.hdfs.impl.disable.cache` as `true` to avoid token expired.
[Jira](https://issues.apache.org/jira/browse/SPARK-8688)
Author: huangzhaowei <carlmartinmax@gmail.com>
Closes #7069 from SaintBacchus/SPARK-8688 and squashes the following commits:
f94cd0b [huangzhaowei] modify function parameter
8fb9eb9 [huangzhaowei] explicit the comment
0cd55c9 [huangzhaowei] Rename function name to be an accurate one
cf776a1 [huangzhaowei] [SPARK-8688][YARN]Bug fix: disable the cache fs to gain the HDFS connection.
3 files changed, 23 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 7fa75ac8c2..6d14590a1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -334,6 +334,19 @@ class SparkHadoopUtil extends Logging { * Stop the thread that does the delegation token updates. */ private[spark] def stopExecutorDelegationTokenRenewer() {} + + /** + * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism. + * This is to prevent the DFSClient from using an old cached token to connect to the NameNode. + */ + private[spark] def getConfBypassingFSCache( + hadoopConf: Configuration, + scheme: String): Configuration = { + val newConf = new Configuration(hadoopConf) + val confKey = s"fs.${scheme}.impl.disable.cache" + newConf.setBoolean(confKey, true) + newConf + } } object SparkHadoopUtil { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala index 77af46c192..56e4741b93 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala @@ -65,6 +65,8 @@ private[yarn] class AMDelegationTokenRenewer( sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5) private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5) + private val freshHadoopConf = + hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme) /** * Schedule a login from the keytab and principal set using the --principal and --keytab @@ -123,7 +125,7 @@ private[yarn] class AMDelegationTokenRenewer( private def cleanupOldFiles(): Unit = { import scala.concurrent.duration._ try { - val remoteFs = FileSystem.get(hadoopConf) + val remoteFs = FileSystem.get(freshHadoopConf) val credentialsPath = new Path(credentialsFile) val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis hadoopUtil.listFilesSorted( @@ -169,13 +171,13 @@ private[yarn] class AMDelegationTokenRenewer( // Get a copy of the credentials override def run(): Void = { val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst - hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds) + hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds) null } }) // Add the temp credentials back to the original ones. UserGroupInformation.getCurrentUser.addCredentials(tempCreds) - val remoteFs = FileSystem.get(hadoopConf) + val remoteFs = FileSystem.get(freshHadoopConf) // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file // and update the lastCredentialsFileSuffix. @@ -194,7 +196,7 @@ private[yarn] class AMDelegationTokenRenewer( val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) logInfo("Writing out delegation tokens to " + tempTokenPath.toString) val credentials = UserGroupInformation.getCurrentUser.getCredentials - credentials.writeTokenStorageFile(tempTokenPath, hadoopConf) + credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf) logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr") remoteFs.rename(tempTokenPath, tokenPath) logInfo("Delegation token file rename complete.") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala index 229c2c4d5e..94feb6393f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala @@ -35,6 +35,9 @@ private[spark] class ExecutorDelegationTokenUpdater( @volatile private var lastCredentialsFileSuffix = 0 private val credentialsFile = sparkConf.get("spark.yarn.credentials.file") + private val freshHadoopConf = + SparkHadoopUtil.get.getConfBypassingFSCache( + hadoopConf, new Path(credentialsFile).toUri.getScheme) private val delegationTokenRenewer = Executors.newSingleThreadScheduledExecutor( @@ -49,7 +52,7 @@ private[spark] class ExecutorDelegationTokenUpdater( def updateCredentialsIfRequired(): Unit = { try { val credentialsFilePath = new Path(credentialsFile) - val remoteFs = FileSystem.get(hadoopConf) + val remoteFs = FileSystem.get(freshHadoopConf) SparkHadoopUtil.get.listFilesSorted( remoteFs, credentialsFilePath.getParent, credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) |