author     huangzhaowei <carlmartinmax@gmail.com>  2015-07-01 23:01:44 -0700
committer  Andrew Or <andrew@databricks.com>       2015-07-01 23:01:44 -0700
commit     646366b5d2f12e42f8e7287672ba29a8c918a17d (patch)
tree       dd7246f9802407aa165b72a672a08f7219d16ae4 /yarn
parent     792fcd802c99a0aef2b67d54f0e6e58710e65956 (diff)
[SPARK-8688] [YARN] Bug fix: disable the cache fs to gain the HDFS connection.
If `fs.hdfs.impl.disable.cache` is `false` (the default), `FileSystem.get` returns a cached `DFSClient` that still carries the old token. From [AMDelegationTokenRenewer](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala#L196):

```scala
val credentials = UserGroupInformation.getCurrentUser.getCredentials
credentials.writeTokenStorageFile(tempTokenPath, discachedConfiguration)
```

Although `credentials` holds the new token, the write still goes through the cached client, which uses the old one. It is therefore better to set `fs.hdfs.impl.disable.cache` to `true` so renewed tokens take effect before the old ones expire. [Jira](https://issues.apache.org/jira/browse/SPARK-8688)

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #7069 from SaintBacchus/SPARK-8688 and squashes the following commits:

f94cd0b [huangzhaowei] modify function parameter
8fb9eb9 [huangzhaowei] explicit the comment
0cd55c9 [huangzhaowei] Rename function name to be an accurate one
cf776a1 [huangzhaowei] [SPARK-8688][YARN]Bug fix: disable the cache fs to gain the HDFS connection.
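To make the failure mode concrete, here is a minimal sketch (not part of the patch) of how Hadoop's `FileSystem` cache behaves and how the per-scheme `fs.<scheme>.impl.disable.cache` flag sidesteps it; it assumes `fs.defaultFS` points at an HDFS cluster:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val fs1 = FileSystem.get(conf)
val fs2 = FileSystem.get(conf)
// Both calls return the same cached instance, so fs2 still carries
// whatever delegation token fs1 was created with.
assert(fs1 eq fs2)

// Copy the conf and disable the cache for the scheme we are about to access,
// mirroring how the patch derives the scheme from the credentials path.
val scheme = new Path("hdfs:///").toUri.getScheme
val freshConf = new Configuration(conf)
freshConf.setBoolean(s"fs.$scheme.impl.disable.cache", true)

// Now FileSystem.get constructs a new client that picks up the current
// (renewed) credentials of the calling user.
val fs3 = FileSystem.get(freshConf)
assert(!(fs3 eq fs1))
```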
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala      | 10
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala |  5
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 77af46c192..56e4741b93 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -65,6 +65,8 @@ private[yarn] class AMDelegationTokenRenewer(
     sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
   private val numFilesToKeep =
     sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+  private val freshHadoopConf =
+    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
 
   /**
    * Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -123,7 +125,7 @@ private[yarn] class AMDelegationTokenRenewer(
   private def cleanupOldFiles(): Unit = {
     import scala.concurrent.duration._
     try {
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(freshHadoopConf)
       val credentialsPath = new Path(credentialsFile)
       val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
       hadoopUtil.listFilesSorted(
@@ -169,13 +171,13 @@ private[yarn] class AMDelegationTokenRenewer(
       // Get a copy of the credentials
       override def run(): Void = {
         val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+        hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
         null
       }
     })
     // Add the temp credentials back to the original ones.
     UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(hadoopConf)
+    val remoteFs = FileSystem.get(freshHadoopConf)
     // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
     // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
     // and update the lastCredentialsFileSuffix.
@@ -194,7 +196,7 @@ private[yarn] class AMDelegationTokenRenewer(
     val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
     logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
     logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
     remoteFs.rename(tempTokenPath, tokenPath)
     logInfo("Delegation token file rename complete.")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 229c2c4d5e..94feb6393f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -35,6 +35,9 @@ private[spark] class ExecutorDelegationTokenUpdater(
   @volatile private var lastCredentialsFileSuffix = 0
 
   private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+  private val freshHadoopConf =
+    SparkHadoopUtil.get.getConfBypassingFSCache(
+      hadoopConf, new Path(credentialsFile).toUri.getScheme)
 
   private val delegationTokenRenewer =
     Executors.newSingleThreadScheduledExecutor(
@@ -49,7 +52,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
   def updateCredentialsIfRequired(): Unit = {
     try {
       val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(freshHadoopConf)
       SparkHadoopUtil.get.listFilesSorted(
         remoteFs, credentialsFilePath.getParent,
         credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
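The `getConfBypassingFSCache` helper both files now call lives in core's `SparkHadoopUtil`, so it does not appear in this yarn-limited diffstat. A minimal sketch of such a helper, assuming it simply copies the conf and flips the per-scheme cache flag:

```scala
import org.apache.hadoop.conf.Configuration

// Sketch of a cache-bypassing helper like the one the patch calls.
// The real method lives in core's SparkHadoopUtil, outside this diff.
def getConfBypassingFSCache(hadoopConf: Configuration, scheme: String): Configuration = {
  // Copy first, so the cache stays enabled for every other caller of the
  // original configuration.
  val newConf = new Configuration(hadoopConf)
  newConf.setBoolean(s"fs.$scheme.impl.disable.cache", true)
  newConf
}
```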