diff options
Diffstat (limited to 'resource-managers')
-rw-r--r-- | resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider | 2 | ||||
-rw-r--r-- | resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala | 2 | ||||
-rw-r--r-- | resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala (renamed from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala) | 55 | ||||
-rw-r--r-- | resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala | 8 | ||||
-rw-r--r-- | resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala (renamed from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala) | 20 |
5 files changed, 48 insertions, 39 deletions
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider index 22ead56d23..f5a807ecac 100644 --- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider @@ -1,3 +1,3 @@ -org.apache.spark.deploy.yarn.security.HDFSCredentialProvider +org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider org.apache.spark.deploy.yarn.security.HBaseCredentialProvider org.apache.spark.deploy.yarn.security.HiveCredentialProvider diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala index 933736bd22..4f4be52a0d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.Utils /** * A ConfigurableCredentialManager to manage all the registered credential providers and offer * APIs for other modules to obtain credentials as well as renewal time. By default - * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will + * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be * managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]] * interface and put into resources/META-INF/services to be loaded by ServiceLoader. diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala index ebb176bc95..b4fb4a790a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala @@ -17,37 +17,40 @@ package org.apache.spark.deploy.yarn.security -import java.io.{ByteArrayInputStream, DataInputStream} - import scala.collection.JavaConverters._ +import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging { +private[security] class HadoopFSCredentialProvider + extends ServiceCredentialProvider with Logging { // Token renewal interval, this value will be set in the first call, - // if None means no token renewer specified, so cannot get token renewal interval. + // if None means no token renewer specified or no token can be renewed, + // so cannot get token renewal interval. private var tokenRenewalInterval: Option[Long] = null - override val serviceName: String = "hdfs" + override val serviceName: String = "hadoopfs" override def obtainCredentials( hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long] = { // NameNode to access, used to get tokens from different FileSystems + val tmpCreds = new Credentials() + val tokenRenewer = getTokenRenewer(hadoopConf) nnsToAccess(hadoopConf, sparkConf).foreach { dst => val dstFs = dst.getFileSystem(hadoopConf) - logInfo("getting token for namenode: " + dst) - dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds) + logInfo("getting token for: " + dst) + dstFs.addDelegationTokens(tokenRenewer, tmpCreds) } // Get the token renewal interval if it is not set. It will only be called once. @@ -56,15 +59,18 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider } // Get the time of next renewal. - tokenRenewalInterval.map { interval => - creds.getAllTokens.asScala - .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) + val nextRenewalDate = tokenRenewalInterval.flatMap { interval => + val nextRenewalDates = tmpCreds.getAllTokens.asScala + .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]) .map { t => - val identifier = new DelegationTokenIdentifier() - identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) + val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] identifier.getIssueDate + interval - }.foldLeft(0L)(math.max) + } + if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min) } + + creds.addAll(tmpCreds) + nextRenewalDate } private def getTokenRenewalInterval( @@ -78,16 +84,19 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider val dstFs = dst.getFileSystem(hadoopConf) dstFs.addDelegationTokens(renewer, creds) } - val hdfsToken = creds.getAllTokens.asScala - .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) - hdfsToken.map { t => - val newExpiration = t.renew(hadoopConf) - val identifier = new DelegationTokenIdentifier() - identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) - val interval = newExpiration - identifier.getIssueDate - logInfo(s"Renewal Interval is $interval") - interval + + val renewIntervals = creds.getAllTokens.asScala.filter { + _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] + }.flatMap { token => + Try { + val newExpiration = token.renew(hadoopConf) + val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") + interval + }.toOption } + if (renewIntervals.isEmpty) None else Some(renewIntervals.min) } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala index db4619e80c..b0067aa451 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala @@ -48,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit test("Correctly load default credential providers") { credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - credentialManager.getServiceCredentialProvider("hdfs") should not be (None) + credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) credentialManager.getServiceCredentialProvider("hbase") should not be (None) credentialManager.getServiceCredentialProvider("hive") should not be (None) } @@ -57,17 +57,17 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false") credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - credentialManager.getServiceCredentialProvider("hdfs") should not be (None) + credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) credentialManager.getServiceCredentialProvider("hbase") should not be (None) credentialManager.getServiceCredentialProvider("hive") should be (None) } test("using deprecated configurations") { - sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false") + sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false") sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false") credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - credentialManager.getServiceCredentialProvider("hdfs") should be (None) + credentialManager.getServiceCredentialProvider("hadoopfs") should be (None) credentialManager.getServiceCredentialProvider("hive") should be (None) credentialManager.getServiceCredentialProvider("test") should not be (None) credentialManager.getServiceCredentialProvider("hbase") should not be (None) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala index 7b2da3f26e..0eb2512723 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala @@ -23,30 +23,30 @@ import org.scalatest.{Matchers, PrivateMethodTester} import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} -class HDFSCredentialProviderSuite +class HadoopFSCredentialProviderSuite extends SparkFunSuite with PrivateMethodTester with Matchers { private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer) private def getTokenRenewer( - hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = { - hdfsCredentialProvider invokePrivate _getTokenRenewer(conf) + fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = { + fsCredentialProvider invokePrivate _getTokenRenewer(conf) } - private var hdfsCredentialProvider: HDFSCredentialProvider = null + private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null override def beforeAll() { super.beforeAll() - if (hdfsCredentialProvider == null) { - hdfsCredentialProvider = new HDFSCredentialProvider() + if (hadoopFsCredentialProvider == null) { + hadoopFsCredentialProvider = new HadoopFSCredentialProvider() } } override def afterAll() { - if (hdfsCredentialProvider != null) { - hdfsCredentialProvider = null + if (hadoopFsCredentialProvider != null) { + hadoopFsCredentialProvider = null } super.afterAll() @@ -56,7 +56,7 @@ class HDFSCredentialProviderSuite val hadoopConf = new Configuration() hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM") - val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf) + val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf) renewer should be ("yarn/myrm:8032@SPARKTEST.COM") } @@ -64,7 +64,7 @@ class HDFSCredentialProviderSuite val hadoopConf = new Configuration() val caught = intercept[SparkException] { - getTokenRenewer(hdfsCredentialProvider, hadoopConf) + getTokenRenewer(hadoopFsCredentialProvider, hadoopConf) } assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") } |