aboutsummaryrefslogtreecommitdiff
path: root/resource-managers
diff options
context:
space:
mode:
Diffstat (limited to 'resource-managers')
-rw-r--r--resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider2
-rw-r--r--resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala2
-rw-r--r--resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala (renamed from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala)55
-rw-r--r--resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala8
-rw-r--r--resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala (renamed from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala)20
5 files changed, 48 insertions, 39 deletions
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
index 22ead56d23..f5a807ecac 100644
--- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -1,3 +1,3 @@
-org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
org.apache.spark.deploy.yarn.security.HiveCredentialProvider
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
index 933736bd22..4f4be52a0d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
/**
* A ConfigurableCredentialManager to manage all the registered credential providers and offer
* APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
* be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
* managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]]
* interface and put into resources/META-INF/services to be loaded by ServiceLoader.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
index ebb176bc95..b4fb4a790a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
@@ -17,37 +17,40 @@
package org.apache.spark.deploy.yarn.security
-import java.io.{ByteArrayInputStream, DataInputStream}
-
import scala.collection.JavaConverters._
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
+private[security] class HadoopFSCredentialProvider
+ extends ServiceCredentialProvider with Logging {
// Token renewal interval, this value will be set in the first call,
- // if None means no token renewer specified, so cannot get token renewal interval.
+ // if None means no token renewer specified or no token can be renewed,
+ // so cannot get token renewal interval.
private var tokenRenewalInterval: Option[Long] = null
- override val serviceName: String = "hdfs"
+ override val serviceName: String = "hadoopfs"
override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
// NameNode to access, used to get tokens from different FileSystems
+ val tmpCreds = new Credentials()
+ val tokenRenewer = getTokenRenewer(hadoopConf)
nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
val dstFs = dst.getFileSystem(hadoopConf)
- logInfo("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+ logInfo("getting token for: " + dst)
+ dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
}
// Get the token renewal interval if it is not set. It will only be called once.
@@ -56,15 +59,18 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
}
// Get the time of next renewal.
- tokenRenewalInterval.map { interval =>
- creds.getAllTokens.asScala
- .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
+ val nextRenewalDates = tmpCreds.getAllTokens.asScala
+ .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
.map { t =>
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
identifier.getIssueDate + interval
- }.foldLeft(0L)(math.max)
+ }
+ if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}
+
+ creds.addAll(tmpCreds)
+ nextRenewalDate
}
private def getTokenRenewalInterval(
@@ -78,16 +84,19 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
val dstFs = dst.getFileSystem(hadoopConf)
dstFs.addDelegationTokens(renewer, creds)
}
- val hdfsToken = creds.getAllTokens.asScala
- .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
- hdfsToken.map { t =>
- val newExpiration = t.renew(hadoopConf)
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- val interval = newExpiration - identifier.getIssueDate
- logInfo(s"Renewal Interval is $interval")
- interval
+
+ val renewIntervals = creds.getAllTokens.asScala.filter {
+ _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+ }.flatMap { token =>
+ Try {
+ val newExpiration = token.renew(hadoopConf)
+ val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+ val interval = newExpiration - identifier.getIssueDate
+ logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+ interval
+ }.toOption
}
+ if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
index db4619e80c..b0067aa451 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -48,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
test("Correctly load default credential providers") {
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
credentialManager.getServiceCredentialProvider("hive") should not be (None)
}
@@ -57,17 +57,17 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
credentialManager.getServiceCredentialProvider("hive") should be (None)
}
test("using deprecated configurations") {
- sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+ sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+ credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
credentialManager.getServiceCredentialProvider("hive") should be (None)
credentialManager.getServiceCredentialProvider("test") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
index 7b2da3f26e..0eb2512723 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
@@ -23,30 +23,30 @@ import org.scalatest.{Matchers, PrivateMethodTester}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-class HDFSCredentialProviderSuite
+class HadoopFSCredentialProviderSuite
extends SparkFunSuite
with PrivateMethodTester
with Matchers {
private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
private def getTokenRenewer(
- hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
- hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+ fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = {
+ fsCredentialProvider invokePrivate _getTokenRenewer(conf)
}
- private var hdfsCredentialProvider: HDFSCredentialProvider = null
+ private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null
override def beforeAll() {
super.beforeAll()
- if (hdfsCredentialProvider == null) {
- hdfsCredentialProvider = new HDFSCredentialProvider()
+ if (hadoopFsCredentialProvider == null) {
+ hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
}
}
override def afterAll() {
- if (hdfsCredentialProvider != null) {
- hdfsCredentialProvider = null
+ if (hadoopFsCredentialProvider != null) {
+ hadoopFsCredentialProvider = null
}
super.afterAll()
@@ -56,7 +56,7 @@ class HDFSCredentialProviderSuite
val hadoopConf = new Configuration()
hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
}
@@ -64,7 +64,7 @@ class HDFSCredentialProviderSuite
val hadoopConf = new Configuration()
val caught =
intercept[SparkException] {
- getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
}
assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
}