author    jerryshao <sshao@hortonworks.com>   2017-01-11 09:24:02 -0600
committer Tom Graves <tgraves@yahoo-inc.com>  2017-01-11 09:24:02 -0600
commit    4239a1081ad96a503fbf9277e42b97422bb8af3e (patch)
tree      7096161bfbf11404ae2cfc13189214a92a5fa833 /resource-managers
parent    a6155135690433988aa0cbf22f260f52a235e9f5 (diff)
[SPARK-19021][YARN] Generalize HDFSCredentialProvider to support non-HDFS secure filesystems
Currently Spark can only get the token renewal interval from secure HDFS (hdfs://). If Spark runs against other secure filesystems such as webHDFS (webhdfs://), wasb (wasb://), or ADLS, it ignores those tokens and cannot obtain their renewal intervals, which makes Spark unable to work with those secure clusters. So instead of checking only the HDFS token, we should generalize the logic to support different DelegationTokenIdentifier implementations.

## How was this patch tested?

Manually verified in a secure cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #16432 from jerryshao/SPARK-19021.
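To make the new behavior concrete, here is a minimal sketch (not the patch itself) of the generalized check: rather than matching the HDFS-specific `HDFS_DELEGATION_KIND`, any token whose identifier decodes to an `AbstractDelegationTokenIdentifier` contributes a candidate renewal date. The `creds` and `interval` parameters are assumed to be supplied by the caller:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

// Sketch: earliest next-renewal date across all delegation tokens, whatever
// filesystem issued them (hdfs://, webhdfs://, wasb://, ADLS, ...).
def nextRenewalDate(creds: Credentials, interval: Long): Option[Long] = {
  val dates = creds.getAllTokens.asScala
    .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
    .map { t =>
      val id = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
      id.getIssueDate + interval
    }
  if (dates.isEmpty) None else Some(dates.min)
}
```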
Diffstat (limited to 'resource-managers')
-rw-r--r--  resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider |  2
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala |  2
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala (renamed from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala) | 55
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala |  8
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala (renamed from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala) | 20
5 files changed, 48 insertions, 39 deletions
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
index 22ead56d23..f5a807ecac 100644
--- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -1,3 +1,3 @@
-org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
org.apache.spark.deploy.yarn.security.HiveCredentialProvider
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
index 933736bd22..4f4be52a0d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
/**
* A ConfigurableCredentialManager to manage all the registered credential providers and offer
* APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
* be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
* managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]]
* interface and put into resources/META-INF/services to be loaded by ServiceLoader.
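As the doc comment above describes, a third-party provider only needs to implement the trait and list its class name in META-INF/services. A hedged sketch, where `MyServiceCredentialProvider`, its package, and the service name "myservice" are all hypothetical:

```scala
package com.example.security  // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// Hypothetical provider. Register the fully qualified class name in
// resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
// so the ServiceLoader picks it up alongside the built-in providers.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  override val serviceName: String = "myservice"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Obtain service-specific tokens, add them to `creds`, and return the
    // time of next renewal, or None if the tokens are not renewable.
    None
  }
}
```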
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
index ebb176bc95..b4fb4a790a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
@@ -17,37 +17,40 @@
package org.apache.spark.deploy.yarn.security
-import java.io.{ByteArrayInputStream, DataInputStream}
-
import scala.collection.JavaConverters._
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
+private[security] class HadoopFSCredentialProvider
+ extends ServiceCredentialProvider with Logging {
// Token renewal interval, this value will be set in the first call,
- // if None means no token renewer specified, so cannot get token renewal interval.
+ // if None means no token renewer specified or no token can be renewed,
+ // so cannot get token renewal interval.
private var tokenRenewalInterval: Option[Long] = null
- override val serviceName: String = "hdfs"
+ override val serviceName: String = "hadoopfs"
override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
// NameNode to access, used to get tokens from different FileSystems
+ val tmpCreds = new Credentials()
+ val tokenRenewer = getTokenRenewer(hadoopConf)
nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
val dstFs = dst.getFileSystem(hadoopConf)
- logInfo("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+ logInfo("getting token for: " + dst)
+ dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
}
// Get the token renewal interval if it is not set. It will only be called once.
@@ -56,15 +59,18 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
}
// Get the time of next renewal.
- tokenRenewalInterval.map { interval =>
- creds.getAllTokens.asScala
- .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
+ val nextRenewalDates = tmpCreds.getAllTokens.asScala
+ .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
.map { t =>
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
identifier.getIssueDate + interval
- }.foldLeft(0L)(math.max)
+ }
+ if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}
+
+ creds.addAll(tmpCreds)
+ nextRenewalDate
}
private def getTokenRenewalInterval(
@@ -78,16 +84,19 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
val dstFs = dst.getFileSystem(hadoopConf)
dstFs.addDelegationTokens(renewer, creds)
}
- val hdfsToken = creds.getAllTokens.asScala
- .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
- hdfsToken.map { t =>
- val newExpiration = t.renew(hadoopConf)
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- val interval = newExpiration - identifier.getIssueDate
- logInfo(s"Renewal Interval is $interval")
- interval
+
+ val renewIntervals = creds.getAllTokens.asScala.filter {
+ _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+ }.flatMap { token =>
+ Try {
+ val newExpiration = token.renew(hadoopConf)
+ val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+ val interval = newExpiration - identifier.getIssueDate
+ logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+ interval
+ }.toOption
}
+ if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}
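Note the change in aggregation: the old code folded the candidate dates with `math.max` (latest date), while the new code takes the minimum, so renewal is driven by the first token that would need it. A tiny worked sketch with hypothetical timestamps:

```scala
// Hypothetical values: two tokens issued 1s apart, 24h renewal interval (ms).
val interval   = 24L * 60 * 60 * 1000                 // 86400000
val issueDates = Seq(1500000000000L, 1500000001000L)  // issue times, epoch ms

// Taking the minimum schedules renewal before the earliest token lapses.
val nextRenewal = issueDates.map(_ + interval).min    // 1500086400000L
```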
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
index db4619e80c..b0067aa451 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -48,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
test("Correctly load default credential providers") {
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
credentialManager.getServiceCredentialProvider("hive") should not be (None)
}
@@ -57,17 +57,17 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
credentialManager.getServiceCredentialProvider("hive") should be (None)
}
test("using deprecated configurations") {
- sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+ sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+ credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
credentialManager.getServiceCredentialProvider("hive") should be (None)
credentialManager.getServiceCredentialProvider("test") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
index 7b2da3f26e..0eb2512723 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
@@ -23,30 +23,30 @@ import org.scalatest.{Matchers, PrivateMethodTester}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-class HDFSCredentialProviderSuite
+class HadoopFSCredentialProviderSuite
extends SparkFunSuite
with PrivateMethodTester
with Matchers {
private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
private def getTokenRenewer(
- hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
- hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+ fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = {
+ fsCredentialProvider invokePrivate _getTokenRenewer(conf)
}
- private var hdfsCredentialProvider: HDFSCredentialProvider = null
+ private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null
override def beforeAll() {
super.beforeAll()
- if (hdfsCredentialProvider == null) {
- hdfsCredentialProvider = new HDFSCredentialProvider()
+ if (hadoopFsCredentialProvider == null) {
+ hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
}
}
override def afterAll() {
- if (hdfsCredentialProvider != null) {
- hdfsCredentialProvider = null
+ if (hadoopFsCredentialProvider != null) {
+ hadoopFsCredentialProvider = null
}
super.afterAll()
@@ -56,7 +56,7 @@ class HDFSCredentialProviderSuite
val hadoopConf = new Configuration()
hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
}
@@ -64,7 +64,7 @@ class HDFSCredentialProviderSuite
val hadoopConf = new Configuration()
val caught =
intercept[SparkException] {
- getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
}
assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
}